+/* Read a message from a multiplexed source. */
+static void read_a_msg(void)
+{
+ char *data, line[BIGPATHBUFLEN];
+ int tag, val;
+ size_t msg_bytes;
+
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ tag = IVAL(data, 0);
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ if (DEBUG_GTE(IO, 1) && (msgs2stderr || tag != MSG_INFO))
+ rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);
+
+ switch (tag) {
+ case MSG_DATA:
+ assert(iobuf.raw_input_ends_before == 0);
+ /* Though this does not yet read the data, we do mark where in
+ * the buffer the msg data will end once it is read. It is
+ * possible that this points off the end of the buffer, in
+ * which case the gradual reading of the input stream will
+ * cause this value to decrease and eventually become real. */
+ iobuf.raw_input_ends_before = iobuf.in.pos + msg_bytes;
+ break;
+ case MSG_STATS:
+ if (msg_bytes != sizeof stats.total_read || !am_generator)
+ goto invalid_msg;
+ data = perform_io(sizeof stats.total_read, PIO_INPUT_AND_CONSUME);
+ memcpy((char*)&stats.total_read, data, sizeof stats.total_read);
+ break;
+ case MSG_REDO:
+ if (msg_bytes != 4 || !am_generator)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ got_flist_entry_status(FES_REDO, IVAL(data, 0));
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4 || am_sender)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ io_error |= IVAL(data, 0);
+ if (!am_generator)
+ send_msg(MSG_IO_ERROR, data, 4, 0);
+ break;
+ case MSG_IO_TIMEOUT:
+ if (msg_bytes != 4 || am_server || am_generator)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ val = IVAL(data, 0);
+ if (!io_timeout || io_timeout > val) {
+ if (INFO_GTE(MISC, 2))
+ rprintf(FINFO, "Setting --timeout=%d to match server\n", val);
+ io_timeout = val;
+ }
+ break;
+ case MSG_NOOP:
+ if (am_sender)
+ maybe_send_keepalive();
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof line)
+ goto overflow;
+ if (am_generator) {
+ memcpy(line, perform_io(msg_bytes, PIO_INPUT_AND_CONSUME), msg_bytes);
+ send_msg(MSG_DELETED, line, msg_bytes, 1);
+ break;
+ }
+#ifdef ICONV_OPTION
+ if (ic_recv != (iconv_t)-1) {
+ xbuf outbuf, inbuf;
+ char ibuf[512];
+ int add_null = 0;
+ int flags = ICB_INCLUDE_BAD | ICB_INIT;
+
+ INIT_CONST_XBUF(outbuf, line);
+ INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
+
+ while (msg_bytes) {
+ size_t len = msg_bytes > sizeof ibuf - inbuf.len ? sizeof ibuf - inbuf.len : msg_bytes;
+ memcpy(ibuf + inbuf.len, perform_io(len, PIO_INPUT_AND_CONSUME), len);
+ inbuf.pos = 0;
+ inbuf.len += len;
+ if (!(msg_bytes -= len) && !ibuf[inbuf.len-1])
+ inbuf.len--, add_null = 1;
+ if (iconvbufs(ic_send, &inbuf, &outbuf, flags) < 0) {
+ if (errno == E2BIG)
+ goto overflow;
+ /* Buffer ended with an incomplete char, so move the
+ * bytes to the start of the buffer and continue. */
+ memmove(ibuf, ibuf + inbuf.pos, inbuf.len);
+ }
+ flags &= ~ICB_INIT;