X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/12bda6f7106a57a05f5b0b25fc82b48f78e4cfb0..de3438407c4f976c55172093795b87e49cb4a6e2:/io.c diff --git a/io.c b/io.c index dd99913d..09aab912 100644 --- a/io.c +++ b/io.c @@ -41,7 +41,6 @@ extern int bwlimit; extern size_t bwlimit_writemax; -extern int verbose; extern int io_timeout; extern int allowed_lull; extern int am_server; @@ -103,8 +102,10 @@ static char io_filesfrom_buf[2048]; static char *io_filesfrom_bp; static char io_filesfrom_lastchar; static int io_filesfrom_buflen; -static size_t contiguous_write_len = 0; +static int defer_forwarding_messages = 0; static int select_timeout = SELECT_TIMEOUT; +static int active_filecnt = 0; +static OFF_T active_bytecnt = 0; static void read_loop(int fd, char *buf, size_t len); @@ -121,15 +122,15 @@ static struct flist_ndx_list redo_list, hlink_list; struct msg_list_item { struct msg_list_item *next; - char *buf; int len; + char buf[1]; }; struct msg_list { struct msg_list_item *head, *tail; }; -static struct msg_list msg_list; +static struct msg_list msg2genr, msg2sndr; static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) { @@ -225,23 +226,22 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(int code, char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) { - struct msg_list_item *ml; + struct msg_list_item *m; + int sz = len + 4 + sizeof m[0] - 1; - if (!(ml = new(struct msg_list_item))) + if (!(m = (struct msg_list_item *)new_array(char, sz))) out_of_memory("msg_list_add"); - ml->next = NULL; - if (!(ml->buf = new_array(char, len+4))) - out_of_memory("msg_list_add"); - SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len); - memcpy(ml->buf+4, buf, len); - ml->len = len+4; - if (msg_list.tail) - msg_list.tail->next = ml; + m->next = NULL; + m->len = len + 4; + SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); + memcpy(m->buf + 4, buf, len); + if (lst->tail) + lst->tail->next = m; else - msg_list.head = ml; - msg_list.tail = ml; + lst->head = m; + lst->tail = m; } /* Read a message from the MSG_* fd and handle it. This is called either @@ -279,6 +279,8 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); + if (remove_sent_files) + decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; case MSG_DELETED: @@ -287,7 +289,10 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - io_multiplex_write(MSG_DELETED, buf, len); + if (defer_forwarding_messages) + msg_list_add(&msg2sndr, MSG_DELETED, buf, len); + else + io_multiplex_write(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) { @@ -295,8 +300,13 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) - io_multiplex_write(MSG_SUCCESS, buf, len); + if (remove_sent_files) { + decrement_active_files(IVAL(buf,0)); + if (defer_forwarding_messages) + msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len); + else + io_multiplex_write(MSG_SUCCESS, buf, len); + } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); break; @@ -315,22 +325,48 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - rwrite((enum logcode)tag, buf, n); + if (am_generator && am_server && defer_forwarding_messages) + msg_list_add(&msg2sndr, tag, buf, n); + else + rwrite((enum logcode)tag, buf, n); len -= n; } break; default: - rprintf(FERROR, "unknown message %d:%d\n", tag, len); + rprintf(FERROR, "unknown message %d:%d [%s]\n", + tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } msg_fd_in = fd; } +/* This is used by the generator to limit how many file transfers can + * be active at once when --remove-sent-files is specified. Without + * this, sender-side deletions were mostly happening at the end. */ +void increment_active_files(int ndx, int itemizing, enum logcode code) +{ + /* TODO: tune these limits? */ + while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) { + if (hlink_list.head) + check_for_finished_hlinks(itemizing, code); + read_msg_fd(); + } + + active_filecnt++; + active_bytecnt += the_file_list->files[ndx]->length; +} + +void decrement_active_files(int ndx) +{ + active_filecnt--; + active_bytecnt -= the_file_list->files[ndx]->length; +} + /* Try to push messages off the list onto the wire. If we leave with more * to do, return 0. On error, return -1. If everything flushed, return 1. * This is only active in the receiver. */ -static int msg_list_flush(int flush_it_all) +static int msg2genr_flush(int flush_it_all) { static int written = 0; struct timeval tv; @@ -339,9 +375,9 @@ static int msg_list_flush(int flush_it_all) if (msg_fd_out < 0) return -1; - while (msg_list.head) { - struct msg_list_item *ml = msg_list.head; - int n = write(msg_fd_out, ml->buf + written, ml->len - written); + while (msg2genr.head) { + struct msg_list_item *m = msg2genr.head; + int n = write(msg_fd_out, m->buf + written, m->len - written); if (n < 0) { if (errno == EINTR) continue; @@ -355,12 +391,11 @@ static int msg_list_flush(int flush_it_all) tv.tv_usec = 0; if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) check_timeout(); - } else if ((written += n) == ml->len) { - free(ml->buf); - msg_list.head = ml->next; - if (!msg_list.head) - msg_list.tail = NULL; - free(ml); + } else if ((written += n) == m->len) { + msg2genr.head = m->next; + if (!msg2genr.head) + msg2genr.tail = NULL; + free(m); written = 0; } } @@ -373,8 +408,8 @@ void send_msg(enum msgcode code, char *buf, int len) io_multiplex_write(code, buf, len); return; } - msg_list_add(code, buf, len); - msg_list_flush(NORMAL_FLUSH); + msg_list_add(&msg2genr, code, buf, len); + msg2genr_flush(NORMAL_FLUSH); } int get_redo_num(int itemizing, enum logcode code) @@ -471,7 +506,7 @@ static int read_timeout(int fd, char *buf, size_t len) FD_ZERO(&r_fds); FD_ZERO(&w_fds); FD_SET(fd, &r_fds); - if (msg_list.head) { + if (msg2genr.head) { FD_SET(msg_fd_out, &w_fds); if (msg_fd_out > maxfd) maxfd = msg_fd_out; @@ -508,8 +543,8 @@ static int read_timeout(int fd, char *buf, size_t len) continue; } - if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds)) - msg_list_flush(NORMAL_FLUSH); + if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) + msg2genr_flush(NORMAL_FLUSH); if (io_filesfrom_f_out >= 0) { if (io_filesfrom_buflen) { @@ -921,6 +956,11 @@ int read_vstring(int f, char *buf, int bufsize) void read_sum_head(int f, struct sum_struct *sum) { sum->count = read_int(f); + if (sum->count < 0) { + rprintf(FERROR, "Invalid checksum count %ld [%s]\n", + (long)sum->count, who_am_i()); + exit_cleanup(RERR_PROTOCOL); + } sum->blength = read_int(f); if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) { rprintf(FERROR, "Invalid block length %ld [%s]\n", @@ -983,7 +1023,7 @@ static void sleep_for_bwlimit(int bytes_written) #define ONE_SEC 1000000L /* # of microseconds in a second */ - if (!bwlimit) + if (!bwlimit_writemax) return; total_written += bytes_written; @@ -1024,6 +1064,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) size_t n, total = 0; fd_set w_fds, r_fds; int maxfd, count, cnt, using_r_fds; + int defer_save = defer_forwarding_messages; struct timeval tv; no_flush++; @@ -1033,7 +1074,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) FD_SET(fd,&w_fds); maxfd = fd; - if (msg_fd_in >= 0 && len-total >= contiguous_write_len) { + if (msg_fd_in >= 0) { FD_ZERO(&r_fds); FD_SET(msg_fd_in,&r_fds); if (msg_fd_in > maxfd) @@ -1063,7 +1104,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; n = len - total; - if (bwlimit && n > bwlimit_writemax) + if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; cnt = write(fd, buf + total, n); @@ -1095,6 +1136,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } total += cnt; + defer_forwarding_messages = 1; if (fd == sock_f_out) { if (io_timeout || am_generator) @@ -1103,9 +1145,27 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } } + defer_forwarding_messages = defer_save; no_flush--; } +static void msg2sndr_flush(void) +{ + if (defer_forwarding_messages) + return; + + while (msg2sndr.head && io_multiplexing_out) { + struct msg_list_item *m = msg2sndr.head; + if (!(msg2sndr.head = m->next)) + msg2sndr.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages = 1; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages = 0; + free(m); + } +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. @@ -1117,13 +1177,6 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); - /* When the generator reads messages from the msg_fd_in pipe, it can - * cause output to occur down the socket. Setting contiguous_write_len - * prevents the reading of msg_fd_in once we actually start to write - * this sequence of data (though we might read it before the start). */ - if (am_generator && msg_fd_in >= 0) - contiguous_write_len = len + 4; - if (n > sizeof buffer - 4) n = 0; else @@ -1134,16 +1187,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) len -= n; buf += n; - if (len) + if (len) { + defer_forwarding_messages = 1; writefd_unbuffered(sock_f_out, buf, len); - - if (am_generator && msg_fd_in >= 0) - contiguous_write_len = 0; + defer_forwarding_messages = 0; + msg2sndr_flush(); + } } void io_flush(int flush_it_all) { - msg_list_flush(flush_it_all); + msg2genr_flush(flush_it_all); + msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) return;