- Made the maximum-fd computation prior to a select() use the same idiom
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 2e9b540..f34a899 100644 (file)
--- a/io.c
+++ b/io.c
@@ -53,10 +53,12 @@ extern int io_timeout;
 extern int am_server;
 extern int am_daemon;
 extern int am_sender;
+extern int eol_nulls;
+extern char *remote_filesfrom_file;
 extern struct stats stats;
 
-
 const char phase_unknown[] = "unknown";
+int select_timeout = SELECT_TIMEOUT;
 
 /**
  * The connection might be dropped at some point; perhaps because the
@@ -192,13 +194,13 @@ void send_msg(enum msgcode code, char *buf, int len)
  * called by the generator. */
 static void read_msg_fd(void)
 {
-       char buf[200];
+       char buf[2048];
        size_t n;
        int fd = msg_fd_in;
        int tag, len;
 
-       /* Temporarily disable msg_fd_in.  This is needed because we
-        * may call a write routine that could try to call us back. */
+       /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
+        * to this routine from read_timeout() and writefd_unbuffered(). */
        msg_fd_in = -1;
 
        read_loop(fd, buf, 4);
@@ -245,7 +247,7 @@ static void read_msg_fd(void)
 
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
- * This is only called by the receiver. */
+ * This is only active in the receiver. */
 int msg_list_push(int flush_it_all)
 {
        static int written = 0;
@@ -267,7 +269,7 @@ int msg_list_push(int flush_it_all)
                                return 0;
                        FD_ZERO(&fds);
                        FD_SET(msg_fd_out, &fds);
-                       tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
@@ -334,13 +336,12 @@ static void whine_about_eof(void)
 {
        if (kludge_around_eof)
                exit_cleanup(0);
-       else {
-               rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
-                       "(%.0f bytes read so far)\n",
-                       (double)stats.total_read);
 
-               exit_cleanup(RERR_STREAMIO);
-       }
+       rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
+               "(%.0f bytes read so far)\n",
+               (double)stats.total_read);
+
+       exit_cleanup(RERR_STREAMIO);
 }
 
 
@@ -375,15 +376,19 @@ static int read_timeout(int fd, char *buf, size_t len)
                /* until we manage to read *something* */
                fd_set r_fds, w_fds;
                struct timeval tv;
-               int fd_count = fd+1;
+               int maxfd = fd;
                int count;
 
                FD_ZERO(&r_fds);
                FD_SET(fd, &r_fds);
                if (msg_fd_in >= 0) {
                        FD_SET(msg_fd_in, &r_fds);
-                       if (msg_fd_in >= fd_count)
-                               fd_count = msg_fd_in+1;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
+               } else if (msg_list_head) {
+                       FD_SET(msg_fd_out, &w_fds);
+                       if (msg_fd_out > maxfd)
+                               maxfd = msg_fd_out;
                }
                if (io_filesfrom_f_out >= 0) {
                        int new_fd;
@@ -400,25 +405,21 @@ static int read_timeout(int fd, char *buf, size_t len)
                                FD_SET(io_filesfrom_f_out, &w_fds);
                                new_fd = io_filesfrom_f_out;
                        }
-                       if (new_fd >= fd_count)
-                               fd_count = new_fd+1;
+                       if (new_fd > maxfd)
+                               maxfd = new_fd;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
 
-               count = select(fd_count, &r_fds,
+               count = select(maxfd + 1, &r_fds,
                               io_filesfrom_buflen? &w_fds : NULL,
                               NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
+                       check_timeout();
                        if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
                        continue;
@@ -426,6 +427,8 @@ static int read_timeout(int fd, char *buf, size_t len)
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
+               else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
+                       msg_list_push(NORMAL_FLUSH);
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -455,7 +458,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                                                io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
                                                io_filesfrom_f_in = -1;
                                        } else {
-                                               extern int eol_nulls;
                                                if (!eol_nulls) {
                                                        char *s = io_filesfrom_buf + l;
                                                        /* Transform CR and/or LF into '\0' */
@@ -496,22 +498,20 @@ static int read_timeout(int fd, char *buf, size_t len)
 
                n = read(fd, buf, len);
 
-               if (n > 0) {
-                       buf += n;
-                       len -= n;
-                       ret += n;
-                       if (io_timeout)
-                               last_io = time(NULL);
-                       continue;
-               } else if (n == 0) {
-                       whine_about_eof();
-                       return -1; /* doesn't return */
-               } else if (n < 0) {
+               if (n <= 0) {
+                       if (n == 0)
+                               whine_about_eof(); /* Doesn't return. */
                        if (errno == EINTR || errno == EWOULDBLOCK
                            || errno == EAGAIN)
                                continue;
-                       die_from_readerr(errno);
+                       die_from_readerr(errno); /* Doesn't return. */
                }
+
+               buf += n;
+               len -= n;
+               ret += n;
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        return ret;
@@ -525,9 +525,6 @@ int read_filesfrom_line(int fd, char *fname)
 {
        char ch, *s, *eob = fname + MAXPATHLEN - 1;
        int cnt;
-       extern int io_timeout;
-       extern int eol_nulls;
-       extern char *remote_filesfrom_file;
        int reading_remotely = remote_filesfrom_file != NULL;
        int nulls = eol_nulls || reading_remotely;
 
@@ -541,7 +538,7 @@ int read_filesfrom_line(int fd, char *fname)
                        fd_set fds;
                        FD_ZERO(&fds);
                        FD_SET(fd, &fds);
-                       tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(fd+1, &fds, NULL, NULL, &tv))
                                check_timeout();
@@ -798,41 +795,40 @@ static void sleep_for_bwlimit(int bytes_written)
  **/
 static void writefd_unbuffered(int fd,char *buf,size_t len)
 {
-       size_t total = 0;
+       size_t n, total = 0;
        fd_set w_fds, r_fds;
-       int fd_count, count;
+       int maxfd, count, ret;
        struct timeval tv;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        no_flush++;
 
        while (total < len) {
                FD_ZERO(&w_fds);
                FD_SET(fd,&w_fds);
-               fd_count = fd;
+               maxfd = fd;
 
                if (msg_fd_in >= 0) {
                        FD_ZERO(&r_fds);
                        FD_SET(msg_fd_in,&r_fds);
-                       if (msg_fd_in > fd_count)
-                               fd_count = msg_fd_in;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
-               count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
+               count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
                               &w_fds, NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF)
+                       check_timeout();
+                       if (count < 0 && errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
                        continue;
                }
@@ -840,13 +836,15 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
 
-               if (FD_ISSET(fd, &w_fds)) {
-                       int ret;
-                       size_t n = len-total;
-                       if (bwlimit && n > bwlimit_writemax)
-                               n = bwlimit_writemax;
-                       ret = write(fd,buf+total,n);
+               if (!FD_ISSET(fd, &w_fds))
+                       continue;
+
+               n = len - total;
+               if (bwlimit && n > bwlimit_writemax)
+                       n = bwlimit_writemax;
+               ret = write(fd, buf + total, n);
 
+               if (ret <= 0) {
                        if (ret < 0) {
                                if (errno == EINTR)
                                        continue;
@@ -856,23 +854,20 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                                }
                        }
 
-                       if (ret <= 0) {
-                               /* Don't try to write errors back
-                                * across the stream */
-                               io_multiplexing_close();
-                               rsyserr(FERROR, errno,
-                                       "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
-                                       (long)len, io_write_phase);
-                               exit_cleanup(RERR_STREAMIO);
-                       }
+                       /* Don't try to write errors back across the stream. */
+                       io_multiplexing_close();
+                       rsyserr(FERROR, errno,
+                               "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
+                               (long)len, io_write_phase);
+                       exit_cleanup(RERR_STREAMIO);
+               }
 
-                       sleep_for_bwlimit(ret);
+               sleep_for_bwlimit(ret);
 
-                       total += ret;
+               total += ret;
 
-                       if (io_timeout)
-                               last_io = time(NULL);
-               }
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        no_flush--;
@@ -919,9 +914,8 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
        len -= n;
        buf += n;
 
-       if (len) {
+       if (len)
                writefd_unbuffered(fd, buf, len);
-       }
 }
 
 
@@ -955,7 +949,10 @@ static void writefd(int fd,char *buf,size_t len)
 {
        stats.total_written += len;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);