+ int len = read_byte(f);
+
+ if (len & 0x80)
+ len = (len & ~0x80) * 0x100 + read_byte(f);
+
+ if (len >= bufsize) {
+ rprintf(FERROR, "over-long vstring received (%d > %d)\n",
+ len, bufsize - 1);
+ return -1;
+ }
+
+ if (len)
+ readfd(f, buf, len);
+ buf[len] = '\0';
+ return len;
+}
+
+/* Populate a sum_struct with values from the socket. This is
+ * called by both the sender and the receiver. */
+void read_sum_head(int f, struct sum_struct *sum)
+{
+ int32 max_blength = protocol_version < 30 ? OLD_MAX_BLOCK_SIZE : MAX_BLOCK_SIZE;
+ sum->count = read_int(f);
+ if (sum->count < 0) {
+ rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
+ (long)sum->count, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->blength = read_int(f);
+ if (sum->blength < 0 || sum->blength > max_blength) {
+ rprintf(FERROR, "Invalid block length %ld [%s]\n",
+ (long)sum->blength, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
+ if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
+ rprintf(FERROR, "Invalid checksum length %d [%s]\n",
+ sum->s2length, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->remainder = read_int(f);
+ if (sum->remainder < 0 || sum->remainder > sum->blength) {
+ rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
+ (long)sum->remainder, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+}
+
+/* Send the values from a sum_struct over the socket. Set sum to
+ * NULL if there are no checksums to send. This is called by both
+ * the generator and the sender. */
+void write_sum_head(int f, struct sum_struct *sum)
+{
+ static struct sum_struct null_sum;
+
+ if (sum == NULL)
+ sum = &null_sum;
+
+ write_int(f, sum->count);
+ write_int(f, sum->blength);
+ if (protocol_version >= 27)
+ write_int(f, sum->s2length);
+ write_int(f, sum->remainder);
+}
+
+/**
+ * Sleep after writing to limit I/O bandwidth usage.
+ *
+ * @todo Rather than sleeping after each write, it might be better to
+ * use some kind of averaging. The current algorithm seems to always
+ * use a bit less bandwidth than specified, because it doesn't make up
+ * for slow periods. But arguably this is a feature. In addition, we
+ * ought to take the time used to write the data into account.
+ *
+ * During some phases of big transfers (file FOO is uptodate) this is
+ * called with a small bytes_written every time. As the kernel has to
+ * round small waits up to guarantee that we actually wait at least the
+ * requested number of microseconds, this can become grossly inaccurate.
+ * We therefore keep track of the bytes we've written over time and only
+ * sleep when the accumulated delay is at least 1 tenth of a second.
+ **/
+static void sleep_for_bwlimit(int bytes_written)
+{
+ static struct timeval prior_tv;
+ static long total_written = 0;
+ struct timeval tv, start_tv;
+ long elapsed_usec, sleep_usec;
+
+#define ONE_SEC 1000000L /* # of microseconds in a second */
+
+ if (!bwlimit_writemax)
+ return;
+
+ total_written += bytes_written;
+
+ gettimeofday(&start_tv, NULL);
+ if (prior_tv.tv_sec) {
+ elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
+ + (start_tv.tv_usec - prior_tv.tv_usec);
+ total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
+ if (total_written < 0)
+ total_written = 0;
+ }
+
+ sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
+ if (sleep_usec < ONE_SEC / 10) {
+ prior_tv = start_tv;
+ return;
+ }
+
+ tv.tv_sec = sleep_usec / ONE_SEC;
+ tv.tv_usec = sleep_usec % ONE_SEC;
+ select(0, NULL, NULL, NULL, &tv);
+
+ gettimeofday(&prior_tv, NULL);
+ elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
+ + (prior_tv.tv_usec - start_tv.tv_usec);
+ total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
+}
+
+static const char *what_fd_is(int fd)
+{
+ static char buf[20];
+
+ if (fd == sock_f_out)
+ return "socket";
+ else if (fd == msg_fd_out)
+ return "message fd";
+ else if (fd == batch_fd)
+ return "batch file";
+ else {
+ snprintf(buf, sizeof buf, "fd %d", fd);
+ return buf;
+ }
+}
+
+/* Write len bytes to the file descriptor fd, looping as necessary to get
+ * the job done and also (in certain circumstances) reading any data on
+ * msg_fd_in to avoid deadlock.
+ *
+ * This function underlies the multiplexing system. The body of the
+ * application never calls this function directly. */
+static void writefd_unbuffered(int fd, const char *buf, size_t len)
+{
+ size_t n, total = 0;
+ fd_set w_fds, r_fds, e_fds;
+ int maxfd, count, cnt, using_r_fds;
+ int defer_inc = 0;