+ msg_fd_in = fd;
+}
+
+/* The generator calls this to limit how many file transfers are active at
+ * once when --remove-sent-files is specified (without it, sender-side
+ * deletions mostly happened at the end).  TODO: tune the 10/50 limits? */
+void increment_active_files(int ndx, int itemizing, enum logcode code)
+{
+	for (;;) {
+		if (active_filecnt < (active_bytecnt >= 128*1024 ? 10 : 50))
+			break; /* under the limit: room for another file */
+		if (hlink_list.head)
+			check_for_finished_hlinks(itemizing, code);
+		read_msg_fd();
+	}
+	active_bytecnt += the_file_list->files[ndx]->length;
+	active_filecnt++;
+}
+
+void decrement_active_files(int ndx)
+{	/* Reverses increment_active_files(): drop file ndx from both totals. */
+	active_bytecnt -= the_file_list->files[ndx]->length;
+	active_filecnt--;
+}
+
+/* Push queued msg2genr items out over msg_fd_out.  Returns 1 when the list
+ * has been emptied, 0 if a write would block and flush_it_all is not set,
+ * and -1 on error.  This is only active in the receiver. */
+static int msg2genr_flush(int flush_it_all)
+{
+	static int written = 0; /* bytes of the head item already written */
+	struct msg_list_item *item;
+	struct timeval tv;
+	fd_set wfds;
+
+	if (msg_fd_out < 0)
+		return -1;
+	while ((item = msg2genr.head) != NULL) {
+		int cnt = write(msg_fd_out, item->buf + written, item->len - written);
+		if (cnt >= 0) {
+			if ((written += cnt) != item->len)
+				continue; /* partial write: send the rest next */
+			if (!(msg2genr.head = item->next))
+				msg2genr.tail = NULL;
+			free(item);
+			written = 0;
+			continue;
+		}
+		if (errno == EINTR)
+			continue;
+		if (errno != EWOULDBLOCK && errno != EAGAIN)
+			return -1;
+		if (!flush_it_all)
+			return 0;
+		FD_ZERO(&wfds); /* wait up to select_timeout secs for writability */
+		FD_SET(msg_fd_out, &wfds);
+		tv.tv_sec = select_timeout;
+		tv.tv_usec = 0;
+		if (select(msg_fd_out + 1, NULL, &wfds, NULL, &tv) == 0)
+			check_timeout();
+	}
+	return 1;
+}
+
+int send_msg(enum msgcode code, char *buf, int len)
+{	/* Returns 1 if queued, 0 if discarded, else io_multiplex_write()'s result. */
+	if (msg_fd_out >= 0) {
+		msg_list_add(&msg2genr, code, buf, len);
+		msg2genr_flush(NORMAL_FLUSH); /* result ignored; 1 is returned regardless */
+		return 1;
+	}
+	if (!defer_forwarding_messages)
+		return io_multiplex_write(code, buf, len);
+	if (!io_multiplexing_out)
+		return 0;
+	msg_list_add(&msg2sndr, code, buf, len); /* held on msg2sndr while deferring */
+	return 1;
+}
+
+/* Loop until redo_list has an entry, then pop and return it.  Each pass runs
+ * check_for_finished_hlinks() if hlink_list is non-empty and reads msg fd. */
+int get_redo_num(int itemizing, enum logcode code)
+{
+	for (;;) {
+		if (hlink_list.head)
+			check_for_finished_hlinks(itemizing, code);
+		if (redo_list.head)
+			return flist_ndx_pop(&redo_list);
+		read_msg_fd();
+	}
+}
+
+int get_hlink_num(void)
+{
+ return flist_ndx_pop(&hlink_list);