X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/08c88178aaa9e9ccdfc3bf7ea30ffaa27906f68c..1618c9e6d179fc46285cf556c5bb090ed0e9e503:/io.c diff --git a/io.c b/io.c index b9668af4..475ac40f 100644 --- a/io.c +++ b/io.c @@ -41,7 +41,6 @@ extern int bwlimit; extern size_t bwlimit_writemax; -extern int verbose; extern int io_timeout; extern int allowed_lull; extern int am_server; @@ -105,6 +104,8 @@ static char io_filesfrom_lastchar; static int io_filesfrom_buflen; static size_t contiguous_write_len = 0; static int select_timeout = SELECT_TIMEOUT; +static int active_filecnt = 0; +static OFF_T active_bytecnt = 0; static void read_loop(int fd, char *buf, size_t len); @@ -244,16 +245,6 @@ static void msg_list_add(int code, char *buf, int len) msg_list.tail = ml; } -void send_msg(enum msgcode code, char *buf, int len) -{ - if (msg_fd_out < 0) { - io_multiplex_write(code, buf, len); - return; - } - msg_list_add(code, buf, len); - msg_list_push(NORMAL_FLUSH); -} - /* Read a message from the MSG_* fd and handle it. This is called either * during the early stages of being a local sender (up through the sending * of the file list) or when we're the generator (to fetch the messages @@ -289,6 +280,8 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); + if (remove_sent_files) + decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; case MSG_DELETED: @@ -305,11 +298,20 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) + if (remove_sent_files) { + decrement_active_files(IVAL(buf,0)); io_multiplex_write(MSG_SUCCESS, buf, len); + } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); break; + case MSG_SOCKERR: + if (!am_generator) { + rprintf(FERROR, "invalid message %d:%d\n", tag, len); + exit_cleanup(RERR_STREAMIO); + } + close_multiplexing_out(); + /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: case MSG_LOG: @@ -330,10 +332,32 @@ static void read_msg_fd(void) msg_fd_in = fd; } +/* This is used by the generator to limit how many file transfers can + * be active at once when --remove-sent-files is specified. Without + * this, sender-side deletions were mostly happening at the end. */ +void increment_active_files(int ndx, int itemizing, enum logcode code) +{ + /* TODO: tune these limits? */ + while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) { + if (hlink_list.head) + check_for_finished_hlinks(itemizing, code); + read_msg_fd(); + } + + active_filecnt++; + active_bytecnt += the_file_list->files[ndx]->length; +} + +void decrement_active_files(int ndx) +{ + active_filecnt--; + active_bytecnt -= the_file_list->files[ndx]->length; +} + /* Try to push messages off the list onto the wire. If we leave with more * to do, return 0. On error, return -1. If everything flushed, return 1. * This is only active in the receiver. */ -int msg_list_push(int flush_it_all) +static int msg_list_flush(int flush_it_all) { static int written = 0; struct timeval tv; @@ -370,6 +394,16 @@ int msg_list_push(int flush_it_all) return 1; } +void send_msg(enum msgcode code, char *buf, int len) +{ + if (msg_fd_out < 0) { + io_multiplex_write(code, buf, len); + return; + } + msg_list_add(code, buf, len); + msg_list_flush(NORMAL_FLUSH); +} + int get_redo_num(int itemizing, enum logcode code) { while (1) { @@ -437,7 +471,6 @@ static void whine_about_eof(int fd) exit_cleanup(RERR_STREAMIO); } - /** * Read from a socket with I/O timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. @@ -451,11 +484,11 @@ static void whine_about_eof(int fd) */ static int read_timeout(int fd, char *buf, size_t len) { - int n, ret = 0; + int n, cnt = 0; io_flush(NORMAL_FLUSH); - while (ret == 0) { + while (cnt == 0) { /* until we manage to read *something* */ fd_set r_fds, w_fds; struct timeval tv; @@ -503,7 +536,7 @@ static int read_timeout(int fd, char *buf, size_t len) } if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds)) - msg_list_push(NORMAL_FLUSH); + msg_list_flush(NORMAL_FLUSH); if (io_filesfrom_f_out >= 0) { if (io_filesfrom_buflen) { @@ -581,21 +614,23 @@ static int read_timeout(int fd, char *buf, size_t len) continue; /* Don't write errors on a dead socket. */ - if (fd == sock_f_in) + if (fd == sock_f_in) { close_multiplexing_out(); - rsyserr(FERROR, errno, "read error"); + rsyserr(FSOCKERR, errno, "read error"); + } else + rsyserr(FERROR, errno, "read error"); exit_cleanup(RERR_STREAMIO); } buf += n; len -= n; - ret += n; + cnt += n; if (fd == sock_f_in && io_timeout) last_io_in = time(NULL); } - return ret; + return cnt; } /** @@ -645,7 +680,6 @@ int read_filesfrom_line(int fd, char *fname) return s - fname; } - static char *iobuf_out; static int iobuf_out_cnt; @@ -658,7 +692,6 @@ void io_start_buffering_out(void) iobuf_out_cnt = 0; } - static char *iobuf_in; static size_t iobuf_in_siz; @@ -671,7 +704,6 @@ void io_start_buffering_in(void) out_of_memory("io_start_buffering_in"); } - void io_end_buffering(void) { io_flush(NORMAL_FLUSH); @@ -681,14 +713,12 @@ void io_end_buffering(void) } } - void maybe_flush_socket(void) { if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5) io_flush(NORMAL_FLUSH); } - void maybe_send_keepalive(void) { if (time(NULL) - last_io_out >= allowed_lull) { @@ -703,7 +733,6 @@ void maybe_send_keepalive(void) } } - /** * Continue trying to read len bytes - don't return until len has been * read. @@ -718,7 +747,6 @@ static void read_loop(int fd, char *buf, size_t len) } } - /** * Read from the file descriptor handling multiplexing - return number * of bytes read. @@ -730,7 +758,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) static size_t remaining; static size_t iobuf_in_ndx; size_t msg_bytes; - int tag, ret = 0; + int tag, cnt = 0; char line[BIGPATHBUFLEN]; if (!iobuf_in || fd != sock_f_in) @@ -741,13 +769,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) iobuf_in_ndx = 0; } - while (ret == 0) { + while (cnt == 0) { if (remaining) { len = MIN(len, remaining); memcpy(buf, iobuf_in + iobuf_in_ndx, len); iobuf_in_ndx += len; remaining -= len; - ret = len; + cnt = len; break; } @@ -773,12 +801,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) if (msg_bytes >= sizeof line) goto overflow; read_loop(fd, line, msg_bytes); - line[msg_bytes] = '\0'; /* A directory name was sent with the trailing null */ if (msg_bytes > 0 && !line[msg_bytes-1]) log_delete(line, S_IFDIR); - else + else { + line[msg_bytes] = '\0'; log_delete(line, S_IFREG); + } break; case MSG_SUCCESS: if (msg_bytes != 4) { @@ -811,11 +840,9 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) if (remaining == 0) io_flush(NORMAL_FLUSH); - return ret; + return cnt; } - - /** * Do a buffered read from @p fd. Don't return until all @p n bytes * have been read. If all @p n can't be read then exit with an @@ -823,12 +850,12 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) **/ static void readfd(int fd, char *buffer, size_t N) { - int ret; + int cnt; size_t total = 0; while (total < N) { - ret = readfd_unbuffered(fd, buffer + total, N-total); - total += ret; + cnt = readfd_unbuffered(fd, buffer + total, N-total); + total += cnt; } if (fd == write_batch_monitor_in) { @@ -840,7 +867,6 @@ static void readfd(int fd, char *buffer, size_t N) stats.total_read += total; } - int read_shortint(int f) { uchar b[2]; @@ -848,37 +874,36 @@ int read_shortint(int f) return (b[1] << 8) + b[0]; } - int32 read_int(int f) { char b[4]; - int32 ret; + int32 num; readfd(f,b,4); - ret = IVAL(b,0); - if (ret == (int32)0xffffffff) + num = IVAL(b,0); + if (num == (int32)0xffffffff) return -1; - return ret; + return num; } int64 read_longint(int f) { - int64 ret; + int64 num; char b[8]; - ret = read_int(f); + num = read_int(f); - if ((int32)ret != (int32)0xffffffff) - return ret; + if ((int32)num != (int32)0xffffffff) + return num; #if SIZEOF_INT64 < 8 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); exit_cleanup(RERR_UNSUPPORTED); #else readfd(f,b,8); - ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32); + num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); #endif - return ret; + return num; } void read_buf(int f,char *buf,size_t len) @@ -923,6 +948,11 @@ int read_vstring(int f, char *buf, int bufsize) void read_sum_head(int f, struct sum_struct *sum) { sum->count = read_int(f); + if (sum->count < 0) { + rprintf(FERROR, "Invalid checksum count %ld [%s]\n", + (long)sum->count, who_am_i()); + exit_cleanup(RERR_PROTOCOL); + } sum->blength = read_int(f); if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) { rprintf(FERROR, "Invalid block length %ld [%s]\n", @@ -960,7 +990,6 @@ void write_sum_head(int f, struct sum_struct *sum) write_int(f, sum->remainder); } - /** * Sleep after writing to limit I/O bandwidth usage. * @@ -986,7 +1015,7 @@ static void sleep_for_bwlimit(int bytes_written) #define ONE_SEC 1000000L /* # of microseconds in a second */ - if (!bwlimit) + if (!bwlimit_writemax) return; total_written += bytes_written; @@ -1016,7 +1045,6 @@ static void sleep_for_bwlimit(int bytes_written) total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); } - /* Write len bytes to the file descriptor fd, looping as necessary to get * the job done and also (in certain circumstances) reading any data on * msg_fd_in to avoid deadlock. @@ -1027,7 +1055,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) { size_t n, total = 0; fd_set w_fds, r_fds; - int maxfd, count, ret, using_r_fds; + int maxfd, count, cnt, using_r_fds; struct timeval tv; no_flush++; @@ -1067,12 +1095,12 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; n = len - total; - if (bwlimit && n > bwlimit_writemax) + if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; - ret = write(fd, buf + total, n); + cnt = write(fd, buf + total, n); - if (ret <= 0) { - if (ret < 0) { + if (cnt <= 0) { + if (cnt < 0) { if (errno == EINTR) continue; if (errno == EWOULDBLOCK || errno == EAGAIN) { @@ -1098,26 +1126,25 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) exit_cleanup(RERR_STREAMIO); } - total += ret; + total += cnt; if (fd == sock_f_out) { if (io_timeout || am_generator) last_io_out = time(NULL); - sleep_for_bwlimit(ret); + sleep_for_bwlimit(cnt); } } no_flush--; } - /** * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ static void mplex_write(enum msgcode code, char *buf, size_t len) { - char buffer[BIGPATHBUFLEN]; + char buffer[1024]; size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); @@ -1130,9 +1157,10 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) contiguous_write_len = len + 4; if (n > sizeof buffer - 4) - n = sizeof buffer - 4; + n = 0; + else + memcpy(buffer + 4, buf, n); - memcpy(&buffer[4], buf, n); writefd_unbuffered(sock_f_out, buffer, n+4); len -= n; @@ -1145,10 +1173,9 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) contiguous_write_len = 0; } - void io_flush(int flush_it_all) { - msg_list_push(flush_it_all); + msg_list_flush(flush_it_all); if (!iobuf_out_cnt || no_flush) return; @@ -1160,7 +1187,6 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } - static void writefd(int fd,char *buf,size_t len) { if (fd == msg_fd_out) { @@ -1195,7 +1221,6 @@ static void writefd(int fd,char *buf,size_t len) } } - void write_shortint(int f, int x) { uchar b[2]; @@ -1204,7 +1229,6 @@ void write_shortint(int f, int x) writefd(f, (char *)b, 2); } - void write_int(int f,int32 x) { char b[4]; @@ -1212,7 +1236,6 @@ void write_int(int f,int32 x) writefd(f,b,4); } - void write_int_named(int f, int32 x, const char *phase) { io_write_phase = phase; @@ -1220,7 +1243,6 @@ void write_int_named(int f, int32 x, const char *phase) io_write_phase = phase_unknown; } - /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform. @@ -1282,7 +1304,6 @@ void write_vstring(int f, char *str, int len) writefd(f, str, len); } - /** * Read a line of up to @p maxlen characters into @p buf (not counting * the trailing null). Strips the (required) trailing newline and all @@ -1308,7 +1329,6 @@ int read_line(int f, char *buf, size_t maxlen) return maxlen > 0; } - void io_printf(int fd, const char *format, ...) { va_list ap; @@ -1330,7 +1350,6 @@ void io_printf(int fd, const char *format, ...) write_sbuf(fd, buf); } - /** Setup for multiplexing a MSG_* stream with the data stream. */ void io_start_multiplex_out(void) {