+ FD_ZERO(&r_fds); /* Just in case... */
+ FD_ZERO(&w_fds);
+ }
+
+ if (iobuf.in_fd >= 0 && FD_ISSET(iobuf.in_fd, &r_fds)) {
+ size_t len, pos = iobuf.in.pos + iobuf.in.len;
+ int n;
+ if (pos >= iobuf.in.size) {
+ pos -= iobuf.in.size;
+ len = iobuf.in.size - iobuf.in.len;
+ } else
+ len = iobuf.in.size - pos;
+ if ((n = read(iobuf.in_fd, iobuf.in.buf + pos, len)) <= 0) {
+ if (n == 0) {
+ /* Signal that input has become invalid. */
+ if (!read_batch || batch_fd < 0 || am_generator)
+ iobuf.in_fd = -2;
+ batch_fd = -1;
+ continue;
+ }
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
+ n = 0;
+ else {
+ /* Don't write errors on a dead socket. */
+ if (iobuf.in_fd == sock_f_in) {
+ if (am_sender)
+ msgs2stderr = 1;
+ rsyserr(FERROR_SOCKET, errno, "read error");
+ } else
+ rsyserr(FERROR, errno, "read error");
+ exit_cleanup(RERR_SOCKETIO);
+ }
+ }
+ if (msgs2stderr && DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n);
+
+ if (io_timeout)
+ last_io_in = time(NULL);
+ stats.total_read += n;
+
+ iobuf.in.len += n;
+ }
+
+ if (out && FD_ISSET(iobuf.out_fd, &w_fds)) {
+ size_t len = iobuf.raw_flushing_ends_before ? iobuf.raw_flushing_ends_before - out->pos : out->len;
+ int n;
+
+ if (bwlimit_writemax && len > bwlimit_writemax)
+ len = bwlimit_writemax;
+
+ if (out->pos + len > out->size)
+ len = out->size - out->pos;
+ if ((n = write(iobuf.out_fd, out->buf + out->pos, len)) <= 0) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
+ n = 0;
+ else {
+ /* Don't write errors on a dead socket. */
+ msgs2stderr = 1;
+ iobuf.out_fd = -2;
+ iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0;
+ rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i());
+ drain_multiplex_messages();
+ exit_cleanup(RERR_SOCKETIO);
+ }
+ }
+ if (msgs2stderr && DEBUG_GTE(IO, 2)) {
+ rprintf(FINFO, "[%s] %s sent=%ld\n",
+ who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n);
+ }
+
+ if (io_timeout)
+ last_io_out = time(NULL);
+ stats.total_written += n;
+
+ if (bwlimit_writemax)
+ sleep_for_bwlimit(n);
+
+ if ((out->pos += n) == out->size) {
+ if (iobuf.raw_flushing_ends_before)
+ iobuf.raw_flushing_ends_before -= out->size;
+ out->pos = 0;
+ restore_iobuf_size(out);
+ } else if (out->pos == iobuf.raw_flushing_ends_before)
+ iobuf.raw_flushing_ends_before = 0;
+ if ((out->len -= n) == empty_buf_len) {
+ out->pos = 0;
+ restore_iobuf_size(out);
+ if (empty_buf_len)
+ iobuf.raw_data_header_pos = 0;
+ }
+ }
+
+ /* We need to help prevent deadlock by doing what reading
+ * we can whenever we are here trying to write. */
+ if (IN_MULTIPLEXED_AND_READY && !(flags & PIO_NEED_INPUT)) {
+ while (!iobuf.raw_input_ends_before && iobuf.in.len > 512)
+ read_a_msg();
+ if (flist_receiving_enabled && iobuf.in.len > 512)
+ wait_for_receiver(); /* generator only */
+ }
+
+ if (ff_forward_fd >= 0 && FD_ISSET(ff_forward_fd, &r_fds)) {
+ /* This can potentially flush all output and enable
+ * multiplexed output, so keep this last in the loop
+ * and be sure to not cache anything that would break
+ * such a change. */
+ forward_filesfrom_data();
+ }
+ }
+ double_break:
+
+ data = iobuf.in.buf + iobuf.in.pos;
+
+ if (flags & PIO_CONSUME_INPUT) {
+ iobuf.in.len -= needed;
+ iobuf.in.pos += needed;
+ if (iobuf.in.pos == iobuf.raw_input_ends_before)
+ iobuf.raw_input_ends_before = 0;
+ if (iobuf.in.pos >= iobuf.in.size) {
+ iobuf.in.pos -= iobuf.in.size;
+ if (iobuf.raw_input_ends_before)
+ iobuf.raw_input_ends_before -= iobuf.in.size;
+ }
+ }
+
+ return data;
+}
+
+static void raw_read_buf(char *buf, size_t len)
+{
+ size_t pos = iobuf.in.pos;
+ char *data = perform_io(len, PIO_INPUT_AND_CONSUME);
+ if (iobuf.in.pos <= pos && len) {
+ size_t siz = len - iobuf.in.pos;
+ memcpy(buf, data, siz);
+ memcpy(buf + siz, iobuf.in.buf, iobuf.in.pos);
+ } else
+ memcpy(buf, data, len);
+}
+
+static int32 raw_read_int(void)
+{
+ char *data, buf[4];
+ if (iobuf.in.size - iobuf.in.pos >= 4)
+ data = perform_io(4, PIO_INPUT_AND_CONSUME);
+ else
+ raw_read_buf(data = buf, 4);
+ return IVAL(data, 0);
+}
+
+void noop_io_until_death(void)
+{
+ char buf[1024];
+
+ kluge_around_eof = 2;
+ /* Setting an I/O timeout ensures that if something inexplicably weird
+ * happens, we won't hang around forever. */
+ if (!io_timeout)
+ set_io_timeout(60);
+
+ while (1)
+ read_buf(iobuf.in_fd, buf, sizeof buf);
+}
+
+/* Buffer a message for the multiplexed output stream. Is not used for (normal) MSG_DATA. */
+int send_msg(enum msgcode code, const char *buf, size_t len, int convert)
+{
+ char *hdr;
+ size_t needed, pos;
+ BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO);
+
+ if (!OUT_MULTIPLEXED)
+ return 0;
+
+ if (want_debug)
+ rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", who_am_i(), (int)code, (long)len);
+
+ /* When checking for enough free space for this message, we need to
+ * make sure that there is space for the 4-byte header, plus we'll
+ * assume that we may waste up to 3 bytes (if the header doesn't fit
+ * at the physical end of the buffer). */
+#ifdef ICONV_OPTION
+ if (convert > 0 && ic_send == (iconv_t)-1)
+ convert = 0;
+ if (convert > 0) {
+ /* Ensuring double-size room leaves space for maximal conversion expansion. */
+ needed = len*2 + 4 + 3;
+ } else
+#endif
+ needed = len + 4 + 3;
+ if (iobuf.msg.len + needed > iobuf.msg.size)
+ perform_io(needed, PIO_NEED_MSGROOM);
+
+ pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */
+ if (pos >= iobuf.msg.size)
+ pos -= iobuf.msg.size;
+ else if (pos + 4 > iobuf.msg.size) {
+ /* The 4-byte header won't fit at the end of the buffer,
+ * so we'll temporarily reduce the message buffer's size
+ * and put the header at the start of the buffer. */
+ reduce_iobuf_size(&iobuf.msg, pos);
+ pos = 0;
+ }
+ hdr = iobuf.msg.buf + pos;
+
+ iobuf.msg.len += 4; /* Allocate room for the coming header bytes. */
+
+#ifdef ICONV_OPTION
+ if (convert > 0) {
+ xbuf inbuf;
+
+ INIT_XBUF(inbuf, (char*)buf, len, (size_t)-1);
+
+ len = iobuf.msg.len;
+ iconvbufs(ic_send, &inbuf, &iobuf.msg,
+ ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE | ICB_CIRCULAR_OUT | ICB_INIT);
+ if (inbuf.len > 0) {
+ rprintf(FERROR, "overflowed iobuf.msg buffer in send_msg");
+ exit_cleanup(RERR_UNSUPPORTED);