X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/8e6b4ddbe8c4811b443d5463d54f2bf20eeef896..4e2a7e59e540a7a66da0c3e159f0464b05559f7f:/io.c diff --git a/io.c b/io.c index b1748628..b717b431 100644 --- a/io.c +++ b/io.c @@ -76,6 +76,9 @@ int kluge_around_eof = 0; int sock_f_in = -1; int sock_f_out = -1; +int64 total_data_read = 0; +int64 total_data_written = 0; + static struct { xbuf in, out, msg; int in_fd; @@ -538,8 +541,8 @@ static char *perform_io(size_t needed, int flags) if (DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", who_am_i(), (long)needed, - iobuf.out.len > iobuf.out.size - needed - ? (long)iobuf.out.len - (iobuf.out.size - needed) : 0L); + iobuf.out.len + needed > iobuf.out.size + ? (long)(iobuf.out.len + needed - iobuf.out.size) : 0L); } break; @@ -554,8 +557,8 @@ static char *perform_io(size_t needed, int flags) if (DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", who_am_i(), (long)needed, - iobuf.out.len > iobuf.msg.size - needed - ? (long)iobuf.out.len - (iobuf.msg.size - needed) : 0L); + iobuf.msg.len + needed > iobuf.msg.size + ? (long)(iobuf.msg.len + needed - iobuf.msg.size) : 0L); } break; @@ -575,11 +578,13 @@ static char *perform_io(size_t needed, int flags) goto double_break; break; case PIO_NEED_OUTROOM: - if (iobuf.out.len <= iobuf.out.size - needed) + /* Note that iobuf.out_empty_len doesn't factor into this check + * because iobuf.out.len already holds any needed header len. */ + if (iobuf.out.len + needed <= iobuf.out.size) goto double_break; break; case PIO_NEED_MSGROOM: - if (iobuf.msg.len <= iobuf.msg.size - needed) + if (iobuf.msg.len + needed <= iobuf.msg.size) goto double_break; break; } @@ -635,6 +640,7 @@ static char *perform_io(size_t needed, int flags) iobuf.raw_data_header_pos = iobuf.raw_flushing_ends_before; if (iobuf.raw_data_header_pos >= iobuf.out.size) iobuf.raw_data_header_pos -= iobuf.out.size; + /* Yes, it is possible for this to make len > size for a while. */ iobuf.out.len += 4; } @@ -749,7 +755,7 @@ static char *perform_io(size_t needed, int flags) /* Don't write errors on a dead socket. */ msgs2stderr = 1; out->len = iobuf.raw_flushing_ends_before = out->pos = 0; - rsyserr(FERROR_SOCKET, errno, "write error"); + rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i()); exit_cleanup(RERR_STREAMIO); } } @@ -799,6 +805,17 @@ static char *perform_io(size_t needed, int flags) return data; } +void noop_io_until_death(void) +{ + char buf[1024]; + + kluge_around_eof = 1; + set_io_timeout(protocol_version >= 31 ? 10 : 1); + + while (1) + read_buf(iobuf.in_fd, buf, sizeof buf); +} + /* Buffer a message for the multiplexed output stream. Is never used for MSG_DATA. */ int send_msg(enum msgcode code, const char *buf, size_t len, int convert) { @@ -1261,7 +1278,7 @@ void stop_flist_forward(void) static void read_a_msg(void) { char *data, line[BIGPATHBUFLEN]; - int tag; + int tag, val; size_t msg_bytes; data = perform_io(4, PIO_INPUT_AND_CONSUME); @@ -1299,9 +1316,21 @@ static void read_a_msg(void) if (msg_bytes != 4 || am_sender) goto invalid_msg; data = perform_io(4, PIO_INPUT_AND_CONSUME); - io_error |= IVAL(data, 0); + val = IVAL(data, 0); + io_error |= val; if (!am_generator) - send_msg(MSG_IO_ERROR, data, 4, 0); + send_msg_int(MSG_IO_ERROR, val); + break; + case MSG_IO_TIMEOUT: + if (msg_bytes != 4 || am_server || am_generator) + goto invalid_msg; + data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); + if (!io_timeout || io_timeout > val) { + if (INFO_GTE(MISC, 2)) + rprintf(FINFO, "Setting --timeout=%d to match server\n", val); + set_io_timeout(val); + } break; case MSG_NOOP: if (am_sender) @@ -1367,19 +1396,21 @@ static void read_a_msg(void) exit_cleanup(RERR_STREAMIO); } data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); if (am_generator) - got_flist_entry_status(FES_SUCCESS, IVAL(data, 0)); + got_flist_entry_status(FES_SUCCESS, val); else - successful_send(IVAL(data, 0)); + successful_send(val); break; case MSG_NO_SEND: if (msg_bytes != 4) goto invalid_msg; data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); if (am_generator) - got_flist_entry_status(FES_NO_SEND, IVAL(data, 0)); + got_flist_entry_status(FES_NO_SEND, val); else - send_msg(MSG_NO_SEND, data, 4, 0); + send_msg_int(MSG_NO_SEND, val); break; case MSG_ERROR_SOCKET: case MSG_ERROR_UTF8: @@ -1412,6 +1443,27 @@ static void read_a_msg(void) first_message = 0; } break; + case MSG_ERROR_EXIT: + if (msg_bytes == 0) { + if (!am_sender && !am_generator) { + send_msg(MSG_ERROR_EXIT, "", 0, 0); + io_flush(FULL_FLUSH); + } + val = 0; + } else if (msg_bytes == 4) { + data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); + if (protocol_version >= 31) { + if (am_generator) + send_msg_int(MSG_ERROR_EXIT, val); + else + send_msg(MSG_ERROR_EXIT, "", 0, 0); + } + } else + goto invalid_msg; + /* Send a negative linenum so that we don't end up + * with a duplicate exit message. */ + _exit_cleanup(val, __FILE__, 0 - __LINE__); default: rprintf(FERROR, "unexpected tag %d [%s%s]\n", tag, who_am_i(), inc_recurse ? "/inc" : ""); @@ -1578,6 +1630,7 @@ void read_buf(int f, char *buf, size_t len) if (!IN_MULTIPLEXED) { memcpy(buf, perform_io(len, PIO_INPUT_AND_CONSUME), len); + total_data_read += len; if (forward_flist_data) write_buf(iobuf.out_fd, buf, len); batch_copy: @@ -1601,6 +1654,7 @@ void read_buf(int f, char *buf, size_t len) /* The bytes at the "data" pointer will survive long * enough to make a copy, but not past future I/O. */ memcpy(buf, data, siz); + total_data_read += siz; if (forward_flist_data) write_buf(iobuf.out_fd, buf, siz); @@ -1858,7 +1912,7 @@ void write_buf(int f, const char *buf, size_t len) goto batch_copy; } - if (iobuf.out.size - iobuf.out.len < len) + if (iobuf.out.len + len > iobuf.out.size) perform_io(len, PIO_NEED_OUTROOM); pos = iobuf.out.pos + iobuf.out.len; /* Must be set after any flushing. */ @@ -1873,6 +1927,7 @@ void write_buf(int f, const char *buf, size_t len) memcpy(iobuf.out.buf + pos, buf, len); iobuf.out.len += len; + total_data_written += len; batch_copy: if (f == write_batch_monitor_out)