+/* Buffer a message for the multiplexed output stream. Is never used for MSG_DATA. */
+int send_msg(enum msgcode code, const char *buf, size_t len, int convert)
+{
+ char *hdr;
+ size_t pos;
+ BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO);
+
+ if (!OUT_MULTIPLEXED)
+ return 0;
+
+ if (want_debug)
+ rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", who_am_i(), (int)code, (long)len);
+
+#ifdef ICONV_OPTION
+ if (convert > 0 && ic_send == (iconv_t)-1)
+ convert = 0;
+ if (convert > 0) {
+ /* Ensuring double-size room leaves space for a potential conversion. */
+ if (iobuf.msg.len + len*2 + 4 > iobuf.msg.size)
+ perform_io(len*2 + 4, PIO_NEED_MSGROOM);
+ } else
+#endif
+ if (iobuf.msg.len + len + 4 > iobuf.msg.size)
+ perform_io(len + 4, PIO_NEED_MSGROOM);
+
+ pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */
+ if (pos >= iobuf.msg.size)
+ pos -= iobuf.msg.size;
+ hdr = iobuf.msg.buf + pos;
+
+ iobuf.msg.len += 4; /* Leave room for the coming header bytes. */
+
+#ifdef ICONV_OPTION
+ if (convert > 0) {
+ xbuf inbuf;
+
+ INIT_XBUF(inbuf, (char*)buf, len, (size_t)-1);
+
+ len = iobuf.msg.len;
+ iconvbufs(ic_send, &inbuf, &iobuf.msg,
+ ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE | ICB_CIRCULAR_OUT | ICB_INIT);
+ if (inbuf.len > 0) {
+ rprintf(FERROR, "overflowed iobuf.msg buffer in send_msg");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+ len = iobuf.msg.len - len;
+ } else
+#endif
+ {
+ size_t siz;
+
+ if ((pos += 4) >= iobuf.msg.size)
+ pos -= iobuf.msg.size;
+
+ /* Handle a split copy if we wrap around the end of the circular buffer. */
+ if (pos >= iobuf.msg.pos && (siz = iobuf.msg.size - pos) < len) {
+ memcpy(iobuf.msg.buf + pos, buf, siz);
+ memcpy(iobuf.msg.buf, buf + siz, len - siz);
+ } else
+ memcpy(iobuf.msg.buf + pos, buf, len);
+
+ iobuf.msg.len += len;
+ }
+
+ SIVAL(hdr, 0, ((MPLEX_BASE + (int)code)<<24) + len);
+ /* If the header used any overflow bytes, move them to the start. */
+ if ((pos = hdr+4 - iobuf.msg.buf) > iobuf.msg.size) {
+ size_t siz = pos - iobuf.msg.size;
+ if (DEBUG_GTE(IO, 4))
+ rprintf(FINFO, "[%s] wrap-bytes moved: %d (send_msg)\n", who_am_i(), (int)siz);
+ memcpy(iobuf.msg.buf, hdr+4 - siz, siz);
+ }
+
+ if (want_debug && convert > 0)
+ rprintf(FINFO, "[%s] converted msg len=%ld\n", who_am_i(), (long)len);
+
+ return 1;
+}
+
+void send_msg_int(enum msgcode code, int num)
+{
+ char numbuf[4];
+
+ if (DEBUG_GTE(IO, 1))
+ rprintf(FINFO, "[%s] send_msg_int(%d, %d)\n", who_am_i(), (int)code, num);
+
+ SIVAL(numbuf, 0, num);
+ send_msg(code, numbuf, 4, -1);
+}
+
+static void got_flist_entry_status(enum festatus status, int ndx)
+{
+ struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status");
+
+ if (remove_source_files) {
+ active_filecnt--;
+ active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
+ }
+
+ if (inc_recurse)
+ flist->in_progress--;
+
+ switch (status) {
+ case FES_SUCCESS:
+ if (remove_source_files)
+ send_msg_int(MSG_SUCCESS, ndx);
+ if (preserve_hard_links) {
+ struct file_struct *file = flist->files[ndx - flist->ndx_start];
+ if (F_IS_HLINKED(file)) {
+ flist_ndx_push(&hlink_list, ndx);
+ flist->in_progress++;
+ }
+ }
+ break;
+ case FES_REDO:
+ if (read_batch) {
+ if (inc_recurse)
+ flist->in_progress++;
+ break;
+ }
+ if (inc_recurse)
+ flist->to_redo++;
+ flist_ndx_push(&redo_list, ndx);
+ break;
+ case FES_NO_SEND:
+ break;
+ }
+}
+
+/* Note the fds used for the main socket (which might really be a pipe
+ * for a local transfer, but we can ignore that). */
+void io_set_sock_fds(int f_in, int f_out)
+{
+ sock_f_in = f_in;
+ sock_f_out = f_out;
+}
+
+void set_io_timeout(int secs)
+{
+ io_timeout = secs;
+
+ if (!io_timeout || io_timeout > SELECT_TIMEOUT)
+ select_timeout = SELECT_TIMEOUT;
+ else
+ select_timeout = io_timeout;
+
+ allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
+}
+
+static void check_for_d_option_error(const char *msg)
+{
+ static char rsync263_opts[] = "BCDHIKLPRSTWabceghlnopqrtuvxz";
+ char *colon;
+ int saw_d = 0;
+
+ if (*msg != 'r'
+ || strncmp(msg, REMOTE_OPTION_ERROR, sizeof REMOTE_OPTION_ERROR - 1) != 0)
+ return;
+
+ msg += sizeof REMOTE_OPTION_ERROR - 1;
+ if (*msg == '-' || (colon = strchr(msg, ':')) == NULL
+ || strncmp(colon, REMOTE_OPTION_ERROR2, sizeof REMOTE_OPTION_ERROR2 - 1) != 0)
+ return;
+
+ for ( ; *msg != ':'; msg++) {
+ if (*msg == 'd')
+ saw_d = 1;
+ else if (*msg == 'e')
+ break;
+ else if (strchr(rsync263_opts, *msg) == NULL)
+ return;
+ }
+
+ if (saw_d) {
+ rprintf(FWARNING,
+ "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");
+ }
+}
+
+/* This is used by the generator to limit how many file transfers can
+ * be active at once when --remove-source-files is specified. Without
+ * this, sender-side deletions were mostly happening at the end. */
+void increment_active_files(int ndx, int itemizing, enum logcode code)
+{
+ while (1) {
+ /* TODO: tune these limits? */
+ int limit = active_bytecnt >= 128*1024 ? 10 : 50;
+ if (active_filecnt < limit)
+ break;
+ check_for_finished_files(itemizing, code, 0);
+ if (active_filecnt < limit)
+ break;
+ wait_for_receiver();
+ }
+
+ active_filecnt++;
+ active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);
+}
+
+int get_redo_num(void)
+{
+ return flist_ndx_pop(&redo_list);
+}
+
+int get_hlink_num(void)
+{
+ return flist_ndx_pop(&hlink_list);
+}
+
+/* When we're the receiver and we have a local --files-from list of names
+ * that needs to be sent over the socket to the sender, we have to do two
+ * things at the same time: send the sender a list of what files we're
+ * processing and read the incoming file+info list from the sender. We do
+ * this by making recv_file_list() call forward_filesfrom_data(), which
+ * will ensure that we forward data to the sender until we get some data
+ * for recv_file_list() to use. */
+void start_filesfrom_forwarding(int fd)
+{
+ if (protocol_version < 31 && OUT_MULTIPLEXED) {
+ /* Older protocols send the files-from data w/o packaging
+ * it in multiplexed I/O packets, so temporarily switch
+ * to buffered I/O to match this behavior. */
+ iobuf.msg.pos = iobuf.msg.len = 0; /* Be extra sure no messages go out. */
+ ff_reenable_multiplex = io_end_multiplex_out(MPLX_TO_BUFFERED);
+ }
+ ff_forward_fd = fd;
+
+ alloc_xbuf(&ff_xb, FILESFROM_BUFLEN);
+}
+
+/* Read a line into the "buf" buffer. */
+int read_line(int fd, char *buf, size_t bufsiz, int flags)