+BOOL io_start_buffering_out(int f_out)
+{
+ if (msgs2stderr && DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out);
+
+ if (iobuf.out.buf) {
+ if (iobuf.out_fd == -1)
+ iobuf.out_fd = f_out;
+ else
+ assert(f_out == iobuf.out_fd);
+ return False;
+ }
+
+ alloc_xbuf(&iobuf.out, ROUND_UP_1024(IO_BUFFER_SIZE * 2));
+ iobuf.out_fd = f_out;
+
+ return True;
+}
+
+BOOL io_start_buffering_in(int f_in)
+{
+ if (msgs2stderr && DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in);
+
+ if (iobuf.in.buf) {
+ if (iobuf.in_fd == -1)
+ iobuf.in_fd = f_in;
+ else
+ assert(f_in == iobuf.in_fd);
+ return False;
+ }
+
+ alloc_xbuf(&iobuf.in, ROUND_UP_1024(IO_BUFFER_SIZE));
+ iobuf.in_fd = f_in;
+
+ return True;
+}
+
+void io_end_buffering_in(BOOL free_buffers)
+{
+ if (msgs2stderr && DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n",
+ who_am_i(), free_buffers ? "FREE" : "KEEP");
+ }
+
+ if (free_buffers)
+ free_xbuf(&iobuf.in);
+ else
+ iobuf.in.pos = iobuf.in.len = 0;
+
+ iobuf.in_fd = -1;
+}
+
+void io_end_buffering_out(BOOL free_buffers)
+{
+ if (msgs2stderr && DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n",
+ who_am_i(), free_buffers ? "FREE" : "KEEP");
+ }
+
+ io_flush(FULL_FLUSH);
+
+ if (free_buffers) {
+ free_xbuf(&iobuf.out);
+ free_xbuf(&iobuf.msg);
+ }
+
+ iobuf.out_fd = -1;
+}
+
+void maybe_flush_socket(int important)
+{
+ if (flist_eof && iobuf.out.buf && iobuf.out.len > iobuf.out_empty_len
+ && (important || time(NULL) - last_io_out >= 5))
+ io_flush(NORMAL_FLUSH);
+}
+
+/* Older rsync versions used to send either a MSG_NOOP (protocol 30) or a
+ * raw-data-based keep-alive (protocol 29), both of which implied forwarding of
+ * the message through the sender. Since the new timeout method does not need
+ * any forwarding, we just send an empty MSG_DATA message, which works with all
+ * rsync versions. This avoids any message forwarding, and leaves the raw-data
+ * stream alone (since we can never be quite sure if that stream is in the
+ * right state for a keep-alive message). */
+void maybe_send_keepalive(time_t now, int flags)
+{
+ if (flags & MSK_ACTIVE_RECEIVER)
+ last_io_in = now; /* Fudge things when we're working hard on the files. */
+
+ if (now - last_io_out >= allowed_lull) {
+ /* The receiver is special: it only sends keep-alive messages if it is
+ * actively receiving data. Otherwise, it lets the generator timeout. */
+ if (am_receiver && now - last_io_in >= io_timeout)
+ return;
+
+ if (!iobuf.msg.len && iobuf.out.len == iobuf.out_empty_len)
+ send_msg(MSG_DATA, "", 0, 0);
+ if (!(flags & MSK_ALLOW_FLUSH)) {
+ /* Let the caller worry about writing out the data. */
+ } else if (iobuf.msg.len)
+ perform_io(iobuf.msg.size - iobuf.msg.len + 1, PIO_NEED_MSGROOM);
+ else if (iobuf.out.len > iobuf.out_empty_len)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+void start_flist_forward(int ndx)
+{
+ write_int(iobuf.out_fd, ndx);
+ forward_flist_data = 1;
+}
+
+void stop_flist_forward(void)
+{
+ forward_flist_data = 0;
+}
+
+/* Read a message from a multiplexed source. */
+static void read_a_msg(void)
+{
+ char data[BIGPATHBUFLEN];
+ int tag, val;
+ size_t msg_bytes;
+
+ /* This ensures that perform_io() does not try to do any message reading
+ * until we've read all of the data for this message. We should also
+ * try to avoid calling things that will cause data to be written via
+ * perform_io() prior to this being reset to 1. */
+ iobuf.in_multiplexed = -1;
+
+ tag = raw_read_int();
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ if (DEBUG_GTE(IO, 1) && msgs2stderr)
+ rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);
+
+ switch (tag) {
+ case MSG_DATA:
+ assert(iobuf.raw_input_ends_before == 0);
+ /* Though this does not yet read the data, we do mark where in
+ * the buffer the msg data will end once it is read. It is
+ * possible that this points off the end of the buffer, in
+ * which case the gradual reading of the input stream will
+ * cause this value to wrap around and eventually become real. */
+ if (msg_bytes)
+ iobuf.raw_input_ends_before = iobuf.in.pos + msg_bytes;
+ iobuf.in_multiplexed = 1;
+ break;
+ case MSG_STATS:
+ if (msg_bytes != sizeof stats.total_read || !am_generator)
+ goto invalid_msg;
+ raw_read_buf((char*)&stats.total_read, sizeof stats.total_read);
+ iobuf.in_multiplexed = 1;
+ break;
+ case MSG_REDO:
+ if (msg_bytes != 4 || !am_generator)
+ goto invalid_msg;
+ val = raw_read_int();
+ iobuf.in_multiplexed = 1;
+ got_flist_entry_status(FES_REDO, val);
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ val = raw_read_int();
+ iobuf.in_multiplexed = 1;
+ io_error |= val;
+ if (am_receiver)
+ send_msg_int(MSG_IO_ERROR, val);
+ break;
+ case MSG_IO_TIMEOUT:
+ if (msg_bytes != 4 || am_server || am_generator)
+ goto invalid_msg;
+ val = raw_read_int();
+ iobuf.in_multiplexed = 1;
+ if (!io_timeout || io_timeout > val) {
+ if (INFO_GTE(MISC, 2))
+ rprintf(FINFO, "Setting --timeout=%d to match server\n", val);
+ set_io_timeout(val);
+ }
+ break;
+ case MSG_NOOP:
+ /* Support protocol-30 keep-alive method. */
+ if (msg_bytes != 0)
+ goto invalid_msg;
+ iobuf.in_multiplexed = 1;
+ if (am_sender)
+ maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH);
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof data)
+ goto overflow;
+ if (am_generator) {
+ raw_read_buf(data, msg_bytes);
+ iobuf.in_multiplexed = 1;
+ send_msg(MSG_DELETED, data, msg_bytes, 1);
+ break;
+ }
+#ifdef ICONV_OPTION
+ if (ic_recv != (iconv_t)-1) {
+ xbuf outbuf, inbuf;
+ char ibuf[512];
+ int add_null = 0;
+ int flags = ICB_INCLUDE_BAD | ICB_INIT;
+
+ INIT_CONST_XBUF(outbuf, data);
+ INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
+
+ while (msg_bytes) {
+ size_t len = msg_bytes > sizeof ibuf - inbuf.len ? sizeof ibuf - inbuf.len : msg_bytes;
+ raw_read_buf(ibuf + inbuf.len, len);
+ inbuf.pos = 0;
+ inbuf.len += len;
+ if (!(msg_bytes -= len) && !ibuf[inbuf.len-1])
+ inbuf.len--, add_null = 1;
+ if (iconvbufs(ic_send, &inbuf, &outbuf, flags) < 0) {
+ if (errno == E2BIG)
+ goto overflow;
+ /* Buffer ended with an incomplete char, so move the
+ * bytes to the start of the buffer and continue. */
+ memmove(ibuf, ibuf + inbuf.pos, inbuf.len);
+ }
+ flags &= ~ICB_INIT;
+ }
+ if (add_null) {
+ if (outbuf.len == outbuf.size)
+ goto overflow;
+ outbuf.buf[outbuf.len++] = '\0';