+void maybe_send_keepalive(void)
+{
+ if (time(NULL) - last_io_out >= allowed_lull) {
+ if (!iobuf_out || !iobuf_out_cnt) {
+ if (protocol_version < 29)
+ return; /* there's nothing we can do */
+ write_int(sock_f_out, the_file_list->count);
+ write_shortint(sock_f_out, ITEM_IS_NEW);
+ }
+ if (iobuf_out)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+/**
+ * Continue trying to read len bytes - don't return until len has been
+ * read.
+ **/
+static void read_loop(int fd, char *buf, size_t len)
+{
+ while (len) {
+ int n = read_timeout(fd, buf, len);
+
+ buf += n;
+ len -= n;
+ }
+}
+
+/**
+ * Read from the file descriptor handling multiplexing - return number
+ * of bytes read.
+ *
+ * Never returns <= 0.
+ */
+static int readfd_unbuffered(int fd, char *buf, size_t len)
+{
+ static size_t remaining;
+ static size_t iobuf_in_ndx;
+ size_t msg_bytes;
+ int tag, cnt = 0;
+ char line[BIGPATHBUFLEN];
+
+ if (!iobuf_in || fd != sock_f_in)
+ return read_timeout(fd, buf, len);
+
+ if (!io_multiplexing_in && remaining == 0) {
+ remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+ iobuf_in_ndx = 0;
+ }
+
+ while (cnt == 0) {
+ if (remaining) {
+ len = MIN(len, remaining);
+ memcpy(buf, iobuf_in + iobuf_in_ndx, len);
+ iobuf_in_ndx += len;
+ remaining -= len;
+ cnt = len;
+ break;
+ }
+
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ switch (tag) {
+ case MSG_DATA:
+ if (msg_bytes > iobuf_in_siz) {
+ if (!(iobuf_in = realloc_array(iobuf_in, char,
+ msg_bytes)))
+ out_of_memory("readfd_unbuffered");
+ iobuf_in_siz = msg_bytes;
+ }
+ read_loop(fd, iobuf_in, msg_bytes);
+ remaining = msg_bytes;
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof line)
+ goto overflow;
+ read_loop(fd, line, msg_bytes);
+ /* A directory name was sent with the trailing null */
+ if (msg_bytes > 0 && !line[msg_bytes-1])
+ log_delete(line, S_IFDIR);
+ else {
+ line[msg_bytes] = '\0';
+ log_delete(line, S_IFREG);
+ }
+ break;
+ case MSG_SUCCESS:
+ if (msg_bytes != 4) {
+ rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
+ tag, (long)msg_bytes, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, msg_bytes);
+ successful_send(IVAL(line, 0));
+ break;
+ case MSG_INFO:
+ case MSG_ERROR:
+ if (msg_bytes >= sizeof line) {
+ overflow:
+ rprintf(FERROR,
+ "multiplexing overflow %d:%ld [%s]\n",
+ tag, (long)msg_bytes, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, msg_bytes);
+ rwrite((enum logcode)tag, line, msg_bytes);
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d [%s]\n",
+ tag, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ }
+
+ if (remaining == 0)
+ io_flush(NORMAL_FLUSH);
+
+ return cnt;
+}
+
+/**
+ * Do a buffered read from @p fd. Don't return until all @p n bytes
+ * have been read. If all @p n can't be read then exit with an
+ * error.
+ **/
+static void readfd(int fd, char *buffer, size_t N)
+{
+ int cnt;
+ size_t total = 0;
+
+ while (total < N) {
+ cnt = readfd_unbuffered(fd, buffer + total, N-total);
+ total += cnt;
+ }
+
+ if (fd == write_batch_monitor_in) {
+ if ((size_t)write(batch_fd, buffer, total) != total)
+ exit_cleanup(RERR_FILEIO);
+ }
+
+ if (fd == sock_f_in)
+ stats.total_read += total;
+}
+
+int read_shortint(int f)
+{
+ uchar b[2];
+ readfd(f, (char *)b, 2);
+ return (b[1] << 8) + b[0];
+}