+ io_flush(NORMAL_FLUSH);
+ if (!io_multiplexing_out) {
+ free(iobuf_out);
+ iobuf_out = NULL;
+ }
+}
+
+
+/**
+ * Fill buf with exactly len bytes from fd, calling read_timeout() as
+ * many times as it takes; does not return before all have arrived.
+ **/
+static void read_loop(int fd, char *buf, size_t len)
+{
+	int got;
+
+	/* Terminates only if read_timeout() keeps returning positive
+	 * counts; a zero return would make this spin forever. */
+	for (; len != 0; buf += got, len -= got)
+		got = read_timeout(fd, buf, len);
+}
+
+
+/**
+ * Read up to len bytes from fd into buf and return the number copied.
+ * When iobuf_in exists and fd is sock_f_in, data is staged in iobuf_in;
+ * MSG_INFO/MSG_ERROR packets are handed to rwrite(), not to the caller.
+ * Never returns <= 0 (presumably read_timeout() guarantees the same).
+ */
+static int readfd_unbuffered(int fd, char *buf, size_t len)
+{
+ static size_t remaining; /* bytes of iobuf_in not yet handed out; persists across calls */
+ static size_t iobuf_in_ndx; /* offset of the next unread byte in iobuf_in */
+ int tag, ret = 0;
+ char line[1024]; /* holds the 4-byte packet header, then message text */
+
+ if (!iobuf_in || fd != sock_f_in) /* no buffer, or not the socket: read directly */
+ return read_timeout(fd, buf, len);
+
+ if (!io_multiplexing_in && remaining == 0) { /* plain stream and buffer empty: refill it */
+ remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+ iobuf_in_ndx = 0;
+ }
+
+ while (ret == 0) { /* loop until some bytes have been copied to buf */
+ if (remaining) { /* buffered data available: hand out at most len bytes */
+ len = MIN(len, remaining);
+ memcpy(buf, iobuf_in + iobuf_in_ndx, len);
+ iobuf_in_ndx += len;
+ remaining -= len;
+ ret = len;
+ break;
+ }
+
+ read_loop(fd, line, 4); /* buffer empty: fetch the next 4-byte packet header */
+ tag = IVAL(line, 0);
+
+ remaining = tag & 0xFFFFFF; /* low 24 bits: payload length */
+ tag = (tag >> 24) - MPLEX_BASE; /* top byte less MPLEX_BASE: message code */
+
+ switch (tag) {
+ case MSG_DATA:
+ if (remaining > iobuf_in_siz) { /* grow the buffer to fit the payload */
+ if (!(iobuf_in = realloc_array(iobuf_in, char,
+ remaining)))
+ out_of_memory("readfd_unbuffered");
+ iobuf_in_siz = remaining;
+ }
+ read_loop(fd, iobuf_in, remaining); /* payload is copied out on the next pass */
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_INFO:
+ case MSG_ERROR:
+ if (remaining >= sizeof line) { /* message would not fit in line[] */
+ rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
+ tag, (long)remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ rwrite((enum logcode)tag, line, remaining);
+ remaining = 0; /* message consumed; nothing is buffered for the caller */
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d\n", tag);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ }
+
+ if (remaining == 0) /* input buffer drained; presumably flushes pending output */
+ io_flush(NORMAL_FLUSH);
+
+ return ret;
+}
+
+
+
+/**
+ * Read exactly @p N bytes from @p fd into @p buffer, calling
+ * readfd_unbuffered() repeatedly until the request is satisfied.
+ * Data arriving on write_batch_monitor_in is also written to batch_fd,
+ * and data arriving on sock_f_in is added to stats.total_read.
+ **/
+static void readfd(int fd, char *buffer, size_t N)
+{
+	size_t got = 0;
+
+	/* Each call may deliver only part of what is still missing. */
+	while (got < N)
+		got += readfd_unbuffered(fd, buffer + got, N - got);
+
+	/* A short or failed write of the batch copy is fatal. */
+	if (fd == write_batch_monitor_in
+	    && (size_t)write(batch_fd, buffer, got) != got)
+		exit_cleanup(RERR_FILEIO);
+
+	/* Only bytes read from sock_f_in are counted in the stats. */
+	if (fd == sock_f_in)
+		stats.total_read += got;
+}
+
+
+int32 read_int(int f)
+{
+ char b[4];
+ int32 ret;
+
+ readfd(f,b,4);
+ ret = IVAL(b,0);
+ if (ret == (int32)0xffffffff)
+ return -1;
+ return ret;