+static void drain_multiplex_messages(void)
+{
+ while (IN_MULTIPLEXED && iobuf.in.len) {
+ if (iobuf.raw_input_ends_before) {
+ size_t raw_len = iobuf.raw_input_ends_before - iobuf.in.pos;
+ iobuf.raw_input_ends_before = 0;
+ if (raw_len >= iobuf.in.len) {
+ iobuf.in.len = 0;
+ break;
+ }
+ iobuf.in.pos += raw_len;
+ iobuf.in.len -= raw_len;
+ }
+ read_a_msg();
+ }
+}
+
+void wait_for_receiver(void)
+{
+ if (!iobuf.raw_input_ends_before)
+ read_a_msg();
+
+ if (iobuf.raw_input_ends_before) {
+ int ndx = read_int(iobuf.in_fd);
+ if (ndx < 0) {
+ switch (ndx) {
+ case NDX_FLIST_EOF:
+ flist_eof = 1;
+ if (DEBUG_GTE(FLIST, 3))
+ rprintf(FINFO, "[%s] flist_eof=1\n", who_am_i());
+ break;
+ case NDX_DONE:
+ msgdone_cnt++;
+ break;
+ default:
+ exit_cleanup(RERR_STREAMIO);
+ }
+ } else {
+ struct file_list *flist;
+ if (DEBUG_GTE(FLIST, 2)) {
+ rprintf(FINFO, "[%s] receiving flist for dir %d\n",
+ who_am_i(), ndx);
+ }
+ flist = recv_file_list(iobuf.in_fd);
+ flist->parent_ndx = ndx;
+#ifdef SUPPORT_HARD_LINKS
+ if (preserve_hard_links)
+ match_hard_links(flist);
+#endif
+ }
+ }
+}
+
+unsigned short read_shortint(int f)
+{
+ char b[2];
+ read_buf(f, b, 2);
+ return (UVAL(b, 1) << 8) + UVAL(b, 0);
+}
+
+int32 read_int(int f)
+{
+ char b[4];
+ int32 num;
+
+ read_buf(f, b, 4);
+ num = IVAL(b, 0);
+#if SIZEOF_INT32 > 4
+ if (num & (int32)0x80000000)
+ num |= ~(int32)0xffffffff;
+#endif
+ return num;
+}
+
+int32 read_varint(int f)
+{
+ union {
+ char b[5];
+ int32 x;
+ } u;
+ uchar ch;
+ int extra;
+
+ u.x = 0;
+ ch = read_byte(f);
+ extra = int_byte_extra[ch / 4];
+ if (extra) {
+ uchar bit = ((uchar)1<<(8-extra));
+ if (extra >= (int)sizeof u.b) {
+ rprintf(FERROR, "Overflow in read_varint()\n");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_buf(f, u.b, extra);
+ u.b[extra] = ch & (bit-1);
+ } else
+ u.b[0] = ch;
+#if CAREFUL_ALIGNMENT
+ u.x = IVAL(u.b,0);
+#endif
+#if SIZEOF_INT32 > 4
+ if (u.x & (int32)0x80000000)
+ u.x |= ~(int32)0xffffffff;
+#endif
+ return u.x;
+}
+
+int64 read_varlong(int f, uchar min_bytes)
+{
+ union {
+ char b[9];
+ int64 x;
+ } u;
+ char b2[8];
+ int extra;
+
+#if SIZEOF_INT64 < 8
+ memset(u.b, 0, 8);
+#else
+ u.x = 0;
+#endif
+ read_buf(f, b2, min_bytes);
+ memcpy(u.b, b2+1, min_bytes-1);
+ extra = int_byte_extra[CVAL(b2, 0) / 4];
+ if (extra) {
+ uchar bit = ((uchar)1<<(8-extra));
+ if (min_bytes + extra > (int)sizeof u.b) {
+ rprintf(FERROR, "Overflow in read_varlong()\n");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_buf(f, u.b + min_bytes - 1, extra);
+ u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);
+#if SIZEOF_INT64 < 8
+ if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
+ } else
+ u.b[min_bytes + extra - 1] = CVAL(b2, 0);
+#if SIZEOF_INT64 < 8
+ u.x = IVAL(u.b,0);
+#elif CAREFUL_ALIGNMENT
+ u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);
+#endif
+ return u.x;
+}
+
+int64 read_longint(int f)
+{
+#if SIZEOF_INT64 >= 8
+ char b[9];
+#endif
+ int32 num = read_int(f);
+
+ if (num != (int32)0xffffffff)
+ return num;
+
+#if SIZEOF_INT64 < 8
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+#else
+ read_buf(f, b, 8);
+ return IVAL(b,0) | (((int64)IVAL(b,4))<<32);
+#endif
+}
+
+void read_buf(int f, char *buf, size_t len)
+{
+ if (f != iobuf.in_fd) {
+ if (safe_read(f, buf, len) != len)
+ whine_about_eof(False); /* Doesn't return. */
+ goto batch_copy;
+ }
+
+ if (!IN_MULTIPLEXED) {
+ memcpy(buf, perform_io(len, PIO_INPUT_AND_CONSUME), len);
+ total_data_read += len;
+ if (forward_flist_data)
+ write_buf(iobuf.out_fd, buf, len);
+ batch_copy:
+ if (f == write_batch_monitor_in)
+ safe_write(batch_fd, buf, len);
+ return;
+ }
+
+ while (1) {
+ char *data;
+ size_t siz;
+
+ while (!iobuf.raw_input_ends_before)
+ read_a_msg();
+
+ siz = MIN(len, iobuf.raw_input_ends_before - iobuf.in.pos);
+ data = perform_io(siz, PIO_INPUT_AND_CONSUME);
+ if (iobuf.in.pos == iobuf.raw_input_ends_before)
+ iobuf.raw_input_ends_before = 0;
+
+ /* The bytes at the "data" pointer will survive long
+ * enough to make a copy, but not past future I/O. */
+ memcpy(buf, data, siz);
+ total_data_read += siz;
+
+ if (forward_flist_data)
+ write_buf(iobuf.out_fd, buf, siz);
+
+ if (f == write_batch_monitor_in)
+ safe_write(batch_fd, buf, siz);
+
+ if ((len -= siz) == 0)
+ break;
+ buf += siz;
+ }
+}
+
+void read_sbuf(int f, char *buf, size_t len)
+{
+ read_buf(f, buf, len);
+ buf[len] = '\0';
+}
+
+uchar read_byte(int f)
+{
+ uchar c;
+ read_buf(f, (char*)&c, 1);
+ return c;
+}
+
+int read_vstring(int f, char *buf, int bufsize)
+{
+ int len = read_byte(f);
+
+ if (len & 0x80)
+ len = (len & ~0x80) * 0x100 + read_byte(f);
+
+ if (len >= bufsize) {
+ rprintf(FERROR, "over-long vstring received (%d > %d)\n",
+ len, bufsize - 1);
+ return -1;
+ }
+
+ if (len)
+ read_buf(f, buf, len);
+ buf[len] = '\0';
+ return len;
+}
+
+/* Populate a sum_struct with values from the socket. This is
+ * called by both the sender and the receiver. */
+void read_sum_head(int f, struct sum_struct *sum)
+{
+ int32 max_blength = protocol_version < 30 ? OLD_MAX_BLOCK_SIZE : MAX_BLOCK_SIZE;
+ sum->count = read_int(f);
+ if (sum->count < 0) {
+ rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
+ (long)sum->count, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->blength = read_int(f);
+ if (sum->blength < 0 || sum->blength > max_blength) {
+ rprintf(FERROR, "Invalid block length %ld [%s]\n",
+ (long)sum->blength, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
+ if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
+ rprintf(FERROR, "Invalid checksum length %d [%s]\n",
+ sum->s2length, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ sum->remainder = read_int(f);
+ if (sum->remainder < 0 || sum->remainder > sum->blength) {
+ rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
+ (long)sum->remainder, who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+}
+
+/* Send the values from a sum_struct over the socket. Set sum to
+ * NULL if there are no checksums to send. This is called by both
+ * the generator and the sender. */
+void write_sum_head(int f, struct sum_struct *sum)
+{
+ static struct sum_struct null_sum;
+
+ if (sum == NULL)
+ sum = &null_sum;
+
+ write_int(f, sum->count);
+ write_int(f, sum->blength);
+ if (protocol_version >= 27)
+ write_int(f, sum->s2length);
+ write_int(f, sum->remainder);
+}
+
+/* Sleep after writing to limit I/O bandwidth usage.
+ *
+ * @todo Rather than sleeping after each write, it might be better to
+ * use some kind of averaging. The current algorithm seems to always
+ * use a bit less bandwidth than specified, because it doesn't make up
+ * for slow periods. But arguably this is a feature. In addition, we
+ * ought to take the time used to write the data into account.
+ *
+ * During some phases of big transfers (file FOO is uptodate) this is
+ * called with a small bytes_written every time. As the kernel has to
+ * round small waits up to guarantee that we actually wait at least the
+ * requested number of microseconds, this can become grossly inaccurate.
+ * We therefore keep track of the bytes we've written over time and only
+ * sleep when the accumulated delay is at least 1 tenth of a second. */
+static void sleep_for_bwlimit(int bytes_written)
+{
+ static struct timeval prior_tv;
+ static long total_written = 0;
+ struct timeval tv, start_tv;
+ long elapsed_usec, sleep_usec;
+
+#define ONE_SEC 1000000L /* # of microseconds in a second */
+
+ total_written += bytes_written;
+
+ gettimeofday(&start_tv, NULL);
+ if (prior_tv.tv_sec) {
+ elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
+ + (start_tv.tv_usec - prior_tv.tv_usec);
+ total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
+ if (total_written < 0)
+ total_written = 0;
+ }
+
+ sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
+ if (sleep_usec < ONE_SEC / 10) {
+ prior_tv = start_tv;
+ return;
+ }
+
+ tv.tv_sec = sleep_usec / ONE_SEC;
+ tv.tv_usec = sleep_usec % ONE_SEC;
+ select(0, NULL, NULL, NULL, &tv);
+
+ gettimeofday(&prior_tv, NULL);
+ elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
+ + (prior_tv.tv_usec - start_tv.tv_usec);
+ total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
+}
+
+void io_flush(int flush_it_all)
+{
+ if (iobuf.out.len > iobuf.out_empty_len) {
+ if (flush_it_all) /* FULL_FLUSH: flush everything in the output buffers */
+ perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM);
+ else /* NORMAL_FLUSH: flush at least 1 byte */
+ perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM);
+ }
+ if (iobuf.msg.len)
+ perform_io(iobuf.msg.size, PIO_NEED_MSGROOM);
+}
+
+void write_shortint(int f, unsigned short x)
+{
+ char b[2];
+ b[0] = (char)x;
+ b[1] = (char)(x >> 8);
+ write_buf(f, b, 2);
+}
+
+void write_int(int f, int32 x)
+{
+ char b[4];
+ SIVAL(b, 0, x);
+ write_buf(f, b, 4);
+}
+
+void write_varint(int f, int32 x)
+{
+ char b[5];
+ uchar bit;
+ int cnt = 4;
+
+ SIVAL(b, 1, x);
+
+ while (cnt > 1 && b[cnt] == 0)
+ cnt--;
+ bit = ((uchar)1<<(7-cnt+1));
+ if (CVAL(b, cnt) >= bit) {
+ cnt++;
+ *b = ~(bit-1);
+ } else if (cnt > 1)
+ *b = b[cnt] | ~(bit*2-1);
+ else
+ *b = b[cnt];
+
+ write_buf(f, b, cnt);
+}