xbuf in, out, msg;
int in_fd;
int out_fd; /* Both "out" and "msg" go to this fd. */
- BOOL in_multiplexed;
+ int in_multiplexed;
unsigned out_empty_len;
size_t raw_data_header_pos; /* in the out xbuf */
size_t raw_flushing_ends_before; /* in the out xbuf */
#define IOBUF_WAS_REDUCED(siz) ((siz) & 0xFF)
#define IOBUF_RESTORE_SIZE(siz) (((siz) | 0xFF) + 1)
-#define IN_MULTIPLEXED (iobuf.in_multiplexed)
+#define IN_MULTIPLEXED (iobuf.in_multiplexed != 0)
+#define IN_MULTIPLEXED_AND_READY (iobuf.in_multiplexed > 0)
#define OUT_MULTIPLEXED (iobuf.out_empty_len != 0)
#define PIO_NEED_INPUT (1<<0) /* The *_NEED_* flags are mutually exclusive. */
iobuf.in.len += n;
}
- if (iobuf.out_fd >= 0 && FD_ISSET(iobuf.out_fd, &w_fds)) {
+ if (out && FD_ISSET(iobuf.out_fd, &w_fds)) {
size_t len = iobuf.raw_flushing_ends_before ? iobuf.raw_flushing_ends_before - out->pos : out->len;
int n;
/* We need to help prevent deadlock by doing what reading
* we can whenever we are here trying to write. */
- if (IN_MULTIPLEXED && !(flags & PIO_NEED_INPUT)) {
+ if (IN_MULTIPLEXED_AND_READY && !(flags & PIO_NEED_INPUT)) {
while (!iobuf.raw_input_ends_before && iobuf.in.len > 512)
read_a_msg();
if (flist_receiving_enabled && iobuf.in.len > 512)
/* Read a message from a multiplexed source. */
static void read_a_msg(void)
{
- char line[BIGPATHBUFLEN];
+ char data[BIGPATHBUFLEN];
int tag, val;
size_t msg_bytes;
+ /* This ensures that perform_io() does not try to do any message reading
+ * until we've read all of the data for this message. We should also
+ * try to avoid calling things that will cause data to be written via
+ * perform_io() prior to this being reset to 1. */
+ iobuf.in_multiplexed = -1;
+
tag = raw_read_int();
msg_bytes = tag & 0xFFFFFF;
tag = (tag >> 24) - MPLEX_BASE;
- if (DEBUG_GTE(IO, 1) && (msgs2stderr || tag != MSG_INFO))
+ if (DEBUG_GTE(IO, 1) && msgs2stderr)
rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);
switch (tag) {
* which case the gradual reading of the input stream will
* cause this value to decrease and eventually become real. */
iobuf.raw_input_ends_before = iobuf.in.pos + msg_bytes;
+ iobuf.in_multiplexed = 1;
break;
case MSG_STATS:
if (msg_bytes != sizeof stats.total_read || !am_generator)
goto invalid_msg;
raw_read_buf((char*)&stats.total_read, sizeof stats.total_read);
+ iobuf.in_multiplexed = 1;
break;
case MSG_REDO:
if (msg_bytes != 4 || !am_generator)
goto invalid_msg;
- got_flist_entry_status(FES_REDO, raw_read_int());
+ val = raw_read_int();
+ iobuf.in_multiplexed = 1;
+ got_flist_entry_status(FES_REDO, val);
break;
case MSG_IO_ERROR:
if (msg_bytes != 4 || am_sender)
goto invalid_msg;
val = raw_read_int();
+ iobuf.in_multiplexed = 1;
io_error |= val;
if (!am_generator)
send_msg_int(MSG_IO_ERROR, val);
if (msg_bytes != 4 || am_server || am_generator)
goto invalid_msg;
val = raw_read_int();
+ iobuf.in_multiplexed = 1;
if (!io_timeout || io_timeout > val) {
if (INFO_GTE(MISC, 2))
rprintf(FINFO, "Setting --timeout=%d to match server\n", val);
case MSG_NOOP:
+ /* Nothing more is read for this message, so mark the input as ready
+ * before calling anything that might write via perform_io(). */
+ iobuf.in_multiplexed = 1;
if (am_sender)
maybe_send_keepalive();
break;
case MSG_DELETED:
- if (msg_bytes >= sizeof line)
+ if (msg_bytes >= sizeof data)
goto overflow;
if (am_generator) {
- raw_read_buf(line, msg_bytes);
- send_msg(MSG_DELETED, line, msg_bytes, 1);
+ raw_read_buf(data, msg_bytes);
+ iobuf.in_multiplexed = 1;
+ send_msg(MSG_DELETED, data, msg_bytes, 1);
break;
}
#ifdef ICONV_OPTION
int add_null = 0;
int flags = ICB_INCLUDE_BAD | ICB_INIT;
- INIT_CONST_XBUF(outbuf, line);
+ INIT_CONST_XBUF(outbuf, data);
INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
while (msg_bytes) {
msg_bytes = outbuf.len;
} else
#endif
- raw_read_buf(line, msg_bytes);
+ raw_read_buf(data, msg_bytes);
+ iobuf.in_multiplexed = 1;
/* A directory name was sent with the trailing null */
- if (msg_bytes > 0 && !line[msg_bytes-1])
- log_delete(line, S_IFDIR);
+ if (msg_bytes > 0 && !data[msg_bytes-1])
+ log_delete(data, S_IFDIR);
else {
- line[msg_bytes] = '\0';
- log_delete(line, S_IFREG);
+ data[msg_bytes] = '\0';
+ log_delete(data, S_IFREG);
}
break;
case MSG_SUCCESS:
exit_cleanup(RERR_STREAMIO);
}
val = raw_read_int();
+ iobuf.in_multiplexed = 1;
if (am_generator)
got_flist_entry_status(FES_SUCCESS, val);
else
if (msg_bytes != 4)
goto invalid_msg;
val = raw_read_int();
+ iobuf.in_multiplexed = 1;
if (am_generator)
got_flist_entry_status(FES_NO_SEND, val);
else
case MSG_ERROR:
case MSG_ERROR_XFER:
case MSG_WARNING:
- if (msg_bytes >= sizeof line) {
+ if (msg_bytes >= sizeof data) {
overflow:
rprintf(FERROR,
"multiplexing overflow %d:%lu [%s%s]\n",
inc_recurse ? "/inc" : "");
exit_cleanup(RERR_STREAMIO);
}
- raw_read_buf(line, msg_bytes);
- rwrite((enum logcode)tag, line, msg_bytes, !am_generator);
+ raw_read_buf(data, msg_bytes);
+ iobuf.in_multiplexed = 1;
+ rwrite((enum logcode)tag, data, msg_bytes, !am_generator);
if (first_message) {
- if (list_only && !am_sender && tag == 1 && msg_bytes < sizeof line) {
- line[msg_bytes] = '\0';
- check_for_d_option_error(line);
+ if (list_only && !am_sender && tag == 1 && msg_bytes < sizeof data) {
+ data[msg_bytes] = '\0';
+ check_for_d_option_error(data);
}
first_message = 0;
}
break;
case MSG_ERROR_EXIT:
+ if (msg_bytes == 4)
+ val = raw_read_int();
+ else if (msg_bytes == 0)
+ val = 0;
+ else
+ goto invalid_msg;
+ iobuf.in_multiplexed = 1;
if (DEBUG_GTE(EXIT, 3))
- rprintf(FINFO, "[%s] got MSG_ERROR_EXIT with %d bytes\n", who_am_i(), msg_bytes);
+ rprintf(FINFO, "[%s] got MSG_ERROR_EXIT with %ld bytes\n", who_am_i(), (long)msg_bytes);
if (msg_bytes == 0) {
if (!am_sender && !am_generator) {
if (DEBUG_GTE(EXIT, 3)) {
io_flush(FULL_FLUSH);
}
val = 0;
- } else if (msg_bytes == 4) {
+ } else {
- val = raw_read_int();
+ /* val was already read at the top of this case; reading it again
+ * here would consume 4 bytes that belong to the next message. */
if (protocol_version >= 31) {
if (am_generator) {
send_msg(MSG_ERROR_EXIT, "", 0, 0);
}
}
- } else
- goto invalid_msg;
+ }
/* Send a negative linenum so that we don't end up
* with a duplicate exit message. */
_exit_cleanup(val, __FILE__, 0 - __LINE__);
tag, who_am_i(), inc_recurse ? "/inc" : "");
exit_cleanup(RERR_STREAMIO);
}
+
+ assert(iobuf.in_multiplexed > 0);
}
static void drain_multiplex_messages(void)
{
- while (IN_MULTIPLEXED && iobuf.in.len) {
+ while (IN_MULTIPLEXED_AND_READY && iobuf.in.len) {
if (iobuf.raw_input_ends_before) {
size_t raw_len = iobuf.raw_input_ends_before - iobuf.in.pos;
iobuf.raw_input_ends_before = 0;
if (msgs2stderr && DEBUG_GTE(IO, 2))
rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd);
- iobuf.in_multiplexed = True; /* See also IN_MULTIPLEXED */
+ iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */
io_start_buffering_in(fd);
}
if (msgs2stderr && DEBUG_GTE(IO, 2))
rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode);
- iobuf.in_multiplexed = False;
+ iobuf.in_multiplexed = 0;
if (mode == MPLX_SWITCHING)
iobuf.raw_input_ends_before = 0;
else