We need to call msg2sndr_flush() in read_msg_fd() now.
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 3ad9147..567065b 100644 (file)
--- a/io.c
+++ b/io.c
@@ -107,7 +107,8 @@ static char int_byte_cnt[64] = {
        5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
 };
 
-static int readfd_unbuffered(int fd, char *buf, size_t len);
+static void msg2sndr_flush(void);
+static void readfd(int fd, char *buffer, size_t N);
 static void writefd(int fd, const char *buf, size_t len);
 static void writefd_unbuffered(int fd, const char *buf, size_t len);
 static void decrement_active_files(int ndx);
@@ -263,8 +264,9 @@ static void read_msg_fd(void)
        /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
         * to this routine from writefd_unbuffered(). */
        msg_fd_in = -1;
+       defer_forwarding_messages++;
 
-       readfd_unbuffered(fd, buf, 4);
+       readfd(fd, buf, 4);
        tag = IVAL(buf, 0);
 
        len = tag & 0xFFFFFF;
@@ -284,7 +286,7 @@ static void read_msg_fd(void)
        case MSG_REDO:
                if (len != 4 || !am_generator)
                        goto invalid_msg;
-               readfd_unbuffered(fd, buf, 4);
+               readfd(fd, buf, 4);
                if (remove_source_files)
                        decrement_active_files(IVAL(buf,0));
                flist_ndx_push(&redo_list, IVAL(buf,0));
@@ -294,7 +296,7 @@ static void read_msg_fd(void)
        case MSG_FLIST:
                if (len != 4 || !am_generator || !incremental)
                        goto invalid_msg;
-               readfd_unbuffered(fd, buf, 4);
+               readfd(fd, buf, 4);
                /* Read extra file list from receiver. */
                assert(iobuf_in != NULL);
                assert(iobuf_f_in == fd);
@@ -309,13 +311,13 @@ static void read_msg_fd(void)
        case MSG_DELETED:
                if (len >= (int)sizeof buf || !am_generator)
                        goto invalid_msg;
-               readfd_unbuffered(fd, buf, len);
+               readfd(fd, buf, len);
                send_msg(MSG_DELETED, buf, len);
                break;
        case MSG_SUCCESS:
                if (len != 4 || !am_generator)
                        goto invalid_msg;
-               readfd_unbuffered(fd, buf, len);
+               readfd(fd, buf, len);
                if (remove_source_files) {
                        decrement_active_files(IVAL(buf,0));
                        send_msg(MSG_SUCCESS, buf, len);
@@ -328,7 +330,7 @@ static void read_msg_fd(void)
        case MSG_NO_SEND:
                if (len != 4 || !am_generator)
                        goto invalid_msg;
-               readfd_unbuffered(fd, buf, len);
+               readfd(fd, buf, len);
                if (incremental)
                        decrement_flist_in_progress(IVAL(buf,0), 0);
                break;
@@ -346,7 +348,7 @@ static void read_msg_fd(void)
                        n = len;
                        if (n >= sizeof buf)
                                n = sizeof buf - 1;
-                       readfd_unbuffered(fd, buf, n);
+                       readfd(fd, buf, n);
                        rwrite((enum logcode)tag, buf, n);
                        len -= n;
                }
@@ -357,7 +359,9 @@ static void read_msg_fd(void)
                exit_cleanup(RERR_STREAMIO);
        }
 
+       defer_forwarding_messages--;
        msg_fd_in = fd;
+       msg2sndr_flush();
 }
 
 /* This is used by the generator to limit how many file transfers can
@@ -417,7 +421,7 @@ static void decrement_flist_in_progress(int ndx, int redo)
  * This is only active in the receiver. */
 static int msg2genr_flush(void)
 {
-       if (msg_fd_out < 0 || no_flush)
+       if (msg_fd_out < 0 || no_flush || flist_forward_from >= 0)
                return -1;
 
        no_flush++;
@@ -538,7 +542,7 @@ static int read_timeout(int fd, char *buf, size_t len)
 {
        int n, cnt = 0;
 
-       io_flush(NORMAL_FLUSH);
+       io_flush(FULL_FLUSH);
 
        while (cnt == 0) {
                /* until we manage to read *something* */
@@ -818,8 +822,8 @@ void start_flist_forward(int f_in)
 
 void stop_flist_forward()
 {
-       io_flush(NORMAL_FLUSH);
        flist_forward_from = -1;
+       io_flush(FULL_FLUSH);
 }
 
 /**
@@ -947,11 +951,8 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
        return cnt;
 }
 
-/**
- * Do a buffered read from @p fd.  Don't return until all @p n bytes
- * have been read.  If all @p n can't be read then exit with an
- * error.
- **/
+/* Do a buffered read from fd.  Don't return until all N bytes have
+ * been read.  If all N can't be read then exit with an error. */
 static void readfd(int fd, char *buffer, size_t N)
 {
        int  cnt;
@@ -1345,10 +1346,12 @@ static void mplex_write(enum msgcode code, const char *buf, size_t len)
        }
 }
 
-void io_flush(UNUSED(int flush_it_all))
+void io_flush(int flush_it_all)
 {
-       msg2genr_flush();
-       msg2sndr_flush();
+       if (flush_it_all) {
+               msg2genr_flush();
+               msg2sndr_flush();
+       }
 
        if (!iobuf_out_cnt || no_flush)
                return;
@@ -1543,6 +1546,86 @@ void write_vstring(int f, const char *str, int len)
                writefd(f, str, len);
 }
 
+/* Send a file-list index using a byte-reduction method. */
+void write_ndx(int f, int32 ndx)
+{
+       static int32 prev_positive = -1, prev_negative = 1;
+       int32 diff, cnt = 0;
+       char b[6];
+
+       if (protocol_version < 30 || read_batch) {
+               write_int(f, ndx);
+               return;
+       }
+
+       /* Send NDX_DONE as a single-byte 0 with no side effects.  Send
+        * negative nums as a positive after sending a leading 0xFF. */
+       if (ndx >= 0) {
+               diff = ndx - prev_positive;
+               prev_positive = ndx;
+       } else if (ndx == NDX_DONE) {
+               *b = 0;
+               writefd(f, b, 1);
+               return;
+       } else {
+               b[cnt++] = (char)0xFF;
+               ndx = -ndx;
+               diff = ndx - prev_negative;
+               prev_negative = ndx;
+       }
+
+       /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
+        * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
+        * & all 4 bytes of the (non-negative) num with the high-bit set. */
+       if (diff < 0xFE && diff > 0)
+               b[cnt++] = (char)diff;
+       else if (diff < 0 || diff > 0x7FFF) {
+               b[cnt++] = (char)0xFE;
+               b[cnt++] = (char)((ndx >> 24) | 0x80);
+               b[cnt++] = (char)(ndx >> 16);
+               b[cnt++] = (char)(ndx >> 8);
+               b[cnt++] = (char)ndx;
+       } else {
+               b[cnt++] = (char)0xFE;
+               b[cnt++] = (char)(diff >> 8);
+               b[cnt++] = (char)diff;
+       }
+       writefd(f, b, cnt);
+}
+
+/* Receive a file-list index using a byte-reduction method. */
+int32 read_ndx(int f)
+{
+       static int32 prev_positive = -1, prev_negative = 1;
+       int32 *prev_ptr, num;
+       char b[4];
+
+       if (protocol_version < 30)
+               return read_int(f);
+
+       readfd(f, b, 1);
+       if (CVAL(b, 0) == 0xFF) {
+               readfd(f, b, 1);
+               prev_ptr = &prev_negative;
+       } else if (CVAL(b, 0) == 0)
+               return NDX_DONE;
+       else
+               prev_ptr = &prev_positive;
+       if (CVAL(b, 0) == 0xFE) {
+               readfd(f, b, 2);
+               if (CVAL(b, 0) & 0x80) {
+                       readfd(f, b+2, 2);
+                       num = NVAL4(b, 0x80);
+               } else
+                       num = NVAL2(b, 0) + *prev_ptr;
+       } else
+               num = CVAL(b, 0) + *prev_ptr;
+       *prev_ptr = num;
+       if (prev_ptr == &prev_negative)
+               num = -num;
+       return num;
+}
+
 /**
  * Read a line of up to @p maxlen characters into @p buf (not counting
  * the trailing null).  Strips the (required) trailing newline and all