From 3a748b2676428141599a47f4f6d9684896843ede Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Thu, 16 Mar 2006 03:12:42 +0000 Subject: [PATCH] Updated to work with the newest io.c code. --- threaded-receiver.diff | 191 +++++++++++++++++++++++++---------------- 1 file changed, 115 insertions(+), 76 deletions(-) diff --git a/threaded-receiver.diff b/threaded-receiver.diff index d41f03f..d79af22 100644 --- a/threaded-receiver.diff +++ b/threaded-receiver.diff @@ -314,7 +314,15 @@ After applying this patch, run these commands for a successful build: int sock_f_in = -1; int sock_f_out = -1; -@@ -110,27 +106,32 @@ static OFF_T active_bytecnt = 0; +@@ -102,7 +98,6 @@ static char io_filesfrom_buf[2048]; + static char *io_filesfrom_bp; + static char io_filesfrom_lastchar; + static int io_filesfrom_buflen; +-static int defer_forwarding_messages = 0; + static int select_timeout = SELECT_TIMEOUT; + static int active_filecnt = 0; + static OFF_T active_bytecnt = 0; +@@ -110,27 +105,31 @@ static OFF_T active_bytecnt = 0; static void read_loop(int fd, char *buf, size_t len); struct flist_ndx_item { @@ -336,10 +344,9 @@ After applying this patch, run these commands for a successful build: struct msg_list_item { - struct msg_list_item *next; + volatile struct msg_list_item *next; -+ pthread_mutex_t mutex; - char *buf; int len; + enum msgcode code; + char buf[1]; }; struct msg_list { @@ -348,12 +355,12 @@ After applying this patch, run these commands for a successful build: + pthread_mutex_t mutex; }; --static struct msg_list msg_list; +-static struct msg_list msg2genr, msg2sndr; +static struct msg_list msg_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) { -@@ -140,27 +141,31 @@ static void flist_ndx_push(struct flist_ +@@ -140,27 +139,31 @@ static void flist_ndx_push(struct flist_ out_of_memory("flist_ndx_push"); item->next = NULL; item->ndx = ndx; @@ -391,7 +398,7 @@ After applying this patch, run these commands for a successful build: return ndx; } -@@ -169,7 +174,7 @@ static void check_timeout(void) +@@ -169,7 +172,7 @@ static void check_timeout(void) { time_t t; @@ -400,7 +407,7 @@ After applying this patch, run these commands for a successful build: return; if (!last_io_in) { -@@ -210,45 +215,40 @@ void set_io_timeout(int secs) +@@ -210,44 +213,38 @@ void set_io_timeout(int secs) /* Setup the fd used to receive MSG_* messages. Only needed during the * early stages of being a local sender (up through the sending of the @@ -421,31 +428,34 @@ After applying this patch, run these commands for a successful build: -} - /* Add a message to the pending MSG_* list. */ - static void msg_list_add(int code, char *buf, int len) +-static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) ++static void msg_list_add(int code, char *buf, int len) { - struct msg_list_item *ml; + struct msg_list_item *m; +- int sz = len + 4 + sizeof m[0] - 1; ++ int sz = len + sizeof m[0] - 1; + assert(am_receiver()); - if (!(ml = new(struct msg_list_item))) - out_of_memory("msg_list_add"); - ml->next = NULL; -- if (!(ml->buf = new_array(char, len+4))) -+ /* NOTE: the "+ 1" allows rwrite() to use the buf! */ -+ if (!(ml->buf = new_array(char, len + 1))) + if (!(m = (struct msg_list_item *)new_array(char, sz))) out_of_memory("msg_list_add"); -- SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len); -- memcpy(ml->buf+4, buf, len); -- ml->len = len+4; -+ memcpy(ml->buf, buf, len); -+ ml->len = len; -+ ml->code = code; + m->next = NULL; +- m->len = len + 4; +- SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); +- memcpy(m->buf + 4, buf, len); +- if (lst->tail) +- lst->tail->next = m; ++ m->len = len; ++ m->code = code; ++ memcpy(m->buf, buf, len); + + pthread_mutex_lock(&msg_list.mutex); - if (msg_list.tail) - msg_list.tail->next = ml; ++ if (msg_list.tail) ++ msg_list.tail->next = m; else - msg_list.head = ml; - msg_list.tail = ml; +- lst->head = m; +- lst->tail = m; ++ msg_list.head = m; ++ msg_list.tail = m; + pthread_mutex_unlock(&msg_list.mutex); } @@ -458,7 +468,7 @@ After applying this patch, run these commands for a successful build: static void read_msg_fd(void) { char buf[2048]; -@@ -267,51 +267,6 @@ static void read_msg_fd(void) +@@ -266,57 +263,6 @@ static void read_msg_fd(void) tag = (tag >> 24) - MPLEX_BASE; switch (tag) { @@ -485,7 +495,10 @@ After applying this patch, run these commands for a successful build: - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, buf, len); -- io_multiplex_write(MSG_DELETED, buf, len); +- if (defer_forwarding_messages) +- msg_list_add(&msg2sndr, MSG_DELETED, buf, len); +- else +- io_multiplex_write(MSG_DELETED, buf, len); - break; - case MSG_SUCCESS: - if (len != 4 || !am_generator) { @@ -495,7 +508,10 @@ After applying this patch, run these commands for a successful build: - read_loop(fd, buf, len); - if (remove_sent_files) { - decrement_active_files(IVAL(buf,0)); -- io_multiplex_write(MSG_SUCCESS, buf, len); +- if (defer_forwarding_messages) +- msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len); +- else +- io_multiplex_write(MSG_SUCCESS, buf, len); - } - if (preserve_hard_links) - flist_ndx_push(&hlink_list, IVAL(buf,0)); @@ -510,7 +526,19 @@ After applying this patch, run these commands for a successful build: case MSG_INFO: case MSG_ERROR: case MSG_LOG: -@@ -354,71 +309,75 @@ void decrement_active_files(int ndx) +@@ -325,10 +271,7 @@ static void read_msg_fd(void) + if (n >= sizeof buf) + n = sizeof buf - 1; + read_loop(fd, buf, n); +- if (am_generator && am_server && defer_forwarding_messages) +- msg_list_add(&msg2sndr, tag, buf, n); +- else +- rwrite((enum logcode)tag, buf, n); ++ rwrite((enum logcode)tag, buf, n); + len -= n; + } + break; +@@ -362,70 +305,74 @@ void decrement_active_files(int ndx) active_bytecnt -= the_file_list->files[ndx]->length; } @@ -518,8 +546,9 @@ After applying this patch, run these commands for a successful build: +/* Try to pop messages off the list onto the wire. If we leave with more * to do, return 0. On error, return -1. If everything flushed, return 1. - * This is only active in the receiver. */ +-static int msg2genr_flush(int flush_it_all) + * This is only called by the generator. */ - static int msg_list_flush(int flush_it_all) ++static int msg_list_flush(int flush_it_all) { - static int written = 0; - struct timeval tv; @@ -528,11 +557,9 @@ After applying this patch, run these commands for a successful build: - if (msg_fd_out < 0) - return -1; - -+ assert(am_generator()); -+ no_flush++; - while (msg_list.head) { -- struct msg_list_item *ml = msg_list.head; -- int n = write(msg_fd_out, ml->buf + written, ml->len - written); +- while (msg2genr.head) { +- struct msg_list_item *m = msg2genr.head; +- int n = write(msg_fd_out, m->buf + written, m->len - written); - if (n < 0) { - if (errno == EINTR) - continue; @@ -546,33 +573,34 @@ After applying this patch, run these commands for a successful build: - tv.tv_usec = 0; - if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) - check_timeout(); -- } else if ((written += n) == ml->len) { -- free(ml->buf); -- msg_list.head = ml->next; -- if (!msg_list.head) -- msg_list.tail = NULL; -- free(ml); +- } else if ((written += n) == m->len) { +- msg2genr.head = m->next; +- if (!msg2genr.head) +- msg2genr.tail = NULL; +- free(m); - written = 0; -+ struct msg_list_item *ml = (struct msg_list_item *)msg_list.head; -+ switch (ml->code) { ++ assert(am_generator()); ++ no_flush++; ++ while (msg_list.head) { ++ struct msg_list_item *m = (struct msg_list_item *)msg_list.head; ++ switch (m->code) { + case MSG_SOCKERR: + close_multiplexing_out(); + /* FALL THROUGH */ + case MSG_INFO: + case MSG_ERROR: + case MSG_LOG: -+ rwrite(ml->code, ml->buf, ml->len); ++ rwrite(m->code, m->buf, m->len); + break; + default: -+ io_multiplex_write(ml->code, ml->buf, ml->len); ++ io_multiplex_write(m->code, m->buf, m->len); + break; } + pthread_mutex_lock(&msg_list.mutex); -+ if (!(msg_list.head = ml->next)) ++ if (!(msg_list.head = m->next)) + msg_list.tail = NULL; + pthread_mutex_unlock(&msg_list.mutex); -+ free(ml->buf); -+ free(ml); ++ free(m); + if (!flush_it_all) + break; } @@ -590,8 +618,8 @@ After applying this patch, run these commands for a successful build: io_multiplex_write(code, buf, len); - return; - } -- msg_list_add(code, buf, len); -- msg_list_flush(NORMAL_FLUSH); +- msg_list_add(&msg2genr, code, buf, len); +- msg2genr_flush(NORMAL_FLUSH); } -int get_redo_num(int itemizing, enum logcode code) @@ -630,11 +658,11 @@ After applying this patch, run these commands for a successful build: return flist_ndx_pop(&hlink_list); } -@@ -498,11 +457,6 @@ static int read_timeout(int fd, char *bu +@@ -505,11 +452,6 @@ static int read_timeout(int fd, char *bu FD_ZERO(&r_fds); FD_ZERO(&w_fds); FD_SET(fd, &r_fds); -- if (msg_list.head) { +- if (msg2genr.head) { - FD_SET(msg_fd_out, &w_fds); - if (msg_fd_out > maxfd) - maxfd = msg_fd_out; @@ -642,17 +670,17 @@ After applying this patch, run these commands for a successful build: if (io_filesfrom_f_out >= 0) { int new_fd; if (io_filesfrom_buflen == 0) { -@@ -535,9 +489,6 @@ static int read_timeout(int fd, char *bu +@@ -542,9 +484,6 @@ static int read_timeout(int fd, char *bu continue; } -- if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds)) -- msg_list_flush(NORMAL_FLUSH); +- if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) +- msg2genr_flush(NORMAL_FLUSH); - if (io_filesfrom_f_out >= 0) { if (io_filesfrom_buflen) { if (FD_ISSET(io_filesfrom_f_out, &w_fds)) { -@@ -859,6 +810,8 @@ static void readfd(int fd, char *buffer, +@@ -866,6 +805,8 @@ static void readfd(int fd, char *buffer, } if (fd == write_batch_monitor_in) { @@ -661,7 +689,26 @@ After applying this patch, run these commands for a successful build: if ((size_t)write(batch_fd, buffer, total) != total) exit_cleanup(RERR_FILEIO); } -@@ -1119,7 +1072,6 @@ static void writefd_unbuffered(int fd,ch +@@ -1101,18 +1042,6 @@ static void writefd_unbuffered(int fd,ch + if (!FD_ISSET(fd, &w_fds)) + continue; + +- if (msg2sndr.head && !defer_forwarding_messages) { +- struct msg_list_item *m = msg2sndr.head; +- int code = *((uchar*)m->buf+3) - MPLEX_BASE; +- if (!(msg2sndr.head = m->next)) +- msg2sndr.tail = NULL; +- defer_forwarding_messages = 1; +- io_multiplex_write(code, m->buf+4, m->len-4); +- defer_forwarding_messages = 0; +- free(m); +- continue; +- } +- + n = len - total; + if (bwlimit_writemax && n > bwlimit_writemax) + n = bwlimit_writemax; +@@ -1138,7 +1067,6 @@ static void writefd_unbuffered(int fd,ch * to grab any messages they sent before they died. */ while (fd == sock_f_out && io_multiplexing_in) { set_io_timeout(30); @@ -669,8 +716,11 @@ After applying this patch, run these commands for a successful build: readfd_unbuffered(sock_f_in, io_filesfrom_buf, sizeof io_filesfrom_buf); } -@@ -1129,7 +1081,7 @@ static void writefd_unbuffered(int fd,ch +@@ -1146,15 +1074,13 @@ static void writefd_unbuffered(int fd,ch + } + total += cnt; +- defer_forwarding_messages = 1; if (fd == sock_f_out) { - if (io_timeout || am_generator) @@ -678,27 +728,16 @@ After applying this patch, run these commands for a successful build: last_io_out = time(NULL); sleep_for_bwlimit(cnt); } -@@ -1153,7 +1105,7 @@ static void mplex_write(enum msgcode cod - * cause output to occur down the socket. Setting contiguous_write_len - * prevents the reading of msg_fd_in once we actually start to write - * this sequence of data (though we might read it before the start). */ -- if (am_generator && msg_fd_in >= 0) -+ if (am_generator() && msg_fd_in >= 0) - contiguous_write_len = len + 4; - - if (n > sizeof buffer - 4) -@@ -1169,31 +1121,29 @@ static void mplex_write(enum msgcode cod - if (len) - writefd_unbuffered(sock_f_out, buf, len); - -- if (am_generator && msg_fd_in >= 0) -+ if (am_generator() && msg_fd_in >= 0) - contiguous_write_len = 0; + } +- defer_forwarding_messages = 0; + + no_flush--; } +@@ -1186,25 +1112,23 @@ static void mplex_write(enum msgcode cod void io_flush(int flush_it_all) { -- msg_list_flush(flush_it_all); +- msg2genr_flush(flush_it_all); - - if (!iobuf_out_cnt || no_flush) + if (no_flush) @@ -731,7 +770,7 @@ After applying this patch, run these commands for a successful build: if (fd == sock_f_out) stats.total_written += len; -@@ -1406,9 +1356,3 @@ void start_write_batch(int fd) +@@ -1417,9 +1341,3 @@ void start_write_batch(int fd) else write_batch_monitor_in = fd; } -- 2.34.1