- Optimized the msg_list_item structure to make the buffer an
authorWayne Davison <wayned@samba.org>
Thu, 16 Mar 2006 00:57:54 +0000 (00:57 +0000)
committerWayne Davison <wayned@samba.org>
Thu, 16 Mar 2006 00:57:54 +0000 (00:57 +0000)
  integral part of the structure instead of pointer to a
  separately allocated buffer.
- Improved the code that was ensuring that no messages from the
  receiver could be merged into the middle of a partially written
  buffer from the generator.  The new code ensures that we never
  avoid reading the messages from the receiver (like we used to).
  This ensures that the generator will not hang when the receiver
  got a read error on the socket, sent us a message about it, but
  the socket in the generator never becomes writable for it to get
  a similar error (now we are assured of getting the receiver's
  note about their read error, and we know to shut things down).

io.c

diff --git a/io.c b/io.c
index 475ac40..4ade1ea 100644 (file)
--- a/io.c
+++ b/io.c
@@ -102,7 +102,7 @@ static char io_filesfrom_buf[2048];
 static char *io_filesfrom_bp;
 static char io_filesfrom_lastchar;
 static int io_filesfrom_buflen;
 static char *io_filesfrom_bp;
 static char io_filesfrom_lastchar;
 static int io_filesfrom_buflen;
-static size_t contiguous_write_len = 0;
+static int defer_forwarding_messages = 0;
 static int select_timeout = SELECT_TIMEOUT;
 static int active_filecnt = 0;
 static OFF_T active_bytecnt = 0;
 static int select_timeout = SELECT_TIMEOUT;
 static int active_filecnt = 0;
 static OFF_T active_bytecnt = 0;
@@ -122,15 +122,15 @@ static struct flist_ndx_list redo_list, hlink_list;
 
 struct msg_list_item {
        struct msg_list_item *next;
 
 struct msg_list_item {
        struct msg_list_item *next;
-       char *buf;
        int len;
        int len;
+       char buf[1];
 };
 
 struct msg_list {
        struct msg_list_item *head, *tail;
 };
 
 };
 
 struct msg_list {
        struct msg_list_item *head, *tail;
 };
 
-static struct msg_list msg_list;
+static struct msg_list msg2genr, msg2sndr;
 
 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
 {
 
 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
 {
@@ -226,23 +226,22 @@ void set_msg_fd_out(int fd)
 }
 
 /* Add a message to the pending MSG_* list. */
 }
 
 /* Add a message to the pending MSG_* list. */
-static void msg_list_add(int code, char *buf, int len)
+static void msg_list_add(struct msg_list *lst, int code, char *buf, int len)
 {
 {
-       struct msg_list_item *ml;
+       struct msg_list_item *m;
+       int sz = len + 4 + sizeof m[0] - 1;
 
 
-       if (!(ml = new(struct msg_list_item)))
+       if (!(m = (struct msg_list_item *)new_array(char, sz)))
                out_of_memory("msg_list_add");
                out_of_memory("msg_list_add");
-       ml->next = NULL;
-       if (!(ml->buf = new_array(char, len+4)))
-               out_of_memory("msg_list_add");
-       SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
-       memcpy(ml->buf+4, buf, len);
-       ml->len = len+4;
-       if (msg_list.tail)
-               msg_list.tail->next = ml;
+       m->next = NULL;
+       m->len = len + 4;
+       SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
+       memcpy(m->buf + 4, buf, len);
+       if (lst->tail)
+               lst->tail->next = m;
        else
        else
-               msg_list.head = ml;
-       msg_list.tail = ml;
+               lst->head = m;
+       lst->tail = m;
 }
 
 /* Read a message from the MSG_* fd and handle it.  This is called either
 }
 
 /* Read a message from the MSG_* fd and handle it.  This is called either
@@ -290,7 +289,10 @@ static void read_msg_fd(void)
                        exit_cleanup(RERR_STREAMIO);
                }
                read_loop(fd, buf, len);
                        exit_cleanup(RERR_STREAMIO);
                }
                read_loop(fd, buf, len);
-               io_multiplex_write(MSG_DELETED, buf, len);
+               if (defer_forwarding_messages)
+                       msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
+               else
+                       io_multiplex_write(MSG_DELETED, buf, len);
                break;
        case MSG_SUCCESS:
                if (len != 4 || !am_generator) {
                break;
        case MSG_SUCCESS:
                if (len != 4 || !am_generator) {
@@ -300,7 +302,10 @@ static void read_msg_fd(void)
                read_loop(fd, buf, len);
                if (remove_sent_files) {
                        decrement_active_files(IVAL(buf,0));
                read_loop(fd, buf, len);
                if (remove_sent_files) {
                        decrement_active_files(IVAL(buf,0));
-                       io_multiplex_write(MSG_SUCCESS, buf, len);
+                       if (defer_forwarding_messages)
+                               msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
+                       else
+                               io_multiplex_write(MSG_SUCCESS, buf, len);
                }
                if (preserve_hard_links)
                        flist_ndx_push(&hlink_list, IVAL(buf,0));
                }
                if (preserve_hard_links)
                        flist_ndx_push(&hlink_list, IVAL(buf,0));
@@ -320,7 +325,10 @@ static void read_msg_fd(void)
                        if (n >= sizeof buf)
                                n = sizeof buf - 1;
                        read_loop(fd, buf, n);
                        if (n >= sizeof buf)
                                n = sizeof buf - 1;
                        read_loop(fd, buf, n);
-                       rwrite((enum logcode)tag, buf, n);
+                       if (am_generator && am_server && defer_forwarding_messages)
+                               msg_list_add(&msg2sndr, tag, buf, n);
+                       else
+                               rwrite((enum logcode)tag, buf, n);
                        len -= n;
                }
                break;
                        len -= n;
                }
                break;
@@ -357,7 +365,7 @@ void decrement_active_files(int ndx)
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
  * This is only active in the receiver. */
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
  * This is only active in the receiver. */
-static int msg_list_flush(int flush_it_all)
+static int msg2genr_flush(int flush_it_all)
 {
        static int written = 0;
        struct timeval tv;
 {
        static int written = 0;
        struct timeval tv;
@@ -366,9 +374,9 @@ static int msg_list_flush(int flush_it_all)
        if (msg_fd_out < 0)
                return -1;
 
        if (msg_fd_out < 0)
                return -1;
 
-       while (msg_list.head) {
-               struct msg_list_item *ml = msg_list.head;
-               int n = write(msg_fd_out, ml->buf + written, ml->len - written);
+       while (msg2genr.head) {
+               struct msg_list_item *m = msg2genr.head;
+               int n = write(msg_fd_out, m->buf + written, m->len - written);
                if (n < 0) {
                        if (errno == EINTR)
                                continue;
                if (n < 0) {
                        if (errno == EINTR)
                                continue;
@@ -382,12 +390,11 @@ static int msg_list_flush(int flush_it_all)
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
-               } else if ((written += n) == ml->len) {
-                       free(ml->buf);
-                       msg_list.head = ml->next;
-                       if (!msg_list.head)
-                               msg_list.tail = NULL;
-                       free(ml);
+               } else if ((written += n) == m->len) {
+                       msg2genr.head = m->next;
+                       if (!msg2genr.head)
+                               msg2genr.tail = NULL;
+                       free(m);
                        written = 0;
                }
        }
                        written = 0;
                }
        }
@@ -400,8 +407,8 @@ void send_msg(enum msgcode code, char *buf, int len)
                io_multiplex_write(code, buf, len);
                return;
        }
                io_multiplex_write(code, buf, len);
                return;
        }
-       msg_list_add(code, buf, len);
-       msg_list_flush(NORMAL_FLUSH);
+       msg_list_add(&msg2genr, code, buf, len);
+       msg2genr_flush(NORMAL_FLUSH);
 }
 
 int get_redo_num(int itemizing, enum logcode code)
 }
 
 int get_redo_num(int itemizing, enum logcode code)
@@ -498,7 +505,7 @@ static int read_timeout(int fd, char *buf, size_t len)
                FD_ZERO(&r_fds);
                FD_ZERO(&w_fds);
                FD_SET(fd, &r_fds);
                FD_ZERO(&r_fds);
                FD_ZERO(&w_fds);
                FD_SET(fd, &r_fds);
-               if (msg_list.head) {
+               if (msg2genr.head) {
                        FD_SET(msg_fd_out, &w_fds);
                        if (msg_fd_out > maxfd)
                                maxfd = msg_fd_out;
                        FD_SET(msg_fd_out, &w_fds);
                        if (msg_fd_out > maxfd)
                                maxfd = msg_fd_out;
@@ -535,8 +542,8 @@ static int read_timeout(int fd, char *buf, size_t len)
                        continue;
                }
 
                        continue;
                }
 
-               if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds))
-                       msg_list_flush(NORMAL_FLUSH);
+               if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
+                       msg2genr_flush(NORMAL_FLUSH);
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -1065,7 +1072,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                FD_SET(fd,&w_fds);
                maxfd = fd;
 
                FD_SET(fd,&w_fds);
                maxfd = fd;
 
-               if (msg_fd_in >= 0 && len-total >= contiguous_write_len) {
+               if (msg_fd_in >= 0) {
                        FD_ZERO(&r_fds);
                        FD_SET(msg_fd_in,&r_fds);
                        if (msg_fd_in > maxfd)
                        FD_ZERO(&r_fds);
                        FD_SET(msg_fd_in,&r_fds);
                        if (msg_fd_in > maxfd)
@@ -1094,6 +1101,17 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                if (!FD_ISSET(fd, &w_fds))
                        continue;
 
                if (!FD_ISSET(fd, &w_fds))
                        continue;
 
+               if (msg2sndr.head && !defer_forwarding_messages) {
+                       struct msg_list_item *m = msg2sndr.head;
+                       if (!(msg2sndr.head = m->next))
+                               msg2sndr.tail = NULL;
+                       defer_forwarding_messages = 1;
+                       io_multiplex_write(IVAL(m->buf,0), m->buf+4, m->len-4);
+                       defer_forwarding_messages = 0;
+                       free(m);
+                       continue;
+               }
+
                n = len - total;
                if (bwlimit_writemax && n > bwlimit_writemax)
                        n = bwlimit_writemax;
                n = len - total;
                if (bwlimit_writemax && n > bwlimit_writemax)
                        n = bwlimit_writemax;
@@ -1127,6 +1145,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                }
 
                total += cnt;
                }
 
                total += cnt;
+               defer_forwarding_messages = 1;
 
                if (fd == sock_f_out) {
                        if (io_timeout || am_generator)
 
                if (fd == sock_f_out) {
                        if (io_timeout || am_generator)
@@ -1134,6 +1153,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                        sleep_for_bwlimit(cnt);
                }
        }
                        sleep_for_bwlimit(cnt);
                }
        }
+       defer_forwarding_messages = 0;
 
        no_flush--;
 }
 
        no_flush--;
 }
@@ -1149,13 +1169,6 @@ static void mplex_write(enum msgcode code, char *buf, size_t len)
 
        SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
 
 
        SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
 
-       /* When the generator reads messages from the msg_fd_in pipe, it can
-        * cause output to occur down the socket.  Setting contiguous_write_len
-        * prevents the reading of msg_fd_in once we actually start to write
-        * this sequence of data (though we might read it before the start). */
-       if (am_generator && msg_fd_in >= 0)
-               contiguous_write_len = len + 4;
-
        if (n > sizeof buffer - 4)
                n = 0;
        else
        if (n > sizeof buffer - 4)
                n = 0;
        else
@@ -1168,14 +1181,11 @@ static void mplex_write(enum msgcode code, char *buf, size_t len)
 
        if (len)
                writefd_unbuffered(sock_f_out, buf, len);
 
        if (len)
                writefd_unbuffered(sock_f_out, buf, len);
-
-       if (am_generator && msg_fd_in >= 0)
-               contiguous_write_len = 0;
 }
 
 void io_flush(int flush_it_all)
 {
 }
 
 void io_flush(int flush_it_all)
 {
-       msg_list_flush(flush_it_all);
+       msg2genr_flush(flush_it_all);
 
        if (!iobuf_out_cnt || no_flush)
                return;
 
        if (!iobuf_out_cnt || no_flush)
                return;