+/* Remember which descriptors carry the main connection.  (For a local
+ * transfer these may really be the ends of a pipe rather than a socket,
+ * but that makes no difference here.) */
+void io_set_sock_fds(int f_in, int f_out)
+{
+	sock_f_out = f_out;
+	sock_f_in = f_in;
+}
+
+/* Store the I/O timeout and recompute the two settings derived from it,
+ * select_timeout and allowed_lull. */
+void set_io_timeout(int secs)
+{
+	io_timeout = secs;
+
+	/* select_timeout follows io_timeout only when that is non-zero and
+	 * no larger than SELECT_TIMEOUT; otherwise SELECT_TIMEOUT is used. */
+	if (io_timeout && io_timeout <= SELECT_TIMEOUT)
+		select_timeout = io_timeout;
+	else
+		select_timeout = SELECT_TIMEOUT;
+
+	/* allowed_lull is (io_timeout + 1) / 2, i.e. half the timeout
+	 * rounded up for non-negative values, or zero when read_batch
+	 * is set. */
+	if (read_batch)
+		allowed_lull = 0;
+	else
+		allowed_lull = (io_timeout + 1) / 2;
+}
+
+/* Record the fd from which MSG_* messages are read.  This is needed in
+ * only two situations: while we are a local sender in its early stages
+ * (until the file list has been sent), and while we are the generator
+ * (which fetches the receiver's messages through it). */
+void set_msg_fd_in(int fd)
+{
+	msg_fd_in = fd;
+}
+
+/* Record the fd on which our own MSG_* messages are written and put it
+ * into non-blocking mode.  This is needed only when we are the receiver,
+ * which uses it to pass its messages to the generator. */
+void set_msg_fd_out(int fd)
+{
+	/* Store the descriptor first; set_nonblocking() is then applied to
+	 * the stored value. */
+	msg_fd_out = fd;
+	set_nonblocking(msg_fd_out);
+}
+
+/* Append one MSG_* message to the tail of the given pending list.  The
+ * stored item holds a 4-byte header (the code offset by MPLEX_BASE,
+ * shifted up 24 bits, OR'ed with the payload length) followed by a copy
+ * of the len payload bytes from buf.  out_of_memory() is called if the
+ * item cannot be allocated. */
+static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len)
+{
+	struct msg_list_item *item;
+	/* Header + payload + the item struct itself, less one byte.
+	 * NOTE(review): presumably the struct's buf member is declared with
+	 * a single element -- confirm against its definition. */
+	int alloc_len = len + 4 + sizeof item[0] - 1;
+
+	item = (struct msg_list_item *)new_array(char, alloc_len);
+	if (!item)
+		out_of_memory("msg_list_add");
+
+	item->next = NULL;
+	item->len = len + 4;
+	SIVAL(item->buf, 0, ((code+MPLEX_BASE)<<24) | len);
+	memcpy(item->buf + 4, buf, len);
+
+	/* An empty list gets the item as its head; otherwise it is chained
+	 * after the current tail.  Either way it becomes the new tail. */
+	if (!lst->tail)
+		lst->head = item;
+	else
+		lst->tail->next = item;
+	lst->tail = item;
+}
+
+/* Write the queued msg2sndr items to sock_f_out, oldest first, for as
+ * long as the list is non-empty and io_multiplexing_out is set.  Each
+ * item is unlinked before it is written and freed afterwards. */
+static void msg2sndr_flush(void)
+{
+	struct msg_list_item *item;
+
+	for (;;) {
+		item = msg2sndr.head;
+		if (!item || !io_multiplexing_out)
+			break;
+
+		/* Unlink the head; clear the tail too once the list empties. */
+		msg2sndr.head = item->next;
+		if (!msg2sndr.head)
+			msg2sndr.tail = NULL;
+
+		stats.total_written += item->len;
+
+		/* defer_forwarding_messages is forced to 1 for the duration of
+		 * the write and reset to 0 afterwards.  NOTE(review):
+		 * presumably this makes messages that arrive during the write
+		 * get queued rather than forwarded -- confirm. */
+		defer_forwarding_messages = 1;
+		writefd_unbuffered(sock_f_out, item->buf, item->len);
+		defer_forwarding_messages = 0;
+
+		free(item);
+	}
+}
+
+/* Read a message from the MSG_* fd and handle it. This is called either
+ * during the early stages of being a local sender (up through the sending
+ * of the file list) or when we're the generator (to fetch the messages
+ * from the receiver).
+ *
+ * Each message starts with a 4-byte header: the low 24 bits give the
+ * payload length, and the bits above them, less MPLEX_BASE, give the
+ * MSG_* tag.  A tag that is unknown, or whose length or context fails
+ * the checks below, is reported with rprintf(FERROR) and handed to
+ * exit_cleanup(RERR_STREAMIO). */
+static void read_msg_fd(void)
+{
+ char buf[2048];
+ size_t n;
+ struct file_list *flist;
+ int fd = msg_fd_in;
+ int tag, len;
+
+ /* Temporarily disable msg_fd_in. This is needed to avoid looping back
+ * to this routine from writefd_unbuffered(). */
+ /* no_flush and defer_forwarding_messages are bumped here as well; all
+ * three are put back at the bottom of the function. */
+ no_flush++;
+ msg_fd_in = -1;
+ defer_forwarding_messages++;
+
+ /* Fetch and decode the 4-byte header. */
+ readfd(fd, buf, 4);
+ tag = IVAL(buf, 0);
+
+ len = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ switch (tag) {
+ case MSG_DONE:
+ /* Only a 0- or 1-byte payload is accepted, and only in the
+ * generator.  (len was masked to 24 bits above, so the len < 0
+ * test cannot be true.) */
+ if (len < 0 || len > 1 || !am_generator) {
+ /* Shared failure path: the other cases jump here with goto. */
+ invalid_msg:
+ rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
+ tag, len, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ /* A non-empty MSG_DONE is followed by a value that replaces
+ * stats.total_read. */
+ if (len) {
+ readfd(fd, buf, len);
+ stats.total_read = read_longint(fd);
+ }
+ msgdone_cnt++;
+ break;
+ case MSG_REDO:
+ /* Payload is a 4-byte index, which is pushed onto redo_list. */
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd(fd, buf, 4);
+ if (remove_source_files)
+ decrement_active_files(IVAL(buf,0));
+ flist_ndx_push(&redo_list, IVAL(buf,0));
+ if (inc_recurse)
+ decrement_flist_in_progress(IVAL(buf,0), 1);
+ break;
+ case MSG_FLIST:
+ /* Only valid in the generator with inc_recurse set.  The 4-byte
+ * payload becomes the parent_ndx of the file list that is then
+ * read from the same fd. */
+ if (len != 4 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ readfd(fd, buf, 4);
+ /* Read extra file list from receiver. */
+ assert(iobuf_in != NULL);
+ assert(iobuf_f_in == fd);
+ flist = recv_file_list(fd);
+ flist->parent_ndx = IVAL(buf,0);
+ break;
+ case MSG_FLIST_EOF:
+ /* No payload; just sets the flist_eof flag. */
+ if (len != 0 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ flist_eof = 1;
+ break;
+ case MSG_DELETED:
+ /* The payload must fit in buf with a byte to spare; it is passed
+ * on unchanged via send_msg(). */
+ if (len >= (int)sizeof buf || !am_generator)
+ goto invalid_msg;
+ readfd(fd, buf, len);
+ send_msg(MSG_DELETED, buf, len);
+ break;
+ case MSG_SUCCESS:
+ /* Payload is a 4-byte index.  It is passed on via send_msg() only
+ * when remove_source_files is set. */
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd(fd, buf, len);
+ if (remove_source_files) {
+ decrement_active_files(IVAL(buf,0));
+ send_msg(MSG_SUCCESS, buf, len);
+ }
+ if (preserve_hard_links)
+ flist_ndx_push(&hlink_list, IVAL(buf,0));
+ if (inc_recurse)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
+ break;
+ case MSG_NO_SEND:
+ /* Payload is a 4-byte index, used only when inc_recurse is set. */
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd(fd, buf, len);
+ if (inc_recurse)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
+ break;
+ case MSG_SOCKERR:
+ case MSG_CLIENT:
+ /* These two tags are accepted only in the generator; the three
+ * tags below them carry no such check. */
+ if (!am_generator)
+ goto invalid_msg;
+ if (tag == MSG_SOCKERR)
+ io_end_multiplex_out();
+ /* FALL THROUGH */
+ case MSG_INFO:
+ case MSG_ERROR:
+ case MSG_LOG:
+ /* Hand the payload to rwrite() in pieces of at most
+ * sizeof buf - 1 bytes, using the tag as the log code. */
+ while (len) {
+ n = len;
+ if (n >= sizeof buf)
+ n = sizeof buf - 1;
+ readfd(fd, buf, n);
+ rwrite((enum logcode)tag, buf, n);
+ len -= n;
+ }
+ break;
+ default:
+ rprintf(FERROR, "unknown message %d:%d [%s]\n",
+ tag, len, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ /* Undo the temporary state set up at the top.  Once the deferral
+ * count is back to zero, anything queued on msg2sndr is written out. */
+ no_flush--;
+ msg_fd_in = fd;
+ if (!--defer_forwarding_messages)
+ msg2sndr_flush();
+}
+
+/* This is used by the generator to limit how many file transfers can
+ * be active at once when --remove-source-files is specified. Without
+ * this, sender-side deletions were mostly happening at the end. */
+void increment_active_files(int ndx, int itemizing, enum logcode code)