Added a message queue for the receiver->generator messages to handle the case
author Wayne Davison <wayned@samba.org>
Fri, 20 Apr 2007 22:39:58 +0000 (22:39 +0000)
committer Wayne Davison <wayned@samba.org>
Fri, 20 Apr 2007 22:39:58 +0000 (22:39 +0000)
where the message pipe is being used to forward the file-list data.

io.c

diff --git a/io.c b/io.c
index b7ead41..a7603ff 100644 (file)
--- a/io.c
+++ b/io.c
@@ -133,7 +133,7 @@ struct msg_list {
        struct msg_list_item *head, *tail;
 };
 
-static struct msg_list msg2sndr;
+static struct msg_list msg_queue;
 
 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
 {
@@ -247,17 +247,29 @@ static void msg_list_add(struct msg_list *lst, int code, const char *buf, int le
        lst->tail = m;
 }
 
-static void msg2sndr_flush(void)
+static void msg_flush(void)
 {
-       while (msg2sndr.head && io_multiplexing_out) {
-               struct msg_list_item *m = msg2sndr.head;
-               if (!(msg2sndr.head = m->next))
-                       msg2sndr.tail = NULL;
-               stats.total_written += m->len;
-               defer_forwarding_messages = 1;
-               writefd_unbuffered(sock_f_out, m->buf, m->len);
-               defer_forwarding_messages = 0;
-               free(m);
+       if (am_generator) {
+               while (msg_queue.head && io_multiplexing_out) {
+                       struct msg_list_item *m = msg_queue.head;
+                       if (!(msg_queue.head = m->next))
+                               msg_queue.tail = NULL;
+                       stats.total_written += m->len;
+                       defer_forwarding_messages++;
+                       writefd_unbuffered(sock_f_out, m->buf, m->len);
+                       defer_forwarding_messages--;
+                       free(m);
+               }
+       } else {
+               while (msg_queue.head) {
+                       struct msg_list_item *m = msg_queue.head;
+                       if (!(msg_queue.head = m->next))
+                               msg_queue.tail = NULL;
+                       defer_forwarding_messages++;
+                       writefd_unbuffered(msg_fd_out, m->buf, m->len);
+                       defer_forwarding_messages--;
+                       free(m);
+               }
        }
 }
 
@@ -379,7 +391,7 @@ static void read_msg_fd(void)
        no_flush--;
        msg_fd_in = fd;
        if (!--defer_forwarding_messages)
-               msg2sndr_flush();
+               msg_flush();
 }
 
 /* This is used by the generator to limit how many file transfers can
@@ -456,7 +468,7 @@ static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len)
                defer_forwarding_messages++;
                writefd_unbuffered(fd, buf, len);
                if (!--defer_forwarding_messages)
-                       msg2sndr_flush();
+                       msg_flush();
        }
 }
 
@@ -467,10 +479,13 @@ int send_msg(enum msgcode code, const char *buf, int len)
                        return io_multiplex_write(code, buf, len);
                if (!io_multiplexing_out)
                        return 0;
-               msg_list_add(&msg2sndr, code, buf, len);
+               msg_list_add(&msg_queue, code, buf, len);
                return 1;
        }
-       mplex_write(msg_fd_out, code, buf, len);
+       if (flist_forward_from >= 0)
+               msg_list_add(&msg_queue, code, buf, len);
+       else
+               mplex_write(msg_fd_out, code, buf, len);
        return 1;
 }
 
@@ -1254,11 +1269,11 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
        size_t n, total = 0;
        fd_set w_fds, r_fds, e_fds;
        int maxfd, count, cnt, using_r_fds;
-       int defer_save = defer_forwarding_messages;
+       int defer_inc = 0;
        struct timeval tv;
 
        if (no_flush++)
-               defer_forwarding_messages = 1;
+               defer_forwarding_messages++, defer_inc++;
 
        while (total < len) {
                FD_ZERO(&w_fds);
@@ -1337,7 +1352,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
                }
 
                total += cnt;
-               defer_forwarding_messages = 1;
+               defer_forwarding_messages++, defer_inc++;
 
                if (fd == sock_f_out) {
                        if (io_timeout || am_generator)
@@ -1347,15 +1362,12 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
        }
 
        no_flush--;
-       if (!(defer_forwarding_messages = defer_save))
-               msg2sndr_flush();
+       if (!(defer_forwarding_messages -= defer_inc))
+               msg_flush();
 }
 
 void io_flush(int flush_it_all)
 {
-       if (flush_it_all && !defer_forwarding_messages)
-               msg2sndr_flush();
-
        if (!iobuf_out_cnt || no_flush)
                return;
 
@@ -1364,6 +1376,9 @@ void io_flush(int flush_it_all)
        else
                writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
        iobuf_out_cnt = 0;
+
+       if (flush_it_all && !defer_forwarding_messages)
+               msg_flush();
 }
 
 static void writefd(int fd, const char *buf, size_t len)