From e4c877cf7082ddcb4a979fa7a8b252f95a34eb62 Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Fri, 20 Apr 2007 22:39:58 +0000 Subject: [PATCH] Added a message queue for the receiver->generator messages to handle the case where the message pipe is being used to forward the file-list data. --- io.c | 61 +++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/io.c b/io.c index b7ead41f..a7603ff8 100644 --- a/io.c +++ b/io.c @@ -133,7 +133,7 @@ struct msg_list { struct msg_list_item *head, *tail; }; -static struct msg_list msg2sndr; +static struct msg_list msg_queue; static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) { @@ -247,17 +247,29 @@ static void msg_list_add(struct msg_list *lst, int code, const char *buf, int le lst->tail = m; } -static void msg2sndr_flush(void) +static void msg_flush(void) { - while (msg2sndr.head && io_multiplexing_out) { - struct msg_list_item *m = msg2sndr.head; - if (!(msg2sndr.head = m->next)) - msg2sndr.tail = NULL; - stats.total_written += m->len; - defer_forwarding_messages = 1; - writefd_unbuffered(sock_f_out, m->buf, m->len); - defer_forwarding_messages = 0; - free(m); + if (am_generator) { + while (msg_queue.head && io_multiplexing_out) { + struct msg_list_item *m = msg_queue.head; + if (!(msg_queue.head = m->next)) + msg_queue.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages++; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages--; + free(m); + } + } else { + while (msg_queue.head) { + struct msg_list_item *m = msg_queue.head; + if (!(msg_queue.head = m->next)) + msg_queue.tail = NULL; + defer_forwarding_messages++; + writefd_unbuffered(msg_fd_out, m->buf, m->len); + defer_forwarding_messages--; + free(m); + } } } @@ -379,7 +391,7 @@ static void read_msg_fd(void) no_flush--; msg_fd_in = fd; if (!--defer_forwarding_messages) - msg2sndr_flush(); + msg_flush(); } /* This is used by the generator to limit how many file transfers can @@ -456,7 +468,7 @@ static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len) defer_forwarding_messages++; writefd_unbuffered(fd, buf, len); if (!--defer_forwarding_messages) - msg2sndr_flush(); + msg_flush(); } } @@ -467,10 +479,13 @@ int send_msg(enum msgcode code, const char *buf, int len) return io_multiplex_write(code, buf, len); if (!io_multiplexing_out) return 0; - msg_list_add(&msg2sndr, code, buf, len); + msg_list_add(&msg_queue, code, buf, len); return 1; } - mplex_write(msg_fd_out, code, buf, len); + if (flist_forward_from >= 0) + msg_list_add(&msg_queue, code, buf, len); + else + mplex_write(msg_fd_out, code, buf, len); return 1; } @@ -1254,11 +1269,11 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) size_t n, total = 0; fd_set w_fds, r_fds, e_fds; int maxfd, count, cnt, using_r_fds; - int defer_save = defer_forwarding_messages; + int defer_inc = 0; struct timeval tv; if (no_flush++) - defer_forwarding_messages = 1; + defer_forwarding_messages++, defer_inc++; while (total < len) { FD_ZERO(&w_fds); @@ -1337,7 +1352,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) } total += cnt; - defer_forwarding_messages = 1; + defer_forwarding_messages++, defer_inc++; if (fd == sock_f_out) { if (io_timeout || am_generator) @@ -1347,15 +1362,12 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) } no_flush--; - if (!(defer_forwarding_messages = defer_save)) - msg2sndr_flush(); + if (!(defer_forwarding_messages -= defer_inc)) + msg_flush(); } void io_flush(int flush_it_all) { - if (flush_it_all && !defer_forwarding_messages) - msg2sndr_flush(); - if (!iobuf_out_cnt || no_flush) return; @@ -1364,6 +1376,9 @@ void io_flush(int flush_it_all) else writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt); iobuf_out_cnt = 0; + + if (flush_it_all && !defer_forwarding_messages) + msg_flush(); } static void writefd(int fd, const char *buf, size_t len) -- 2.34.1