From d8587b4690b1987c02c71c136720f366abf250e6 Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Tue, 15 Sep 2009 16:12:24 -0700 Subject: [PATCH] Change the msg pipe to use a real multiplexed IO mode for the data that goes from the receiver to the generator. --- clientserver.c | 6 +- io.c | 493 ++++++++++++++++++++++--------------------------- main.c | 18 +- 3 files changed, 231 insertions(+), 286 deletions(-) diff --git a/clientserver.c b/clientserver.c index 88f341e8..f2751a4f 100644 --- a/clientserver.c +++ b/clientserver.c @@ -339,7 +339,7 @@ int start_inband_exchange(int f_in, int f_out, const char *user, int argc, char if (protocol_version < 23) { if (protocol_version == 22 || !am_sender) - io_start_multiplex_in(); + io_start_multiplex_in(f_in); } free(modname); @@ -885,7 +885,7 @@ static int rsync_module(int f_in, int f_out, int i, const char *addr, const char if (protocol_version < 23 && (protocol_version == 22 || am_sender)) - io_start_multiplex_out(); + io_start_multiplex_out(f_out); else if (!ret || err_msg) { /* We have to get I/O multiplexing started so that we can * get the error back to the client. This means getting @@ -909,7 +909,7 @@ static int rsync_module(int f_in, int f_out, int i, const char *addr, const char if (files_from) write_byte(f_out, 0); } - io_start_multiplex_out(); + io_start_multiplex_out(f_out); } if (!ret || err_msg) { diff --git a/io.c b/io.c index c9d990ad..da6d0a59 100644 --- a/io.c +++ b/io.c @@ -125,6 +125,7 @@ static void readfd(int fd, char *buffer, size_t N); static void writefd(int fd, const char *buf, size_t len); static void writefd_unbuffered(int fd, const char *buf, size_t len); static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert); +static void read_a_msg(int fd); static flist_ndx_list redo_list, hlink_list; @@ -318,132 +319,6 @@ static void check_for_d_option_error(const char *msg) } } -/* Read a message from the MSG_* fd and handle it. This is called either - * during the early stages of being a local sender (up through the sending - * of the file list) or when we're the generator (to fetch the messages - * from the receiver). */ -static void read_msg_fd(void) -{ - char buf[2048]; - size_t n; - struct file_list *flist; - int fd = msg_fd_in; - int tag, len; - - /* Temporarily disable msg_fd_in. This is needed to avoid looping back - * to this routine from writefd_unbuffered(). */ - no_flush++; - msg_fd_in = -1; - defer_forwarding_messages++; - - readfd(fd, buf, 4); - tag = IVAL(buf, 0); - - len = tag & 0xFFFFFF; - tag = (tag >> 24) - MPLEX_BASE; - - switch (tag) { - case MSG_DONE: - if (len < 0 || len > 1 || !am_generator) { - invalid_msg: - rprintf(FERROR, "invalid message %d:%d [%s%s]\n", - tag, len, who_am_i(), - inc_recurse ? "/inc" : ""); - exit_cleanup(RERR_STREAMIO); - } - if (len) { - readfd(fd, buf, len); - stats.total_read = read_varlong(fd, 3); - } - msgdone_cnt++; - break; - case MSG_REDO: - if (len != 4 || !am_generator) - goto invalid_msg; - readfd(fd, buf, 4); - got_flist_entry_status(FES_REDO, buf); - break; - case MSG_FLIST: - if (len != 4 || !am_generator || !inc_recurse) - goto invalid_msg; - readfd(fd, buf, 4); - /* Read extra file list from receiver. */ - assert(iobuf_in != NULL); - assert(iobuf_f_in == fd); - if (DEBUG_GTE(FLIST, 2)) { - rprintf(FINFO, "[%s] receiving flist for dir %d\n", - who_am_i(), IVAL(buf,0)); - } - flist = recv_file_list(fd); - flist->parent_ndx = IVAL(buf,0); -#ifdef SUPPORT_HARD_LINKS - if (preserve_hard_links) - match_hard_links(flist); -#endif - break; - case MSG_FLIST_EOF: - if (len != 0 || !am_generator || !inc_recurse) - goto invalid_msg; - flist_eof = 1; - break; - case MSG_IO_ERROR: - if (len != 4) - goto invalid_msg; - readfd(fd, buf, len); - io_error |= IVAL(buf, 0); - break; - case MSG_DELETED: - if (len >= (int)sizeof buf || !am_generator) - goto invalid_msg; - readfd(fd, buf, len); - send_msg(MSG_DELETED, buf, len, 1); - break; - case MSG_SUCCESS: - if (len != 4 || !am_generator) - goto invalid_msg; - readfd(fd, buf, 4); - got_flist_entry_status(FES_SUCCESS, buf); - break; - case MSG_NO_SEND: - if (len != 4 || !am_generator) - goto invalid_msg; - readfd(fd, buf, 4); - got_flist_entry_status(FES_NO_SEND, buf); - break; - case MSG_ERROR_SOCKET: - case MSG_ERROR_UTF8: - case MSG_CLIENT: - if (!am_generator) - goto invalid_msg; - if (tag == MSG_ERROR_SOCKET) - io_end_multiplex_out(); - /* FALL THROUGH */ - case MSG_INFO: - case MSG_ERROR: - case MSG_ERROR_XFER: - case MSG_WARNING: - case MSG_LOG: - while (len) { - n = len; - if (n >= sizeof buf) - n = sizeof buf - 1; - readfd(fd, buf, n); - rwrite((enum logcode)tag, buf, n, !am_generator); - len -= n; - } - break; - default: - rprintf(FERROR, "unknown message %d:%d [%s]\n", - tag, len, who_am_i()); - exit_cleanup(RERR_STREAMIO); - } - - no_flush--; - msg_fd_in = fd; - if (!--defer_forwarding_messages && !no_flush) - msg_flush(); -} - /* This is used by the generator to limit how many file transfers can * be active at once when --remove-source-files is specified. Without * this, sender-side deletions were mostly happening at the end. */ @@ -460,7 +335,7 @@ void increment_active_files(int ndx, int itemizing, enum logcode code) if (iobuf_out_cnt) io_flush(NORMAL_FLUSH); else - read_msg_fd(); + read_a_msg(msg_fd_in); } active_filecnt++; @@ -520,7 +395,7 @@ int send_msg(enum msgcode code, const char *buf, int len, int convert) msg_list_add(&msg_queue, code, buf, len, convert); return 1; } - if (flist_forward_from >= 0) + if (defer_forwarding_messages) msg_list_add(&msg_queue, code, buf, len, convert); else mplex_write(msg_fd_out, code, buf, len, convert); @@ -538,7 +413,7 @@ void wait_for_receiver(void) { if (io_flush(FULL_FLUSH)) return; - read_msg_fd(); + read_a_msg(msg_fd_in); } int get_redo_num(void) @@ -602,8 +477,7 @@ static void whine_about_eof(int fd) exit_cleanup(RERR_STREAMIO); } -/** - * Read from a socket with I/O timeout. return the number of bytes +/* Read from a socket with I/O timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. * * TODO: If the remote shell connection fails, then current versions @@ -611,8 +485,7 @@ static void whine_about_eof(int fd) * fairly common mistake to try to use rsh when ssh is required, we * should trap that: if we fail to read any data at all, we should * give a better explanation. We can tell whether the connection has - * started by looking e.g. at whether the remote version is known yet. - */ + * started by looking e.g. at whether the remote version is known yet. */ static int read_timeout(int fd, char *buf, size_t len) { int n, cnt = 0; @@ -986,10 +859,8 @@ void stop_flist_forward() io_flush(FULL_FLUSH); } -/** - * Continue trying to read len bytes - don't return until len has been - * read. - **/ +/* Continue trying to read len bytes until all have been read. + * Used to read raw bytes from a multiplexed source. */ static void read_loop(int fd, char *buf, size_t len) { while (len) { @@ -1000,17 +871,209 @@ static void read_loop(int fd, char *buf, size_t len) } } -/** - * Read from the file descriptor handling multiplexing - return number - * of bytes read. - * - * Never returns <= 0. - */ -static int readfd_unbuffered(int fd, char *buf, size_t len) +/* Read a message from a multiplexed source. */ +static void read_a_msg(int fd) { - size_t msg_bytes; - int tag, cnt = 0; char line[BIGPATHBUFLEN]; + size_t msg_bytes; + int tag, flist_parent = -1, save_msg_fd_in = msg_fd_in; + + /* Temporarily disable msg_fd_in. This is needed to avoid looping back + * to this routine from writefd_unbuffered(). */ + msg_fd_in = -1; + + read_loop(fd, line, 4); + tag = IVAL(line, 0); + + msg_bytes = tag & 0xFFFFFF; + tag = (tag >> 24) - MPLEX_BASE; + + no_flush++; + + switch (tag) { + case MSG_DATA: + if (msg_bytes > iobuf_in_siz) { + if (!(iobuf_in = realloc_array(iobuf_in, char, msg_bytes))) + out_of_memory("read_a_msg"); + iobuf_in_siz = msg_bytes; + } + read_loop(fd, iobuf_in, msg_bytes); + iobuf_in_remaining = msg_bytes; + iobuf_in_ndx = 0; + break; + case MSG_DONE: + if (msg_bytes > 1 || !am_generator) + goto invalid_msg; + if (msg_bytes) { + read_loop(fd, line, 1); + stats.total_read = read_varlong(fd, 3); + } + msgdone_cnt++; + break; + case MSG_REDO: + if (msg_bytes != 4 || !am_generator) + goto invalid_msg; + read_loop(fd, line, 4); + got_flist_entry_status(FES_REDO, line); + break; + case MSG_FLIST: + if (msg_bytes != 4 || !am_generator || !inc_recurse) + goto invalid_msg; + read_loop(fd, line, 4); + /* Read extra file list from receiver. */ + if (DEBUG_GTE(FLIST, 2)) { + rprintf(FINFO, "[%s] receiving flist for dir %d\n", + who_am_i(), IVAL(line, 0)); + } + flist_parent = IVAL(line, 0); + break; + case MSG_FLIST_EOF: + if (msg_bytes != 0 || !am_generator || !inc_recurse) + goto invalid_msg; + flist_eof = 1; + break; + case MSG_IO_ERROR: + if (msg_bytes != 4 || am_sender) + goto invalid_msg; + read_loop(fd, line, 4); + io_error |= IVAL(line, 0); + if (!am_generator) + send_msg_int(MSG_IO_ERROR, IVAL(line, 0)); + break; + case MSG_NOOP: + if (am_sender) + maybe_send_keepalive(); + break; + case MSG_DEL_STATS: + if (msg_bytes) + goto invalid_msg; + read_del_stats(fd); + if (am_sender && am_server) + write_del_stats(sock_f_out); + break; + case MSG_DELETED: + if (msg_bytes >= sizeof line) + goto overflow; + if (am_generator) { + read_loop(fd, line, msg_bytes); + send_msg(MSG_DELETED, line, msg_bytes, 1); + break; + } +#ifdef ICONV_OPTION + if (ic_recv != (iconv_t)-1) { + xbuf outbuf, inbuf; + char ibuf[512]; + int add_null = 0; + + INIT_CONST_XBUF(outbuf, line); + INIT_XBUF(inbuf, ibuf, 0, (size_t)-1); + + while (msg_bytes) { + inbuf.len = msg_bytes > sizeof ibuf + ? sizeof ibuf : msg_bytes; + read_loop(fd, inbuf.buf, inbuf.len); + if (!(msg_bytes -= inbuf.len) + && !ibuf[inbuf.len-1]) + inbuf.len--, add_null = 1; + if (iconvbufs(ic_send, &inbuf, &outbuf, + ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0) + goto overflow; + } + if (add_null) { + if (outbuf.len == outbuf.size) + goto overflow; + outbuf.buf[outbuf.len++] = '\0'; + } + msg_bytes = outbuf.len; + } else +#endif + read_loop(fd, line, msg_bytes); + /* A directory name was sent with the trailing null */ + if (msg_bytes > 0 && !line[msg_bytes-1]) + log_delete(line, S_IFDIR); + else { + line[msg_bytes] = '\0'; + log_delete(line, S_IFREG); + } + break; + case MSG_SUCCESS: + if (msg_bytes != 4) { + invalid_msg: + rprintf(FERROR, "invalid multi-message %d:%lu [%s%s]\n", + tag, (unsigned long)msg_bytes, who_am_i(), + inc_recurse ? "/inc" : ""); + exit_cleanup(RERR_STREAMIO); + } + read_loop(fd, line, 4); + if (am_generator) + got_flist_entry_status(FES_SUCCESS, line); + else + successful_send(IVAL(line, 0)); + break; + case MSG_NO_SEND: + if (msg_bytes != 4) + goto invalid_msg; + read_loop(fd, line, 4); + if (am_generator) + got_flist_entry_status(FES_NO_SEND, line); + else + send_msg_int(MSG_NO_SEND, IVAL(line, 0)); + break; + case MSG_ERROR_SOCKET: + case MSG_ERROR_UTF8: + case MSG_CLIENT: + case MSG_LOG: + if (!am_generator) + goto invalid_msg; + if (tag == MSG_ERROR_SOCKET) + io_end_multiplex_out(); + /* FALL THROUGH */ + case MSG_INFO: + case MSG_ERROR: + case MSG_ERROR_XFER: + case MSG_WARNING: + if (msg_bytes >= sizeof line) { + overflow: + rprintf(FERROR, + "multiplexing overflow %d:%lu [%s%s]\n", + tag, (unsigned long)msg_bytes, who_am_i(), + inc_recurse ? "/inc" : ""); + exit_cleanup(RERR_STREAMIO); + } + read_loop(fd, line, msg_bytes); + rwrite((enum logcode)tag, line, msg_bytes, !am_generator); + if (first_message) { + if (list_only && !am_sender && tag == 1) { + line[msg_bytes] = '\0'; + check_for_d_option_error(line); + } + first_message = 0; + } + break; + default: + rprintf(FERROR, "unexpected tag %d [%s%s]\n", + tag, who_am_i(), inc_recurse ? "/inc" : ""); + exit_cleanup(RERR_STREAMIO); + } + + msg_fd_in = save_msg_fd_in; + no_flush--; + + if (flist_parent >= 0) { + struct file_list *flist = recv_file_list(fd); + flist->parent_ndx = flist_parent; +#ifdef SUPPORT_HARD_LINKS + if (preserve_hard_links) + match_hard_links(flist); +#endif + } +} + +/* Read from the file descriptor handling multiplexing and return the + * number of bytes read. Never returns <= 0. */ +static int readfd_unbuffered(int fd, char *buf, size_t len) +{ + int cnt = 0; if (!iobuf_in || fd != iobuf_f_in) return read_timeout(fd, buf, len); @@ -1029,125 +1092,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) cnt = len; break; } - - read_loop(fd, line, 4); - tag = IVAL(line, 0); - - msg_bytes = tag & 0xFFFFFF; - tag = (tag >> 24) - MPLEX_BASE; - - switch (tag) { - case MSG_DATA: - if (msg_bytes > iobuf_in_siz) { - if (!(iobuf_in = realloc_array(iobuf_in, char, - msg_bytes))) - out_of_memory("readfd_unbuffered"); - iobuf_in_siz = msg_bytes; - } - read_loop(fd, iobuf_in, msg_bytes); - iobuf_in_remaining = msg_bytes; - iobuf_in_ndx = 0; - break; - case MSG_NOOP: - if (am_sender) - maybe_send_keepalive(); - break; - case MSG_IO_ERROR: - if (msg_bytes != 4) - goto invalid_msg; - read_loop(fd, line, msg_bytes); - send_msg_int(MSG_IO_ERROR, IVAL(line, 0)); - io_error |= IVAL(line, 0); - break; - case MSG_DEL_STATS: - if (msg_bytes) - goto invalid_msg; - read_del_stats(fd); - if (am_sender && am_server) - write_del_stats(sock_f_out); - break; - case MSG_DELETED: - if (msg_bytes >= sizeof line) - goto overflow; -#ifdef ICONV_OPTION - if (ic_recv != (iconv_t)-1) { - xbuf outbuf, inbuf; - char ibuf[512]; - int add_null = 0; - - INIT_CONST_XBUF(outbuf, line); - INIT_XBUF(inbuf, ibuf, 0, (size_t)-1); - - while (msg_bytes) { - inbuf.len = msg_bytes > sizeof ibuf - ? sizeof ibuf : msg_bytes; - read_loop(fd, inbuf.buf, inbuf.len); - if (!(msg_bytes -= inbuf.len) - && !ibuf[inbuf.len-1]) - inbuf.len--, add_null = 1; - if (iconvbufs(ic_send, &inbuf, &outbuf, - ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0) - goto overflow; - } - if (add_null) { - if (outbuf.len == outbuf.size) - goto overflow; - outbuf.buf[outbuf.len++] = '\0'; - } - msg_bytes = outbuf.len; - } else -#endif - read_loop(fd, line, msg_bytes); - /* A directory name was sent with the trailing null */ - if (msg_bytes > 0 && !line[msg_bytes-1]) - log_delete(line, S_IFDIR); - else { - line[msg_bytes] = '\0'; - log_delete(line, S_IFREG); - } - break; - case MSG_SUCCESS: - if (msg_bytes != 4) { - invalid_msg: - rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n", - tag, (long)msg_bytes, who_am_i()); - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, line, msg_bytes); - successful_send(IVAL(line, 0)); - break; - case MSG_NO_SEND: - if (msg_bytes != 4) - goto invalid_msg; - read_loop(fd, line, msg_bytes); - send_msg_int(MSG_NO_SEND, IVAL(line, 0)); - break; - case MSG_INFO: - case MSG_ERROR: - case MSG_ERROR_XFER: - case MSG_WARNING: - if (msg_bytes >= sizeof line) { - overflow: - rprintf(FERROR, - "multiplexing overflow %d:%ld [%s]\n", - tag, (long)msg_bytes, who_am_i()); - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, line, msg_bytes); - rwrite((enum logcode)tag, line, msg_bytes, 1); - if (first_message) { - if (list_only && !am_sender && tag == 1) { - line[msg_bytes] = '\0'; - check_for_d_option_error(line); - } - first_message = 0; - } - break; - default: - rprintf(FERROR, "unexpected tag %d [%s]\n", - tag, who_am_i()); - exit_cleanup(RERR_STREAMIO); - } + read_a_msg(fd); } if (iobuf_in_remaining == 0) @@ -1473,7 +1418,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) FD_SET(fd, &e_fds); maxfd = fd; - if (msg_fd_in >= 0) { + if (msg_fd_in >= 0 && iobuf_in_remaining == 0) { FD_ZERO(&r_fds); FD_SET(msg_fd_in, &r_fds); if (msg_fd_in > maxfd) @@ -1500,7 +1445,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) rprintf(FINFO, "select exception on fd %d\n", fd); */ if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) - read_msg_fd(); + read_a_msg(msg_fd_in); if (!FD_ISSET(fd, &w_fds)) continue; @@ -1535,7 +1480,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) char buf[1024]; set_io_timeout(30); ignore_timeout = 0; - readfd_unbuffered(sock_f_in, buf, sizeof buf); + readfd_unbuffered(iobuf_f_in, buf, sizeof buf); } exit_cleanup(RERR_STREAMIO); } @@ -1566,7 +1511,7 @@ int io_flush(int flush_it_all) if (iobuf_out_cnt) { if (io_multiplexing_out) - mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0); + mplex_write(iobuf_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0); else writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt); iobuf_out_cnt = 0; @@ -1863,18 +1808,18 @@ void io_printf(int fd, const char *format, ...) } /** Setup for multiplexing a MSG_* stream with the data stream. */ -void io_start_multiplex_out(void) +void io_start_multiplex_out(int f) { io_flush(NORMAL_FLUSH); - io_start_buffering_out(sock_f_out); + io_start_buffering_out(f); io_multiplexing_out = 1; } /** Setup for multiplexing a MSG_* stream with the data stream. */ -void io_start_multiplex_in(void) +void io_start_multiplex_in(int f) { io_flush(NORMAL_FLUSH); - io_start_buffering_in(sock_f_in); + io_start_buffering_in(f); io_multiplexing_in = 1; } @@ -1885,7 +1830,7 @@ int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int conve return 0; io_flush(NORMAL_FLUSH); stats.total_written += (len+4); - mplex_write(sock_f_out, code, buf, len, convert); + mplex_write(iobuf_f_out, code, buf, len, convert); return 1; } diff --git a/main.c b/main.c index da6fafc3..d8055cb6 100644 --- a/main.c +++ b/main.c @@ -847,7 +847,7 @@ static int do_recv(int f_in, int f_out, char *local_name) /* set place to send errors */ set_msg_fd_out(error_pipe[1]); - io_start_buffering_out(error_pipe[1]); + io_start_multiplex_out(error_pipe[1]); recv_files(f_in, local_name); io_flush(FULL_FLUSH); @@ -895,7 +895,7 @@ static int do_recv(int f_in, int f_out, char *local_name) io_start_buffering_out(f_out); set_msg_fd_in(error_pipe[0]); - io_start_buffering_in(error_pipe[0]); + io_start_multiplex_in(error_pipe[0]); #ifdef SUPPORT_HARD_LINKS if (preserve_hard_links && inc_recurse) { @@ -959,7 +959,7 @@ static void do_server_recv(int f_in, int f_out, int argc, char *argv[]) } if (protocol_version >= 30) - io_start_multiplex_in(); + io_start_multiplex_in(f_in); else io_start_buffering_in(f_in); recv_filter_list(f_in); @@ -1040,12 +1040,12 @@ void start_server(int f_in, int f_out, int argc, char *argv[]) setup_protocol(f_out, f_in); if (protocol_version >= 23) - io_start_multiplex_out(); + io_start_multiplex_out(f_out); if (am_sender) { keep_dirlinks = 0; /* Must be disabled on the sender. */ if (need_messages_from_generator) - io_start_multiplex_in(); + io_start_multiplex_in(f_in); recv_filter_list(f_in); do_server_sender(f_in, f_out, argc, argv); } else @@ -1092,7 +1092,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) sender_keeps_checksum = 1; if (protocol_version >= 30) - io_start_multiplex_out(); + io_start_multiplex_out(f_out); else io_start_buffering_out(f_out); if (!filesfrom_host) @@ -1108,7 +1108,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) rprintf(FINFO,"file list sent\n"); if (protocol_version >= 23) - io_start_multiplex_in(); + io_start_multiplex_in(f_in); io_flush(NORMAL_FLUSH); send_files(f_in, f_out); @@ -1129,9 +1129,9 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) if (!read_batch) { if (protocol_version >= 23) - io_start_multiplex_in(); + io_start_multiplex_in(f_in); if (need_messages_from_generator) - io_start_multiplex_out(); + io_start_multiplex_out(f_out); } send_filter_list(read_batch ? -1 : f_out); -- 2.34.1