- Made the maximum-fd computation prior to a select() use the same idiom
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 6028565..f34a899 100644 (file)
--- a/io.c
+++ b/io.c
@@ -53,10 +53,12 @@ extern int io_timeout;
 extern int am_server;
 extern int am_daemon;
 extern int am_sender;
+extern int eol_nulls;
+extern char *remote_filesfrom_file;
 extern struct stats stats;
 
-
 const char phase_unknown[] = "unknown";
+int select_timeout = SELECT_TIMEOUT;
 
 /**
  * The connection might be dropped at some point; perhaps because the
@@ -192,13 +194,13 @@ void send_msg(enum msgcode code, char *buf, int len)
  * called by the generator. */
 static void read_msg_fd(void)
 {
-       char buf[200];
+       char buf[2048];
        size_t n;
        int fd = msg_fd_in;
        int tag, len;
 
-       /* Temporarily disable msg_fd_in.  This is needed because we
-        * may call a write routine that could try to call us back. */
+       /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
+        * to this routine from read_timeout() and writefd_unbuffered(). */
        msg_fd_in = -1;
 
        read_loop(fd, buf, 4);
@@ -245,7 +247,7 @@ static void read_msg_fd(void)
 
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
- * This is only called by the receiver. */
+ * This is only active in the receiver. */
 int msg_list_push(int flush_it_all)
 {
        static int written = 0;
@@ -267,7 +269,7 @@ int msg_list_push(int flush_it_all)
                                return 0;
                        FD_ZERO(&fds);
                        FD_SET(msg_fd_out, &fds);
-                       tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
@@ -334,13 +336,12 @@ static void whine_about_eof(void)
 {
        if (kludge_around_eof)
                exit_cleanup(0);
-       else {
-               rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
-                       "(%.0f bytes read so far)\n",
-                       (double)stats.total_read);
 
-               exit_cleanup(RERR_STREAMIO);
-       }
+       rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
+               "(%.0f bytes read so far)\n",
+               (double)stats.total_read);
+
+       exit_cleanup(RERR_STREAMIO);
 }
 
 
@@ -375,15 +376,19 @@ static int read_timeout(int fd, char *buf, size_t len)
                /* until we manage to read *something* */
                fd_set r_fds, w_fds;
                struct timeval tv;
-               int fd_count = fd+1;
+               int maxfd = fd;
                int count;
 
                FD_ZERO(&r_fds);
                FD_SET(fd, &r_fds);
                if (msg_fd_in >= 0) {
                        FD_SET(msg_fd_in, &r_fds);
-                       if (msg_fd_in >= fd_count)
-                               fd_count = msg_fd_in+1;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
+               } else if (msg_list_head) {
+                       FD_SET(msg_fd_out, &w_fds);
+                       if (msg_fd_out > maxfd)
+                               maxfd = msg_fd_out;
                }
                if (io_filesfrom_f_out >= 0) {
                        int new_fd;
@@ -400,33 +405,30 @@ static int read_timeout(int fd, char *buf, size_t len)
                                FD_SET(io_filesfrom_f_out, &w_fds);
                                new_fd = io_filesfrom_f_out;
                        }
-                       if (new_fd >= fd_count)
-                               fd_count = new_fd+1;
+                       if (new_fd > maxfd)
+                               maxfd = new_fd;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
 
-               count = select(fd_count, &r_fds,
+               count = select(maxfd + 1, &r_fds,
                               io_filesfrom_buflen? &w_fds : NULL,
                               NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       check_timeout();
+                       if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
                        continue;
                }
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
+               else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
+                       msg_list_push(NORMAL_FLUSH);
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -456,7 +458,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                                                io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
                                                io_filesfrom_f_in = -1;
                                        } else {
-                                               extern int eol_nulls;
                                                if (!eol_nulls) {
                                                        char *s = io_filesfrom_buf + l;
                                                        /* Transform CR and/or LF into '\0' */
@@ -492,26 +493,25 @@ static int read_timeout(int fd, char *buf, size_t len)
                        }
                }
 
-               if (!FD_ISSET(fd, &r_fds)) continue;
+               if (!FD_ISSET(fd, &r_fds))
+                       continue;
 
                n = read(fd, buf, len);
 
-               if (n > 0) {
-                       buf += n;
-                       len -= n;
-                       ret += n;
-                       if (io_timeout)
-                               last_io = time(NULL);
-                       continue;
-               } else if (n == 0) {
-                       whine_about_eof();
-                       return -1; /* doesn't return */
-               } else if (n < 0) {
+               if (n <= 0) {
+                       if (n == 0)
+                               whine_about_eof(); /* Doesn't return. */
                        if (errno == EINTR || errno == EWOULDBLOCK
                            || errno == EAGAIN)
                                continue;
-                       die_from_readerr(errno);
+                       die_from_readerr(errno); /* Doesn't return. */
                }
+
+               buf += n;
+               len -= n;
+               ret += n;
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        return ret;
@@ -525,9 +525,6 @@ int read_filesfrom_line(int fd, char *fname)
 {
        char ch, *s, *eob = fname + MAXPATHLEN - 1;
        int cnt;
-       extern int io_timeout;
-       extern int eol_nulls;
-       extern char *remote_filesfrom_file;
        int reading_remotely = remote_filesfrom_file != NULL;
        int nulls = eol_nulls || reading_remotely;
 
@@ -541,7 +538,7 @@ int read_filesfrom_line(int fd, char *fname)
                        fd_set fds;
                        FD_ZERO(&fds);
                        FD_SET(fd, &fds);
-                       tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(fd+1, &fds, NULL, NULL, &tv))
                                check_timeout();
@@ -605,7 +602,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                if (!buffer) {
                        bufferSz = 2 * IO_BUFFER_SIZE;
                        buffer   = new_array(char, bufferSz);
-                       if (!buffer) out_of_memory("read_unbuffered");
+                       if (!buffer)
+                               out_of_memory("read_unbuffered");
                }
                remaining = read_timeout(fd, buffer, bufferSz);
                bufferIdx = 0;
@@ -631,7 +629,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                case MSG_DATA:
                        if (!buffer || remaining > bufferSz) {
                                buffer = realloc_array(buffer, char, remaining);
-                               if (!buffer) out_of_memory("read_unbuffered");
+                               if (!buffer)
+                                       out_of_memory("read_unbuffered");
                                bufferSz = remaining;
                        }
                        read_loop(fd, buffer, remaining);
@@ -688,7 +687,8 @@ int32 read_int(int f)
 
        readfd(f,b,4);
        ret = IVAL(b,0);
-       if (ret == (int32)0xffffffff) return -1;
+       if (ret == (int32)0xffffffff)
+               return -1;
        return ret;
 }
 
@@ -698,9 +698,8 @@ int64 read_longint(int f)
        char b[8];
        ret = read_int(f);
 
-       if ((int32)ret != (int32)0xffffffff) {
+       if ((int32)ret != (int32)0xffffffff)
                return ret;
-       }
 
 #ifdef NO_INT64
        rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
@@ -796,56 +795,56 @@ static void sleep_for_bwlimit(int bytes_written)
  **/
 static void writefd_unbuffered(int fd,char *buf,size_t len)
 {
-       size_t total = 0;
+       size_t n, total = 0;
        fd_set w_fds, r_fds;
-       int fd_count, count;
+       int maxfd, count, ret;
        struct timeval tv;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        no_flush++;
 
        while (total < len) {
                FD_ZERO(&w_fds);
                FD_SET(fd,&w_fds);
-               fd_count = fd;
+               maxfd = fd;
 
                if (msg_fd_in >= 0) {
                        FD_ZERO(&r_fds);
                        FD_SET(msg_fd_in,&r_fds);
-                       if (msg_fd_in > fd_count)
-                               fd_count = msg_fd_in;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
-               count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
+               count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
                               &w_fds, NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       check_timeout();
+                       if (count < 0 && errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
                        continue;
                }
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
 
-               if (FD_ISSET(fd, &w_fds)) {
-                       int ret;
-                       size_t n = len-total;
-                       if (bwlimit && n > bwlimit_writemax)
-                               n = bwlimit_writemax;
-                       ret = write(fd,buf+total,n);
+               if (!FD_ISSET(fd, &w_fds))
+                       continue;
+
+               n = len - total;
+               if (bwlimit && n > bwlimit_writemax)
+                       n = bwlimit_writemax;
+               ret = write(fd, buf + total, n);
 
+               if (ret <= 0) {
                        if (ret < 0) {
                                if (errno == EINTR)
                                        continue;
@@ -855,23 +854,20 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                                }
                        }
 
-                       if (ret <= 0) {
-                               /* Don't try to write errors back
-                                * across the stream */
-                               io_multiplexing_close();
-                               rsyserr(FERROR, errno,
-                                       "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
-                                       (long) len, io_write_phase);
-                               exit_cleanup(RERR_STREAMIO);
-                       }
+                       /* Don't try to write errors back across the stream. */
+                       io_multiplexing_close();
+                       rsyserr(FERROR, errno,
+                               "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
+                               (long)len, io_write_phase);
+                       exit_cleanup(RERR_STREAMIO);
+               }
 
-                       sleep_for_bwlimit(ret);
+               sleep_for_bwlimit(ret);
 
-                       total += ret;
+               total += ret;
 
-                       if (io_timeout)
-                               last_io = time(NULL);
-               }
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        no_flush--;
@@ -883,10 +879,12 @@ static int io_buffer_count;
 
 void io_start_buffering_out(int fd)
 {
-       if (io_buffer) return;
+       if (io_buffer)
+               return;
        multiplex_out_fd = fd;
        io_buffer = new_array(char, IO_BUFFER_SIZE);
-       if (!io_buffer) out_of_memory("writefd");
+       if (!io_buffer)
+               out_of_memory("writefd");
        io_buffer_count = 0;
 }
 
@@ -916,9 +914,8 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
        len -= n;
        buf += n;
 
-       if (len) {
+       if (len)
                writefd_unbuffered(fd, buf, len);
-       }
 }
 
 
@@ -952,7 +949,10 @@ static void writefd(int fd,char *buf,size_t len)
 {
        stats.total_written += len;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
@@ -960,7 +960,7 @@ static void writefd(int fd,char *buf,size_t len)
        }
 
        while (len) {
-               int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
+               int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
                if (n > 0) {
                        memcpy(io_buffer+io_buffer_count, buf, n);
                        buf += n;
@@ -1070,7 +1070,8 @@ void io_printf(int fd, const char *format, ...)
        len = vsnprintf(buf, sizeof buf, format, ap);
        va_end(ap);
 
-       if (len < 0) exit_cleanup(RERR_STREAMIO);
+       if (len < 0)
+               exit_cleanup(RERR_STREAMIO);
 
        write_sbuf(fd, buf);
 }
@@ -1096,7 +1097,8 @@ void io_start_multiplex_in(int fd)
 /** Write an message to the multiplexed data stream. */
 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
 {
-       if (!io_multiplexing_out) return 0;
+       if (!io_multiplexing_out)
+               return 0;
 
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);