int sock_f_in = -1;
int sock_f_out = -1;
+int64 total_data_read = 0;
+int64 total_data_written = 0;
+
static struct {
xbuf in, out, msg;
int in_fd;
static int write_batch_monitor_out = -1;
static int ff_forward_fd = -1;
-static char ff_lastchar;
+static int ff_reenable_multiplex = -1;
+static char ff_lastchar = '\0';
static xbuf ff_xb = EMPTY_XBUF;
#ifdef ICONV_OPTION
static xbuf iconv_buf = EMPTY_XBUF;
ff_forward_fd = -1;
write_buf(iobuf.out_fd, "\0\0", ff_lastchar ? 2 : 1);
free_xbuf(&ff_xb);
- if (protocol_version < 31)
- io_start_multiplex_out(iobuf.out_fd);
+ if (ff_reenable_multiplex >= 0)
+ io_start_multiplex_out(ff_reenable_multiplex);
}
return;
}
if (DEBUG_GTE(IO, 3)) {
rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n",
who_am_i(), (long)needed,
- iobuf.out.len > iobuf.out.size - needed
- ? (long)iobuf.out.len - (iobuf.out.size - needed) : 0L);
+ iobuf.out.len + needed > iobuf.out.size
+ ? (long)(iobuf.out.len + needed - iobuf.out.size) : 0L);
}
break;
if (DEBUG_GTE(IO, 3)) {
rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n",
who_am_i(), (long)needed,
- iobuf.out.len > iobuf.msg.size - needed
- ? (long)iobuf.out.len - (iobuf.msg.size - needed) : 0L);
+ iobuf.msg.len + needed > iobuf.msg.size
+ ? (long)(iobuf.msg.len + needed - iobuf.msg.size) : 0L);
}
break;
goto double_break;
break;
case PIO_NEED_OUTROOM:
- if (iobuf.out.len <= iobuf.out.size - needed)
+ /* Note that iobuf.out_empty_len doesn't factor into this check
+ * because iobuf.out.len already holds any needed header len. */
+ if (iobuf.out.len + needed <= iobuf.out.size)
goto double_break;
break;
case PIO_NEED_MSGROOM:
- if (iobuf.msg.len <= iobuf.msg.size - needed)
+ if (iobuf.msg.len + needed <= iobuf.msg.size)
goto double_break;
break;
}
if (iobuf.raw_flushing_ends_before
|| (!iobuf.msg.len && iobuf.out.len > iobuf.out_empty_len && !(flags & PIO_NEED_MSGROOM))) {
if (OUT_MULTIPLEXED && !iobuf.raw_flushing_ends_before) {
- size_t val;
-
/* The iobuf.raw_flushing_ends_before value can point off the end
* of the iobuf.out buffer for a while, for easier subtracting. */
iobuf.raw_flushing_ends_before = iobuf.out.pos + iobuf.out.len;
SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0,
((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4);
- if ((val = iobuf.out.size - iobuf.raw_data_header_pos) < 4) {
+ if (iobuf.raw_data_header_pos + 4 > iobuf.out.size) {
+ int siz = (int)(iobuf.raw_data_header_pos + 4 - iobuf.out.size);
/* We used some of the overflow bytes, so move them. */
if (DEBUG_GTE(IO, 4)) {
rprintf(FINFO, "[%s] wrap-bytes moved: %d (perform_io)\n",
- who_am_i(), (int)val);
+ who_am_i(), siz);
}
- memcpy(iobuf.out.buf, iobuf.out.buf + iobuf.out.size, 4 - val);
+ memcpy(iobuf.out.buf, iobuf.out.buf + iobuf.out.size, siz);
}
if (DEBUG_GTE(IO, 1)) {
iobuf.raw_data_header_pos = iobuf.raw_flushing_ends_before;
if (iobuf.raw_data_header_pos >= iobuf.out.size)
iobuf.raw_data_header_pos -= iobuf.out.size;
+ /* Yes, it is possible for this to make len > size for a while. */
iobuf.out.len += 4;
}
}
if (extra_flist_sending_enabled) {
- if (file_total - file_old_total < MAX_FILECNT_LOOKAHEAD
- && file_total - file_old_total >= MIN_FILECNT_LOOKAHEAD)
+ if (file_total - file_old_total < MAX_FILECNT_LOOKAHEAD)
tv.tv_sec = 0;
else {
extra_flist_sending_enabled = False;
/* Don't write errors on a dead socket. */
msgs2stderr = 1;
out->len = iobuf.raw_flushing_ends_before = out->pos = 0;
- rsyserr(FERROR_SOCKET, errno, "write error");
+ rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i());
exit_cleanup(RERR_STREAMIO);
}
}
if (iobuf.raw_flushing_ends_before)
iobuf.raw_flushing_ends_before -= out->size;
out->pos = 0;
- }
- if (out->pos == iobuf.raw_flushing_ends_before)
+ } else if (out->pos == iobuf.raw_flushing_ends_before)
iobuf.raw_flushing_ends_before = 0;
if ((out->len -= n) == empty_buf_len) {
out->pos = 0;
return data;
}
+void noop_io_until_death(void)
+{
+ char buf[1024];
+
+ kluge_around_eof = 1;
+ /* For protocol 31: setting an I/O timeout ensures that if something
+ * inexplicably weird happens, we won't hang around forever. For older
+ * protocols: we can't tell the other side to die, so we linger a brief
+ * time (to try to give our error messages time to arrive) and then let
+ * the "unexpectedly" closed socket tell them to die. */
+ set_io_timeout(protocol_version >= 31 ? 30 : 1);
+
+ while (1)
+ read_buf(iobuf.in_fd, buf, sizeof buf);
+}
+
/* Buffer a message for the multiplexed output stream. Is never used for MSG_DATA. */
int send_msg(enum msgcode code, const char *buf, size_t len, int convert)
{
if (convert > 0 && ic_send == (iconv_t)-1)
convert = 0;
if (convert > 0) {
- /* Ensuring double-size room leaves space for a potential conversion. */
+ /* Ensuring double-size room leaves space for maximal conversion expansion. */
if (iobuf.msg.len + len*2 + 4 > iobuf.msg.size)
perform_io(len*2 + 4, PIO_NEED_MSGROOM);
} else
pos -= iobuf.msg.size;
hdr = iobuf.msg.buf + pos;
- iobuf.msg.len += 4; /* Leave room for the coming header bytes. */
+ iobuf.msg.len += 4; /* Allocate room for the coming header bytes. */
#ifdef ICONV_OPTION
if (convert > 0) {
SIVAL(hdr, 0, ((MPLEX_BASE + (int)code)<<24) + len);
/* If the header used any overflow bytes, move them to the start. */
if ((pos = hdr+4 - iobuf.msg.buf) > iobuf.msg.size) {
- size_t siz = pos - iobuf.msg.size;
+ int siz = (int)(pos - iobuf.msg.size);
if (DEBUG_GTE(IO, 4))
- rprintf(FINFO, "[%s] wrap-bytes moved: %d (send_msg)\n", who_am_i(), (int)siz);
- memcpy(iobuf.msg.buf, hdr+4 - siz, siz);
+ rprintf(FINFO, "[%s] wrap-bytes moved: %d (send_msg)\n", who_am_i(), siz);
+ memcpy(iobuf.msg.buf, iobuf.msg.buf + iobuf.msg.size, siz);
}
if (want_debug && convert > 0)
* for recv_file_list() to use. */
void start_filesfrom_forwarding(int fd)
{
- ff_forward_fd = fd;
- if (protocol_version < 31) {
- int save_fd = iobuf.out_fd;
- /* Older protocols send the files-from data w/o packaging it in
- * multiplexed I/O packets. To match this, we temporarily turn
- * off the multiplexing of our output w/o disabling buffering. */
- assert(OUT_MULTIPLEXED);
- /* Be extra, extra sure no messages go out before files-from data. */
- iobuf.msg.pos = iobuf.msg.len = 0;
- io_end_multiplex_out(False);
- iobuf.out_fd = save_fd;
+ if (protocol_version < 31 && OUT_MULTIPLEXED) {
+ /* Older protocols send the files-from data w/o packaging
+ * it in multiplexed I/O packets, so temporarily switch
+ * to buffered I/O to match this behavior. */
+ iobuf.msg.pos = iobuf.msg.len = 0; /* Be extra sure no messages go out. */
+ ff_reenable_multiplex = io_end_multiplex_out(MPLX_TO_BUFFERED);
}
+ ff_forward_fd = fd;
alloc_xbuf(&ff_xb, FILESFROM_BUFLEN);
}
*argv_p = argv;
}
-int io_start_buffering_out(int f_out)
+BOOL io_start_buffering_out(int f_out)
{
if (msgs2stderr && DEBUG_GTE(IO, 2))
rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out);
if (iobuf.out.buf) {
if (iobuf.out_fd == -1)
iobuf.out_fd = f_out;
- assert(f_out == iobuf.out_fd);
- return 0;
+ else
+ assert(f_out == iobuf.out_fd);
+ return False;
}
iobuf.out.size = IO_BUFFER_SIZE * 2 - 4;
iobuf.out.pos = iobuf.out.len = 0;
iobuf.out_fd = f_out;
- return 1;
+ return True;
}
-int io_start_buffering_in(int f_in)
+BOOL io_start_buffering_in(int f_in)
{
if (msgs2stderr && DEBUG_GTE(IO, 2))
rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in);
if (iobuf.in.buf) {
if (iobuf.in_fd == -1)
iobuf.in_fd = f_in;
- assert(f_in == iobuf.in_fd);
- return 0;
+ else
+ assert(f_in == iobuf.in_fd);
+ return False;
}
iobuf.in.size = IO_BUFFER_SIZE;
iobuf.in_fd = f_in;
- return 1;
+ return True;
}
void io_end_buffering_in(BOOL free_buffers)
{
- if (DEBUG_GTE(IO, 2)) {
- rprintf(FINFO, "[%s] io_end_buffering_in(%s)\n",
- who_am_i(), free_buffers ? "True" : "False");
+ if (msgs2stderr && DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n",
+ who_am_i(), free_buffers ? "FREE" : "KEEP");
}
if (free_buffers)
void io_end_buffering_out(BOOL free_buffers)
{
- if (DEBUG_GTE(IO, 2)) {
- rprintf(FINFO, "[%s] io_end_buffering_out(%s)\n",
- who_am_i(), free_buffers ? "True" : "False");
+ if (msgs2stderr && DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n",
+ who_am_i(), free_buffers ? "FREE" : "KEEP");
}
io_flush(FULL_FLUSH);
if (free_buffers) {
free_xbuf(&iobuf.out);
free_xbuf(&iobuf.msg);
- } else {
- iobuf.out.pos = iobuf.out.len = 0;
- iobuf.msg.pos = iobuf.msg.len = 0;
}
iobuf.out_fd = -1;
static void read_a_msg(void)
{
char *data, line[BIGPATHBUFLEN];
- int tag;
+ int tag, val;
size_t msg_bytes;
data = perform_io(4, PIO_INPUT_AND_CONSUME);
if (msg_bytes != 4 || am_sender)
goto invalid_msg;
data = perform_io(4, PIO_INPUT_AND_CONSUME);
- io_error |= IVAL(data, 0);
+ val = IVAL(data, 0);
+ io_error |= val;
if (!am_generator)
- send_msg(MSG_IO_ERROR, data, 4, 0);
+ send_msg_int(MSG_IO_ERROR, val);
+ break;
+ case MSG_IO_TIMEOUT:
+ if (msg_bytes != 4 || am_server || am_generator)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ val = IVAL(data, 0);
+ if (!io_timeout || io_timeout > val) {
+ if (INFO_GTE(MISC, 2))
+ rprintf(FINFO, "Setting --timeout=%d to match server\n", val);
+ set_io_timeout(val);
+ }
break;
case MSG_NOOP:
if (am_sender)
exit_cleanup(RERR_STREAMIO);
}
data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ val = IVAL(data, 0);
if (am_generator)
- got_flist_entry_status(FES_SUCCESS, IVAL(data, 0));
+ got_flist_entry_status(FES_SUCCESS, val);
else
- successful_send(IVAL(data, 0));
+ successful_send(val);
break;
case MSG_NO_SEND:
if (msg_bytes != 4)
goto invalid_msg;
data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ val = IVAL(data, 0);
if (am_generator)
- got_flist_entry_status(FES_NO_SEND, IVAL(data, 0));
+ got_flist_entry_status(FES_NO_SEND, val);
else
- send_msg(MSG_NO_SEND, data, 4, 0);
+ send_msg_int(MSG_NO_SEND, val);
break;
case MSG_ERROR_SOCKET:
case MSG_ERROR_UTF8:
first_message = 0;
}
break;
+ case MSG_ERROR_EXIT:
+ if (msg_bytes == 0) {
+ if (!am_sender && !am_generator) {
+ send_msg(MSG_ERROR_EXIT, "", 0, 0);
+ io_flush(FULL_FLUSH);
+ }
+ val = 0;
+ } else if (msg_bytes == 4) {
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ val = IVAL(data, 0);
+ if (protocol_version >= 31) {
+ if (am_generator)
+ send_msg_int(MSG_ERROR_EXIT, val);
+ else
+ send_msg(MSG_ERROR_EXIT, "", 0, 0);
+ }
+ } else
+ goto invalid_msg;
+ /* Send a negative linenum so that we don't end up
+ * with a duplicate exit message. */
+ _exit_cleanup(val, __FILE__, 0 - __LINE__);
default:
rprintf(FERROR, "unexpected tag %d [%s%s]\n",
tag, who_am_i(), inc_recurse ? "/inc" : "");
if (!IN_MULTIPLEXED) {
memcpy(buf, perform_io(len, PIO_INPUT_AND_CONSUME), len);
+ total_data_read += len;
if (forward_flist_data)
write_buf(iobuf.out_fd, buf, len);
batch_copy:
/* The bytes at the "data" pointer will survive long
* enough to make a copy, but not past future I/O. */
memcpy(buf, data, siz);
+ total_data_read += siz;
if (forward_flist_data)
write_buf(iobuf.out_fd, buf, siz);
goto batch_copy;
}
- if (iobuf.out.size - iobuf.out.len < len)
+ if (iobuf.out.len + len > iobuf.out.size)
perform_io(len, PIO_NEED_OUTROOM);
pos = iobuf.out.pos + iobuf.out.len; /* Must be set after any flushing. */
memcpy(iobuf.out.buf + pos, buf, len);
iobuf.out.len += len;
+ total_data_written += len;
batch_copy:
if (f == write_batch_monitor_out)
/* Setup for multiplexing a MSG_* stream with the data stream. */
void io_start_multiplex_out(int fd)
{
+ io_flush(FULL_FLUSH);
+
if (msgs2stderr && DEBUG_GTE(IO, 2))
rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd);
- io_flush(FULL_FLUSH);
-
iobuf.out_empty_len = 4; /* See also OUT_MULTIPLEXED */
io_start_buffering_out(fd);
io_start_buffering_in(fd);
}
-void io_end_multiplex_in(BOOL free_buffers)
+int io_end_multiplex_in(int mode)
{
- if (DEBUG_GTE(IO, 2)) {
- rprintf(FINFO, "[%s] io_end_multiplex_in(%s)\n",
- who_am_i(), free_buffers ? "True" : "False");
- }
+ int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1;
+
+ if (msgs2stderr && DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode);
iobuf.in_multiplexed = False;
- iobuf.raw_input_ends_before = 0;
- io_end_buffering_in(free_buffers);
+ if (mode == MPLX_SWITCHING)
+ iobuf.raw_input_ends_before = 0;
+ else
+ assert(iobuf.raw_input_ends_before == 0);
+ if (mode != MPLX_TO_BUFFERED)
+ io_end_buffering_in(mode);
+
+ return ret;
}
-/* Stop output multiplexing. */
-void io_end_multiplex_out(BOOL free_buffers)
+int io_end_multiplex_out(int mode)
{
- if (DEBUG_GTE(IO, 2)) {
- rprintf(FINFO, "[%s] io_end_multiplex_out(%s)\n",
- who_am_i(), free_buffers ? "True" : "False");
- }
+ int ret = iobuf.out_empty_len ? iobuf.out_fd : -1;
- io_end_buffering_out(free_buffers);
+ if (msgs2stderr && DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode);
+
+ if (mode != MPLX_TO_BUFFERED)
+ io_end_buffering_out(mode);
+ else
+ io_flush(FULL_FLUSH);
+
+ iobuf.out.len = 0;
iobuf.out_empty_len = 0;
+
+ return ret;
}
void start_write_batch(int fd)