+ continue;
+ }
+ if (cnt != 1)
+ break;
+ if (nulls? !ch : (ch == '\r' || ch == '\n')) {
+ /* Skip empty lines if reading locally. */
+ if (!reading_remotely && s == fname)
+ continue;
+ break;
+ }
+ if (s < eob)
+ *s++ = ch;
+ }
+ *s = '\0';
+
+ /* Dump comments. */
+ if (*fname == '#' || *fname == ';')
+ goto start;
+
+ return s - fname;
+}
+
+
+ /* Output buffer; stays NULL until io_start_buffering_out() allocates it. */
+ static char *iobuf_out;
+ /* Counter paired with iobuf_out (presumably the number of bytes currently
+  * queued in it -- confirm against the write path); reset to 0 whenever the
+  * buffer is freshly allocated. */
+ static int iobuf_out_cnt;
+
+ /**
+  * Enable output buffering by allocating iobuf_out with room for
+  * IO_BUFFER_SIZE chars and zeroing iobuf_out_cnt.
+  *
+  * Calling this while a buffer already exists is a no-op.  An allocation
+  * failure is reported through out_of_memory().
+  **/
+ void io_start_buffering_out(void)
+ {
+ 	if (iobuf_out == NULL) {
+ 		iobuf_out = new_array(char, IO_BUFFER_SIZE);
+ 		if (iobuf_out == NULL)
+ 			out_of_memory("io_start_buffering_out");
+ 		iobuf_out_cnt = 0;
+ 	}
+ }
+
+
+ /* Input buffer; stays NULL until io_start_buffering_in() allocates it. */
+ static char *iobuf_in;
+ /* Allocated size of iobuf_in, in chars. */
+ static size_t iobuf_in_siz;
+
+ /**
+  * Enable input buffering by allocating iobuf_in with an initial size of
+  * twice IO_BUFFER_SIZE (recorded in iobuf_in_siz).
+  *
+  * Calling this while a buffer already exists is a no-op.  An allocation
+  * failure is reported through out_of_memory().
+  **/
+ void io_start_buffering_in(void)
+ {
+ 	if (iobuf_in == NULL) {
+ 		iobuf_in_siz = 2 * IO_BUFFER_SIZE;
+ 		iobuf_in = new_array(char, iobuf_in_siz);
+ 		if (iobuf_in == NULL)
+ 			out_of_memory("io_start_buffering_in");
+ 	}
+ }
+
+
+ /**
+  * Flush pending output with io_flush(NORMAL_FLUSH) and then, unless
+  * io_multiplexing_out is set, free the output buffer and reset iobuf_out
+  * to NULL.  While io_multiplexing_out is set the buffer is left allocated.
+  **/
+ void io_end_buffering(void)
+ {
+ 	io_flush(NORMAL_FLUSH);
+
+ 	/* The buffer is kept for as long as output multiplexing is on. */
+ 	if (io_multiplexing_out)
+ 		return;
+
+ 	free(iobuf_out);
+ 	iobuf_out = NULL;
+ }
+
+
+ /**
+  * Flush buffered output if an output buffer exists, iobuf_out_cnt is
+  * non-zero, and at least 5 seconds have elapsed since last_io
+  * (presumably the time of the most recent I/O -- confirm where it is set).
+  **/
+ void maybe_flush_socket(void)
+ {
+ 	/* Nothing buffered: do not even look at the clock. */
+ 	if (!iobuf_out || !iobuf_out_cnt)
+ 		return;
+
+ 	if (time(NULL) - last_io >= 5)
+ 		io_flush(NORMAL_FLUSH);
+ }
+
+
+ /**
+  * If at least allowed_lull seconds have elapsed since last_io, make sure
+  * something goes out on the wire.
+  *
+  * When no output is buffered, a filler message is written first: the
+  * file-list count followed by ITEM_IS_NEW (presumably an item the peer
+  * treats as a no-op -- confirm against the protocol).  That message is
+  * only sent for protocol_version >= 29; older protocols return without
+  * writing anything.  Finally, if an output buffer exists, it is flushed.
+  **/
+ void maybe_send_keepalive(void)
+ {
+ 	if (time(NULL) - last_io < allowed_lull)
+ 		return;
+
+ 	if (!(iobuf_out && iobuf_out_cnt)) {
+ 		/* there's nothing we can do */
+ 		if (protocol_version < 29)
+ 			return;
+ 		write_int(sock_f_out, the_file_list->count);
+ 		write_shortint(sock_f_out, ITEM_IS_NEW);
+ 	}
+
+ 	if (iobuf_out)
+ 		io_flush(NORMAL_FLUSH);
+ }
+
+
+/**
+ * Continue trying to read len bytes - don't return until len has been
+ * read.
+ **/
+static void read_loop(int fd, char *buf, size_t len)
+{
+ while (len) {
+ int n = read_timeout(fd, buf, len);
+
+ buf += n;
+ len -= n;
+ }
+}
+
+
+/**
+ * Read from the file descriptor handling multiplexing - return number
+ * of bytes read.
+ *
+ * Never returns <= 0.
+ */
+static int readfd_unbuffered(int fd, char *buf, size_t len)
+{
+ static size_t remaining;
+ static size_t iobuf_in_ndx;
+ int tag, ret = 0;
+#if MAXPATHLEN < 4096
+ char line[4096+1024];
+#else
+ char line[MAXPATHLEN+1024];
+#endif
+
+ if (!iobuf_in || fd != sock_f_in)
+ return read_timeout(fd, buf, len);
+
+ if (!io_multiplexing_in && remaining == 0) {
+ remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+ iobuf_in_ndx = 0;
+ }
+
+ while (ret == 0) {
+ if (remaining) {
+ len = MIN(len, remaining);
+ memcpy(buf, iobuf_in + iobuf_in_ndx, len);
+ iobuf_in_ndx += len;
+ remaining -= len;
+ ret = len;
+ break;
+ }
+
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ remaining = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ switch (tag) {
+ case MSG_DATA:
+ if (remaining > iobuf_in_siz) {
+ if (!(iobuf_in = realloc_array(iobuf_in, char,
+ remaining)))
+ out_of_memory("readfd_unbuffered");
+ iobuf_in_siz = remaining;
+ }
+ read_loop(fd, iobuf_in, remaining);
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_DELETED:
+ if (remaining >= sizeof line) {
+ rprintf(FERROR, "invalid multi-message %d:%ld\n",
+ tag, (long)remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ line[remaining] = '\0';
+ /* A directory name was sent with the trailing null */
+ if (remaining > 0 && !line[remaining-1])
+ log_delete(line, S_IFDIR);
+ else
+ log_delete(line, S_IFREG);
+ remaining = 0;
+ break;
+ case MSG_SUCCESS:
+ if (remaining != 4) {
+ rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
+ tag, (long)remaining, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ successful_send(IVAL(line, 0));
+ remaining = 0;
+ break;
+ case MSG_INFO:
+ case MSG_ERROR:
+ if (remaining >= sizeof line) {
+ rprintf(FERROR,
+ "multiplexing overflow %d:%ld [%s]\n",
+ tag, (long)remaining, who_am_i());
+ exit_cleanup(RERR_STREAMIO);