X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/4dde3347fb614270f6aa6812598185aa0ccca3ef..4e2a7e59e540a7a66da0c3e159f0464b05559f7f:/io.c diff --git a/io.c b/io.c index b400ba29..b717b431 100644 --- a/io.c +++ b/io.c @@ -541,8 +541,8 @@ static char *perform_io(size_t needed, int flags) if (DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", who_am_i(), (long)needed, - iobuf.out.len > iobuf.out.size - needed - ? (long)iobuf.out.len - (iobuf.out.size - needed) : 0L); + iobuf.out.len + needed > iobuf.out.size + ? (long)(iobuf.out.len + needed - iobuf.out.size) : 0L); } break; @@ -557,8 +557,8 @@ static char *perform_io(size_t needed, int flags) if (DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", who_am_i(), (long)needed, - iobuf.out.len > iobuf.msg.size - needed - ? (long)iobuf.out.len - (iobuf.msg.size - needed) : 0L); + iobuf.msg.len + needed > iobuf.msg.size + ? (long)(iobuf.msg.len + needed - iobuf.msg.size) : 0L); } break; @@ -578,11 +578,13 @@ static char *perform_io(size_t needed, int flags) goto double_break; break; case PIO_NEED_OUTROOM: - if (iobuf.out.len <= iobuf.out.size - needed) + /* Note that iobuf.out_empty_len doesn't factor into this check + * because iobuf.out.len already holds any needed header len. */ + if (iobuf.out.len + needed <= iobuf.out.size) goto double_break; break; case PIO_NEED_MSGROOM: - if (iobuf.msg.len <= iobuf.msg.size - needed) + if (iobuf.msg.len + needed <= iobuf.msg.size) goto double_break; break; } @@ -638,6 +640,7 @@ static char *perform_io(size_t needed, int flags) iobuf.raw_data_header_pos = iobuf.raw_flushing_ends_before; if (iobuf.raw_data_header_pos >= iobuf.out.size) iobuf.raw_data_header_pos -= iobuf.out.size; + /* Yes, it is possible for this to make len > size for a while. */ iobuf.out.len += 4; } @@ -752,7 +755,7 @@ static char *perform_io(size_t needed, int flags) /* Don't write errors on a dead socket. */ msgs2stderr = 1; out->len = iobuf.raw_flushing_ends_before = out->pos = 0; - rsyserr(FERROR_SOCKET, errno, "write error"); + rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i()); exit_cleanup(RERR_STREAMIO); } } @@ -802,6 +805,17 @@ static char *perform_io(size_t needed, int flags) return data; } +void noop_io_until_death(void) +{ + char buf[1024]; + + kluge_around_eof = 1; + set_io_timeout(protocol_version >= 31 ? 10 : 1); + + while (1) + read_buf(iobuf.in_fd, buf, sizeof buf); +} + /* Buffer a message for the multiplexed output stream. Is never used for MSG_DATA. */ int send_msg(enum msgcode code, const char *buf, size_t len, int convert) { @@ -1264,7 +1278,7 @@ void stop_flist_forward(void) static void read_a_msg(void) { char *data, line[BIGPATHBUFLEN]; - int tag; + int tag, val; size_t msg_bytes; data = perform_io(4, PIO_INPUT_AND_CONSUME); @@ -1302,9 +1316,21 @@ static void read_a_msg(void) if (msg_bytes != 4 || am_sender) goto invalid_msg; data = perform_io(4, PIO_INPUT_AND_CONSUME); - io_error |= IVAL(data, 0); + val = IVAL(data, 0); + io_error |= val; if (!am_generator) - send_msg(MSG_IO_ERROR, data, 4, 0); + send_msg_int(MSG_IO_ERROR, val); + break; + case MSG_IO_TIMEOUT: + if (msg_bytes != 4 || am_server || am_generator) + goto invalid_msg; + data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); + if (!io_timeout || io_timeout > val) { + if (INFO_GTE(MISC, 2)) + rprintf(FINFO, "Setting --timeout=%d to match server\n", val); + set_io_timeout(val); + } break; case MSG_NOOP: if (am_sender) @@ -1370,19 +1396,21 @@ static void read_a_msg(void) exit_cleanup(RERR_STREAMIO); } data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); if (am_generator) - got_flist_entry_status(FES_SUCCESS, IVAL(data, 0)); + got_flist_entry_status(FES_SUCCESS, val); else - successful_send(IVAL(data, 0)); + successful_send(val); break; case MSG_NO_SEND: if (msg_bytes != 4) goto invalid_msg; data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); if (am_generator) - got_flist_entry_status(FES_NO_SEND, IVAL(data, 0)); + got_flist_entry_status(FES_NO_SEND, val); else - send_msg(MSG_NO_SEND, data, 4, 0); + send_msg_int(MSG_NO_SEND, val); break; case MSG_ERROR_SOCKET: case MSG_ERROR_UTF8: @@ -1415,6 +1443,27 @@ static void read_a_msg(void) first_message = 0; } break; + case MSG_ERROR_EXIT: + if (msg_bytes == 0) { + if (!am_sender && !am_generator) { + send_msg(MSG_ERROR_EXIT, "", 0, 0); + io_flush(FULL_FLUSH); + } + val = 0; + } else if (msg_bytes == 4) { + data = perform_io(4, PIO_INPUT_AND_CONSUME); + val = IVAL(data, 0); + if (protocol_version >= 31) { + if (am_generator) + send_msg_int(MSG_ERROR_EXIT, val); + else + send_msg(MSG_ERROR_EXIT, "", 0, 0); + } + } else + goto invalid_msg; + /* Send a negative linenum so that we don't end up + * with a duplicate exit message. */ + _exit_cleanup(val, __FILE__, 0 - __LINE__); default: rprintf(FERROR, "unexpected tag %d [%s%s]\n", tag, who_am_i(), inc_recurse ? "/inc" : ""); @@ -1863,7 +1912,7 @@ void write_buf(int f, const char *buf, size_t len) goto batch_copy; } - if (iobuf.out.size - iobuf.out.len < len) + if (iobuf.out.len + len > iobuf.out.size) perform_io(len, PIO_NEED_OUTROOM); pos = iobuf.out.pos + iobuf.out.len; /* Must be set after any flushing. */