+ if (iobuf_out)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+
+/**
+ * Read exactly len bytes from fd into buf, calling read_timeout() as
+ * many times as it takes.  Relies on read_timeout() returning a
+ * positive byte count; its result is not checked here.
+ **/
+static void read_loop(int fd, char *buf, size_t len)
+{
+	int got;
+
+	/* Step past whatever each read delivered until none is owed. */
+	for (; len != 0; buf += got, len -= got)
+		got = read_timeout(fd, buf, len);
+}
+
+
+/**
+ * Read from the file descriptor handling multiplexing - return number
+ * of bytes read.
+ *
+ * Never returns <= 0.
+ */
+static int readfd_unbuffered(int fd, char *buf, size_t len)
+{
+ static size_t remaining;
+ static size_t iobuf_in_ndx;
+ int tag, ret = 0;
+#if MAXPATHLEN < 4096
+ char line[4096+1024];
+#else
+ char line[MAXPATHLEN+1024];
+#endif
+
+ if (!iobuf_in || fd != sock_f_in)
+ return read_timeout(fd, buf, len);
+
+ if (!io_multiplexing_in && remaining == 0) {
+ remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+ iobuf_in_ndx = 0;
+ }
+
+ while (ret == 0) {
+ if (remaining) {
+ len = MIN(len, remaining);
+ memcpy(buf, iobuf_in + iobuf_in_ndx, len);
+ iobuf_in_ndx += len;
+ remaining -= len;
+ ret = len;
+ break;
+ }
+
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ remaining = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ switch (tag) {
+ case MSG_DATA:
+ if (remaining > iobuf_in_siz) {
+ if (!(iobuf_in = realloc_array(iobuf_in, char,
+ remaining)))
+ out_of_memory("readfd_unbuffered");
+ iobuf_in_siz = remaining;
+ }
+ read_loop(fd, iobuf_in, remaining);
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_DELETED:
+ if (remaining >= sizeof line) {
+ rprintf(FERROR, "invalid multi-message %d:%ld\n",
+ tag, (long)remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ line[remaining] = '\0';
+ /* A directory name was sent with the trailing null */
+ if (remaining > 0 && !line[remaining-1])
+ log_delete(line, S_IFDIR);
+ else
+ log_delete(line, S_IFREG);
+ remaining = 0;
+ break;
+ case MSG_SUCCESS:
+ if (remaining != 4) {
+ rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
+ tag, (long)remaining, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ successful_send(IVAL(line, 0));
+ remaining = 0;
+ break;
+ case MSG_INFO:
+ case MSG_ERROR:
+ if (remaining >= sizeof line) {
+ rprintf(FERROR,
+ "multiplexing overflow %d:%ld [%s]\n",
+ tag, (long)remaining, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ rwrite((enum logcode)tag, line, remaining);
+ remaining = 0;
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d [%s]\n",
+ tag, who_am_i());
+ exit_cleanup(RERR_STREAMIO);