X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/8a65e0ce00bb5150c1cc70b985a649e0dc9e5278..be21e29c351698b3f86caf13e446c5e4b19c9fde:/io.c diff --git a/io.c b/io.c index e2304d4f..915d21db 100644 --- a/io.c +++ b/io.c @@ -107,7 +107,7 @@ static char int_byte_cnt[64] = { 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */ }; -static int readfd_unbuffered(int fd, char *buf, size_t len); +static void readfd(int fd, char *buffer, size_t N); static void writefd(int fd, const char *buf, size_t len); static void writefd_unbuffered(int fd, const char *buf, size_t len); static void decrement_active_files(int ndx); @@ -263,8 +263,9 @@ static void read_msg_fd(void) /* Temporarily disable msg_fd_in. This is needed to avoid looping back * to this routine from writefd_unbuffered(). */ msg_fd_in = -1; + defer_forwarding_messages++; - readfd_unbuffered(fd, buf, 4); + readfd(fd, buf, 4); tag = IVAL(buf, 0); len = tag & 0xFFFFFF; @@ -284,7 +285,7 @@ static void read_msg_fd(void) case MSG_REDO: if (len != 4 || !am_generator) goto invalid_msg; - readfd_unbuffered(fd, buf, 4); + readfd(fd, buf, 4); if (remove_source_files) decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); @@ -294,7 +295,7 @@ static void read_msg_fd(void) case MSG_FLIST: if (len != 4 || !am_generator || !incremental) goto invalid_msg; - readfd_unbuffered(fd, buf, 4); + readfd(fd, buf, 4); /* Read extra file list from receiver. */ assert(iobuf_in != NULL); assert(iobuf_f_in == fd); @@ -309,13 +310,13 @@ static void read_msg_fd(void) case MSG_DELETED: if (len >= (int)sizeof buf || !am_generator) goto invalid_msg; - readfd_unbuffered(fd, buf, len); + readfd(fd, buf, len); send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) goto invalid_msg; - readfd_unbuffered(fd, buf, len); + readfd(fd, buf, len); if (remove_source_files) { decrement_active_files(IVAL(buf,0)); send_msg(MSG_SUCCESS, buf, len); @@ -328,7 +329,7 @@ static void read_msg_fd(void) case MSG_NO_SEND: if (len != 4 || !am_generator) goto invalid_msg; - readfd_unbuffered(fd, buf, len); + readfd(fd, buf, len); if (incremental) decrement_flist_in_progress(IVAL(buf,0), 0); break; @@ -346,7 +347,7 @@ static void read_msg_fd(void) n = len; if (n >= sizeof buf) n = sizeof buf - 1; - readfd_unbuffered(fd, buf, n); + readfd(fd, buf, n); rwrite((enum logcode)tag, buf, n); len -= n; } @@ -357,6 +358,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } + defer_forwarding_messages--; msg_fd_in = fd; } @@ -417,7 +419,7 @@ static void decrement_flist_in_progress(int ndx, int redo) * This is only active in the receiver. */ static int msg2genr_flush(void) { - if (msg_fd_out < 0 || no_flush) + if (msg_fd_out < 0 || no_flush || flist_forward_from >= 0) return -1; no_flush++; @@ -538,7 +540,7 @@ static int read_timeout(int fd, char *buf, size_t len) { int n, cnt = 0; - io_flush(NORMAL_FLUSH); + io_flush(FULL_FLUSH); while (cnt == 0) { /* until we manage to read *something* */ @@ -818,8 +820,8 @@ void start_flist_forward(int f_in) void stop_flist_forward() { - io_flush(NORMAL_FLUSH); flist_forward_from = -1; + io_flush(FULL_FLUSH); } /** @@ -947,11 +949,8 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) return cnt; } -/** - * Do a buffered read from @p fd. Don't return until all @p n bytes - * have been read. If all @p n can't be read then exit with an - * error. - **/ +/* Do a buffered read from fd. Don't return until all N bytes have + * been read. If all N can't be read then exit with an error. */ static void readfd(int fd, char *buffer, size_t N) { int cnt; @@ -1345,10 +1344,12 @@ static void mplex_write(enum msgcode code, const char *buf, size_t len) } } -void io_flush(UNUSED(int flush_it_all)) +void io_flush(int flush_it_all) { - msg2genr_flush(); - msg2sndr_flush(); + if (flush_it_all) { + msg2genr_flush(); + msg2sndr_flush(); + } if (!iobuf_out_cnt || no_flush) return;