+void io_end_buffering_out(BOOL free_buffers)
+{
+ if (DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] io_end_buffering_out(%s)\n",
+ who_am_i(), free_buffers ? "True" : "False");
+ }
+
+ io_flush(FULL_FLUSH);
+
+ if (free_buffers) {
+ free_xbuf(&iobuf.out);
+ free_xbuf(&iobuf.msg);
+ } else {
+ iobuf.out.pos = iobuf.out.len = 0;
+ iobuf.msg.pos = iobuf.msg.len = 0;
+ }
+
+ iobuf.out_fd = -1;
+}
+
+void maybe_flush_socket(int important)
+{
+ if (flist_eof && iobuf.out.buf && iobuf.out.len > iobuf.out_empty_len
+ && (important || time(NULL) - last_io_out >= 5))
+ io_flush(NORMAL_FLUSH);
+}
+
+void maybe_send_keepalive(void)
+{
+ if (time(NULL) - last_io_out >= allowed_lull) {
+ if (!iobuf.msg.len && iobuf.out.len == iobuf.out_empty_len) {
+ if (protocol_version < 29)
+ return; /* there's nothing we can do */
+ if (protocol_version >= 30)
+ send_msg(MSG_NOOP, "", 0, 0);
+ else {
+ write_int(iobuf.out_fd, cur_flist->used);
+ write_shortint(iobuf.out_fd, ITEM_IS_NEW);
+ }
+ }
+ if (iobuf.msg.len)
+ perform_io(iobuf.msg.size - iobuf.msg.len + 1, PIO_NEED_MSGROOM);
+ else if (iobuf.out.len > iobuf.out_empty_len)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+void start_flist_forward(int ndx)
+{
+ write_int(iobuf.out_fd, ndx);
+ forward_flist_data = 1;
+}
+
+void stop_flist_forward(void)
+{
+ forward_flist_data = 0;
+}
+
+/* Read a message from a multiplexed source. */
+static void read_a_msg(void)
+{
+ char *data, line[BIGPATHBUFLEN];
+ int tag;
+ size_t msg_bytes;
+
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ tag = IVAL(data, 0);
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ if (DEBUG_GTE(IO, 1) && (msgs2stderr || tag != MSG_INFO))
+ rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);
+
+ switch (tag) {
+ case MSG_DATA:
+ assert(iobuf.raw_input_ends_before == 0);
+ /* Though this does not yet read the data, we do mark where in
+ * the buffer the msg data will end once it is read. It is
+ * possible that this points off the end of the buffer, in
+ * which case the gradual reading of the input stream will
+ * cause this value to decrease and eventually become real. */
+ iobuf.raw_input_ends_before = iobuf.in.pos + msg_bytes;
+ break;
+ case MSG_STATS:
+ if (msg_bytes != sizeof stats.total_read || !am_generator)
+ goto invalid_msg;
+ data = perform_io(sizeof stats.total_read, PIO_INPUT_AND_CONSUME);
+ memcpy((char*)&stats.total_read, data, sizeof stats.total_read);
+ break;
+ case MSG_REDO:
+ if (msg_bytes != 4 || !am_generator)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ got_flist_entry_status(FES_REDO, IVAL(data, 0));
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4 || am_sender)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ io_error |= IVAL(data, 0);
+ if (!am_generator)
+ send_msg(MSG_IO_ERROR, data, 4, 0);
+ break;
+ case MSG_NOOP:
+ if (am_sender)
+ maybe_send_keepalive();
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof line)
+ goto overflow;
+ if (am_generator) {
+ memcpy(line, perform_io(msg_bytes, PIO_INPUT_AND_CONSUME), msg_bytes);
+ send_msg(MSG_DELETED, line, msg_bytes, 1);
+ break;
+ }
+#ifdef ICONV_OPTION
+ if (ic_recv != (iconv_t)-1) {
+ xbuf outbuf, inbuf;
+ char ibuf[512];
+ int add_null = 0;
+
+ INIT_CONST_XBUF(outbuf, line);
+ INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
+
+ while (msg_bytes) {
+ inbuf.len = msg_bytes > sizeof ibuf ? sizeof ibuf : msg_bytes;
+ memcpy(inbuf.buf, perform_io(inbuf.len, PIO_INPUT_AND_CONSUME), inbuf.len);
+ if (!(msg_bytes -= inbuf.len)
+ && !ibuf[inbuf.len-1])
+ inbuf.len--, add_null = 1;
+ if (iconvbufs(ic_send, &inbuf, &outbuf,
+ ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
+ goto overflow;
+ }
+ if (add_null) {
+ if (outbuf.len == outbuf.size)
+ goto overflow;
+ outbuf.buf[outbuf.len++] = '\0';
+ }
+ msg_bytes = outbuf.len;
+ } else
+#endif
+ memcpy(line, perform_io(msg_bytes, PIO_INPUT_AND_CONSUME), msg_bytes);
+ /* A directory name was sent with the trailing null */
+ if (msg_bytes > 0 && !line[msg_bytes-1])
+ log_delete(line, S_IFDIR);
+ else {
+ line[msg_bytes] = '\0';
+ log_delete(line, S_IFREG);
+ }
+ break;
+ case MSG_SUCCESS:
+ if (msg_bytes != 4) {
+ invalid_msg:
+ rprintf(FERROR, "invalid multi-message %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ if (am_generator)
+ got_flist_entry_status(FES_SUCCESS, IVAL(data, 0));
+ else
+ successful_send(IVAL(data, 0));
+ break;
+ case MSG_NO_SEND:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ if (am_generator)
+ got_flist_entry_status(FES_NO_SEND, IVAL(data, 0));
+ else
+ send_msg(MSG_NO_SEND, data, 4, 0);
+ break;
+ case MSG_ERROR_SOCKET:
+ case MSG_ERROR_UTF8:
+ case MSG_CLIENT:
+ case MSG_LOG:
+ if (!am_generator)
+ goto invalid_msg;
+ if (tag == MSG_ERROR_SOCKET)
+ msgs2stderr = 1;
+ /* FALL THROUGH */
+ case MSG_INFO:
+ case MSG_ERROR:
+ case MSG_ERROR_XFER:
+ case MSG_WARNING:
+ if (msg_bytes >= sizeof line) {
+ overflow:
+ rprintf(FERROR,
+ "multiplexing overflow %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ memcpy(line, perform_io(msg_bytes, PIO_INPUT_AND_CONSUME), msg_bytes);
+ rwrite((enum logcode)tag, line, msg_bytes, !am_generator);
+ if (first_message) {
+ if (list_only && !am_sender && tag == 1 && msg_bytes < sizeof line) {
+ line[msg_bytes] = '\0';
+ check_for_d_option_error(line);
+ }
+ first_message = 0;
+ }
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d [%s%s]\n",
+ tag, who_am_i(), inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+}