X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/954bbed84aef742d65327a4de0ea214ed90cb6b3..de3438407c4f976c55172093795b87e49cb4a6e2:/io.c diff --git a/io.c b/io.c index 4ade1ea1..09aab912 100644 --- a/io.c +++ b/io.c @@ -333,7 +333,8 @@ static void read_msg_fd(void) } break; default: - rprintf(FERROR, "unknown message %d:%d\n", tag, len); + rprintf(FERROR, "unknown message %d:%d [%s]\n", + tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } @@ -1063,6 +1064,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) size_t n, total = 0; fd_set w_fds, r_fds; int maxfd, count, cnt, using_r_fds; + int defer_save = defer_forwarding_messages; struct timeval tv; no_flush++; @@ -1101,17 +1103,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) if (!FD_ISSET(fd, &w_fds)) continue; - if (msg2sndr.head && !defer_forwarding_messages) { - struct msg_list_item *m = msg2sndr.head; - if (!(msg2sndr.head = m->next)) - msg2sndr.tail = NULL; - defer_forwarding_messages = 1; - io_multiplex_write(IVAL(m->buf,0), m->buf+4, m->len-4); - defer_forwarding_messages = 0; - free(m); - continue; - } - n = len - total; if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; @@ -1153,11 +1144,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) sleep_for_bwlimit(cnt); } } - defer_forwarding_messages = 0; + defer_forwarding_messages = defer_save; no_flush--; } +static void msg2sndr_flush(void) +{ + if (defer_forwarding_messages) + return; + + while (msg2sndr.head && io_multiplexing_out) { + struct msg_list_item *m = msg2sndr.head; + if (!(msg2sndr.head = m->next)) + msg2sndr.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages = 1; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages = 0; + free(m); + } +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. @@ -1179,13 +1187,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) len -= n; buf += n; - if (len) + if (len) { + defer_forwarding_messages = 1; writefd_unbuffered(sock_f_out, buf, len); + defer_forwarding_messages = 0; + msg2sndr_flush(); + } } void io_flush(int flush_it_all) { msg2genr_flush(flush_it_all); + msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) return;