- Make sure that we can't write via io_flush() when msg_fd_in is
authorWayne Davison <wayned@samba.org>
Sat, 20 Jan 2007 22:10:14 +0000 (22:10 +0000)
committerWayne Davison <wayned@samba.org>
Sat, 20 Jan 2007 22:10:14 +0000 (22:10 +0000)
  temporarily set to -1.
- Got rid of the msg2genr message cache.

io.c

diff --git a/io.c b/io.c
index 567065b..7bf0afc 100644 (file)
--- a/io.c
+++ b/io.c
@@ -107,7 +107,6 @@ static char int_byte_cnt[64] = {
        5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
 };
 
-static void msg2sndr_flush(void);
 static void readfd(int fd, char *buffer, size_t N);
 static void writefd(int fd, const char *buf, size_t len);
 static void writefd_unbuffered(int fd, const char *buf, size_t len);
@@ -135,7 +134,7 @@ struct msg_list {
        struct msg_list_item *head, *tail;
 };
 
-static struct msg_list msg2genr, msg2sndr;
+static struct msg_list msg2sndr;
 
 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
 {
@@ -249,6 +248,20 @@ static void msg_list_add(struct msg_list *lst, int code, const char *buf, int le
        lst->tail = m;
 }
 
+static void msg2sndr_flush(void)
+{
+       while (msg2sndr.head && io_multiplexing_out) {
+               struct msg_list_item *m = msg2sndr.head;
+               if (!(msg2sndr.head = m->next))
+                       msg2sndr.tail = NULL;
+               stats.total_written += m->len;
+               defer_forwarding_messages = 1;
+               writefd_unbuffered(sock_f_out, m->buf, m->len);
+               defer_forwarding_messages = 0;
+               free(m);
+       }
+}
+
 /* Read a message from the MSG_* fd and handle it.  This is called either
  * during the early stages of being a local sender (up through the sending
  * of the file list) or when we're the generator (to fetch the messages
@@ -263,6 +276,7 @@ static void read_msg_fd(void)
 
        /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
         * to this routine from writefd_unbuffered(). */
+       no_flush++;
        msg_fd_in = -1;
        defer_forwarding_messages++;
 
@@ -359,9 +373,10 @@ static void read_msg_fd(void)
                exit_cleanup(RERR_STREAMIO);
        }
 
-       defer_forwarding_messages--;
+       no_flush--;
        msg_fd_in = fd;
-       msg2sndr_flush();
+       if (!--defer_forwarding_messages)
+               msg2sndr_flush();
 }
 
 /* This is used by the generator to limit how many file transfers can
@@ -375,6 +390,7 @@ void increment_active_files(int ndx, int itemizing, enum logcode code)
                if (hlink_list.head)
                        check_for_finished_hlinks(itemizing, code);
 #endif
+               io_flush(NORMAL_FLUSH);
                read_msg_fd();
        }
 
@@ -416,29 +432,30 @@ static void decrement_flist_in_progress(int ndx, int redo)
                flist->to_redo++;
 }
 
-/* Try to push messages off the list onto the wire.  If we leave with more
- * to do, return 0.  On error, return -1.  If everything flushed, return 1.
- * This is only active in the receiver. */
-static int msg2genr_flush(void)
+/* Write an message to a multiplexed stream. If this fails, rsync exits. */
+static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len)
 {
-       if (msg_fd_out < 0 || no_flush || flist_forward_from >= 0)
-               return -1;
+       char buffer[1024];
+       size_t n = len;
 
-       no_flush++;
-       while (msg2genr.head) {
-               struct msg_list_item *m = msg2genr.head;
-               writefd(msg_fd_out, m->buf, m->len);
-               msg2genr.head = m->next;
-               if (!msg2genr.head)
-                       msg2genr.tail = NULL;
-               free(m);
-       }
-       if (iobuf_out_cnt) {
-               writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
-               iobuf_out_cnt = 0;
+       SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
+
+       if (n > sizeof buffer - 4)
+               n = 0;
+       else
+               memcpy(buffer + 4, buf, n);
+
+       writefd_unbuffered(fd, buffer, n+4);
+
+       len -= n;
+       buf += n;
+
+       if (len) {
+               defer_forwarding_messages++;
+               writefd_unbuffered(fd, buf, len);
+               if (!--defer_forwarding_messages)
+                       msg2sndr_flush();
        }
-       no_flush--;
-       return 1;
 }
 
 int send_msg(enum msgcode code, const char *buf, int len)
@@ -451,8 +468,7 @@ int send_msg(enum msgcode code, const char *buf, int len)
                msg_list_add(&msg2sndr, code, buf, len);
                return 1;
        }
-       msg_list_add(&msg2genr, code, buf, len);
-       msg2genr_flush();
+       mplex_write(msg_fd_out, code, buf, len);
        return 1;
 }
 
@@ -465,6 +481,7 @@ void send_msg_int(enum msgcode code, int num)
 
 void wait_for_receiver(void)
 {
+       io_flush(NORMAL_FLUSH);
        read_msg_fd();
 }
 
@@ -554,11 +571,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                FD_ZERO(&r_fds);
                FD_ZERO(&w_fds);
                FD_SET(fd, &r_fds);
-               if (msg2genr.head) {
-                       FD_SET(msg_fd_out, &w_fds);
-                       if (msg_fd_out > maxfd)
-                               maxfd = msg_fd_out;
-               }
                if (io_filesfrom_f_out >= 0) {
                        int new_fd;
                        if (io_filesfrom_buflen == 0) {
@@ -591,9 +603,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                        continue;
                }
 
-               if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
-                       msg2genr_flush();
-
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
                                if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
@@ -1211,7 +1220,8 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
        int defer_save = defer_forwarding_messages;
        struct timeval tv;
 
-       no_flush++;
+       if (no_flush++)
+               defer_forwarding_messages = 1;
 
        while (total < len) {
                FD_ZERO(&w_fds);
@@ -1296,68 +1306,21 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
                }
        }
 
-       defer_forwarding_messages = defer_save;
        no_flush--;
-}
-
-static void msg2sndr_flush(void)
-{
-       if (defer_forwarding_messages)
-               return;
-
-       while (msg2sndr.head && io_multiplexing_out) {
-               struct msg_list_item *m = msg2sndr.head;
-               if (!(msg2sndr.head = m->next))
-                       msg2sndr.tail = NULL;
-               stats.total_written += m->len;
-               defer_forwarding_messages = 1;
-               writefd_unbuffered(sock_f_out, m->buf, m->len);
-               defer_forwarding_messages = 0;
-               free(m);
-       }
-}
-
-/**
- * Write an message to a multiplexed stream. If this fails then rsync
- * exits.
- **/
-static void mplex_write(enum msgcode code, const char *buf, size_t len)
-{
-       char buffer[1024];
-       size_t n = len;
-
-       SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
-
-       if (n > sizeof buffer - 4)
-               n = 0;
-       else
-               memcpy(buffer + 4, buf, n);
-
-       writefd_unbuffered(sock_f_out, buffer, n+4);
-
-       len -= n;
-       buf += n;
-
-       if (len) {
-               defer_forwarding_messages = 1;
-               writefd_unbuffered(sock_f_out, buf, len);
-               defer_forwarding_messages = 0;
+       if (!(defer_forwarding_messages = defer_save))
                msg2sndr_flush();
-       }
 }
 
 void io_flush(int flush_it_all)
 {
-       if (flush_it_all) {
-               msg2genr_flush();
+       if (flush_it_all && !defer_forwarding_messages)
                msg2sndr_flush();
-       }
 
        if (!iobuf_out_cnt || no_flush)
                return;
 
        if (io_multiplexing_out)
-               mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
+               mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt);
        else
                writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
        iobuf_out_cnt = 0;
@@ -1695,7 +1658,7 @@ int io_multiplex_write(enum msgcode code, const char *buf, size_t len)
                return 0;
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);
-       mplex_write(code, buf, len);
+       mplex_write(sock_f_out, code, buf, len);
        return 1;
 }