Changed a few FINFO messages to FLOG.
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index c9691ed..929609e 100644 (file)
--- a/io.c
+++ b/io.c
@@ -47,15 +47,18 @@ static time_t last_io;
 static int no_flush;
 
 extern int bwlimit;
+extern size_t bwlimit_writemax;
 extern int verbose;
 extern int io_timeout;
 extern int am_server;
 extern int am_daemon;
 extern int am_sender;
+extern int eol_nulls;
+extern char *remote_filesfrom_file;
 extern struct stats stats;
 
-
 const char phase_unknown[] = "unknown";
+int select_timeout = SELECT_TIMEOUT;
 
 /**
  * The connection might be dropped at some point; perhaps because the
@@ -266,7 +269,7 @@ int msg_list_push(int flush_it_all)
                                return 0;
                        FD_ZERO(&fds);
                        FD_SET(msg_fd_out, &fds);
-                       tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
@@ -383,6 +386,10 @@ static int read_timeout(int fd, char *buf, size_t len)
                        FD_SET(msg_fd_in, &r_fds);
                        if (msg_fd_in >= fd_count)
                                fd_count = msg_fd_in+1;
+               } else if (msg_list_head) {
+                       FD_SET(msg_fd_out, &w_fds);
+                       if (msg_fd_out >= fd_count)
+                               fd_count = msg_fd_out+1;
                }
                if (io_filesfrom_f_out >= 0) {
                        int new_fd;
@@ -403,7 +410,7 @@ static int read_timeout(int fd, char *buf, size_t len)
                                fd_count = new_fd+1;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
@@ -412,20 +419,17 @@ static int read_timeout(int fd, char *buf, size_t len)
                               io_filesfrom_buflen? &w_fds : NULL,
                               NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       check_timeout();
+                       if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
                        continue;
                }
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
+               else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
+                       msg_list_push(NORMAL_FLUSH);
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -455,7 +459,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                                                io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
                                                io_filesfrom_f_in = -1;
                                        } else {
-                                               extern int eol_nulls;
                                                if (!eol_nulls) {
                                                        char *s = io_filesfrom_buf + l;
                                                        /* Transform CR and/or LF into '\0' */
@@ -491,7 +494,8 @@ static int read_timeout(int fd, char *buf, size_t len)
                        }
                }
 
-               if (!FD_ISSET(fd, &r_fds)) continue;
+               if (!FD_ISSET(fd, &r_fds))
+                       continue;
 
                n = read(fd, buf, len);
 
@@ -524,9 +528,6 @@ int read_filesfrom_line(int fd, char *fname)
 {
        char ch, *s, *eob = fname + MAXPATHLEN - 1;
        int cnt;
-       extern int io_timeout;
-       extern int eol_nulls;
-       extern char *remote_filesfrom_file;
        int reading_remotely = remote_filesfrom_file != NULL;
        int nulls = eol_nulls || reading_remotely;
 
@@ -540,7 +541,7 @@ int read_filesfrom_line(int fd, char *fname)
                        fd_set fds;
                        FD_ZERO(&fds);
                        FD_SET(fd, &fds);
-                       tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(fd+1, &fds, NULL, NULL, &tv))
                                check_timeout();
@@ -604,7 +605,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                if (!buffer) {
                        bufferSz = 2 * IO_BUFFER_SIZE;
                        buffer   = new_array(char, bufferSz);
-                       if (!buffer) out_of_memory("read_unbuffered");
+                       if (!buffer)
+                               out_of_memory("read_unbuffered");
                }
                remaining = read_timeout(fd, buffer, bufferSz);
                bufferIdx = 0;
@@ -630,7 +632,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                case MSG_DATA:
                        if (!buffer || remaining > bufferSz) {
                                buffer = realloc_array(buffer, char, remaining);
-                               if (!buffer) out_of_memory("read_unbuffered");
+                               if (!buffer)
+                                       out_of_memory("read_unbuffered");
                                bufferSz = remaining;
                        }
                        read_loop(fd, buffer, remaining);
@@ -687,7 +690,8 @@ int32 read_int(int f)
 
        readfd(f,b,4);
        ret = IVAL(b,0);
-       if (ret == (int32)0xffffffff) return -1;
+       if (ret == (int32)0xffffffff)
+               return -1;
        return ret;
 }
 
@@ -697,9 +701,8 @@ int64 read_longint(int f)
        char b[8];
        ret = read_int(f);
 
-       if ((int32)ret != (int32)0xffffffff) {
+       if ((int32)ret != (int32)0xffffffff)
                return ret;
-       }
 
 #ifdef NO_INT64
        rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
@@ -739,22 +742,51 @@ unsigned char read_byte(int f)
  * use a bit less bandwidth than specified, because it doesn't make up
  * for slow periods.  But arguably this is a feature.  In addition, we
  * ought to take the time used to write the data into account.
+ *
+ * During some phases of big transfers (file FOO is up to date) this is
+ * called with a small bytes_written every time.  As the kernel has to
+ * round small waits up to guarantee that we actually wait at least the
+ * requested number of microseconds, this can become grossly inaccurate.
+ * We therefore keep track of the bytes we've written over time and only
+ * sleep when the accumulated delay is at least one tenth of a second.
  **/
 static void sleep_for_bwlimit(int bytes_written)
 {
-       struct timeval tv;
+       static struct timeval prior_tv;
+       static long total_written = 0; 
+       struct timeval tv, start_tv;
+       long elapsed_usec, sleep_usec;
+
+#define ONE_SEC        1000000L /* # of microseconds in a second */
 
        if (!bwlimit)
                return;
 
-       assert(bytes_written > 0);
-       assert(bwlimit > 0);
+       total_written += bytes_written; 
 
-       tv.tv_usec = bytes_written * 1000 / bwlimit;
-       tv.tv_sec  = tv.tv_usec / 1000000;
-       tv.tv_usec = tv.tv_usec % 1000000;
+       gettimeofday(&start_tv, NULL);
+       if (prior_tv.tv_sec) {
+               elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
+                            + (start_tv.tv_usec - prior_tv.tv_usec);
+               total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
+               if (total_written < 0)
+                       total_written = 0;
+       }
 
+       sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
+       if (sleep_usec < ONE_SEC / 10) {
+               prior_tv = start_tv;
+               return;
+       }
+
+       tv.tv_sec  = sleep_usec / ONE_SEC;
+       tv.tv_usec = sleep_usec % ONE_SEC;
        select(0, NULL, NULL, NULL, &tv);
+
+       gettimeofday(&prior_tv, NULL);
+       elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
+                    + (prior_tv.tv_usec - start_tv.tv_usec);
+       total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
 }
 
 
@@ -771,7 +803,10 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
        int fd_count, count;
        struct timeval tv;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        no_flush++;
 
@@ -787,22 +822,17 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                                fd_count = msg_fd_in;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
                count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
                               &w_fds, NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       check_timeout();
+                       if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
                        continue;
                }
 
@@ -812,6 +842,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                if (FD_ISSET(fd, &w_fds)) {
                        int ret;
                        size_t n = len-total;
+                       if (bwlimit && n > bwlimit_writemax)
+                               n = bwlimit_writemax;
                        ret = write(fd,buf+total,n);
 
                        if (ret < 0) {
@@ -829,7 +861,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                                io_multiplexing_close();
                                rsyserr(FERROR, errno,
                                        "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
-                                       (long) len, io_write_phase);
+                                       (long)len, io_write_phase);
                                exit_cleanup(RERR_STREAMIO);
                        }
 
@@ -851,10 +883,12 @@ static int io_buffer_count;
 
 void io_start_buffering_out(int fd)
 {
-       if (io_buffer) return;
+       if (io_buffer)
+               return;
        multiplex_out_fd = fd;
        io_buffer = new_array(char, IO_BUFFER_SIZE);
-       if (!io_buffer) out_of_memory("writefd");
+       if (!io_buffer)
+               out_of_memory("writefd");
        io_buffer_count = 0;
 }
 
@@ -920,7 +954,10 @@ static void writefd(int fd,char *buf,size_t len)
 {
        stats.total_written += len;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
@@ -928,7 +965,7 @@ static void writefd(int fd,char *buf,size_t len)
        }
 
        while (len) {
-               int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
+               int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
                if (n > 0) {
                        memcpy(io_buffer+io_buffer_count, buf, n);
                        buf += n;
@@ -1038,7 +1075,8 @@ void io_printf(int fd, const char *format, ...)
        len = vsnprintf(buf, sizeof buf, format, ap);
        va_end(ap);
 
-       if (len < 0) exit_cleanup(RERR_STREAMIO);
+       if (len < 0)
+               exit_cleanup(RERR_STREAMIO);
 
        write_sbuf(fd, buf);
 }
@@ -1064,7 +1102,8 @@ void io_start_multiplex_in(int fd)
 /** Write an message to the multiplexed data stream. */
 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
 {
-       if (!io_multiplexing_out) return 0;
+       if (!io_multiplexing_out)
+               return 0;
 
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);