+/* Write len bytes to the file descriptor fd, looping as necessary to get
+ * the job done and also (in certain circumstances) reading any data on
+ * msg_fd_in to avoid deadlock.
+ *
+ * This function underlies the multiplexing system. The body of the
+ * application never calls this function directly. */
+static void writefd_unbuffered(int fd, const char *buf, size_t len)
+{
+ size_t n, total = 0;
+ fd_set w_fds, r_fds, e_fds;
+ int maxfd, count, cnt, using_r_fds;
+ int defer_inc = 0;
+ struct timeval tv;
+
+ if (no_flush++)
+ defer_forwarding_messages++, defer_inc++;
+
+ while (total < len) {
+ FD_ZERO(&w_fds);
+ FD_SET(fd, &w_fds);
+ FD_ZERO(&e_fds);
+ FD_SET(fd, &e_fds);
+ maxfd = fd;
+
+ if (msg_fd_in >= 0) {
+ FD_ZERO(&r_fds);
+ FD_SET(msg_fd_in, &r_fds);
+ if (msg_fd_in > maxfd)
+ maxfd = msg_fd_in;
+ using_r_fds = 1;
+ } else
+ using_r_fds = 0;
+
+ tv.tv_sec = select_timeout;
+ tv.tv_usec = 0;
+
+ errno = 0;
+ count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
+ &w_fds, &e_fds, &tv);
+
+ if (count <= 0) {
+ if (count < 0 && errno == EBADF)
+ exit_cleanup(RERR_SOCKETIO);
+ check_timeout();
+ continue;
+ }
+
+ /*if (FD_ISSET(fd, &e_fds))
+ rprintf(FINFO, "select exception on fd %d\n", fd); */
+
+ if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
+ read_msg_fd();
+
+ if (!FD_ISSET(fd, &w_fds))
+ continue;
+
+ n = len - total;
+ if (bwlimit_writemax && n > bwlimit_writemax)
+ n = bwlimit_writemax;
+ cnt = write(fd, buf + total, n);
+
+ if (cnt <= 0) {
+ if (cnt < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ msleep(1);
+ continue;
+ }
+ }
+
+ /* Don't try to write errors back across the stream. */
+ if (fd == sock_f_out)
+ io_end_multiplex_out();
+ /* Don't try to write errors down a failing msg pipe. */
+ if (am_server && fd == msg_fd_out)
+ exit_cleanup(RERR_STREAMIO);
+ rsyserr(FERROR, errno,
+ "writefd_unbuffered failed to write %ld bytes [%s]",
+ (long)len, who_am_i());
+ /* If the other side is sending us error messages, try
+ * to grab any messages they sent before they died. */
+ while (!am_server && fd == sock_f_out && io_multiplexing_in) {
+ char buf[1024];
+ set_io_timeout(30);
+ ignore_timeout = 0;
+ readfd_unbuffered(sock_f_in, buf, sizeof buf);
+ }
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ total += cnt;
+ defer_forwarding_messages++, defer_inc++;
+
+ if (fd == sock_f_out) {
+ if (io_timeout || am_generator)
+ last_io_out = time(NULL);
+ sleep_for_bwlimit(cnt);
+ }
+ }
+
+ no_flush--;
+ defer_inc -= defer_forwarding_keep;
+ if (!(defer_forwarding_messages -= defer_inc) && !no_flush)
+ msg_flush();
+}
+
+void io_flush(int flush_it_all)
+{
+ if (!iobuf_out_cnt || no_flush)
+ return;
+
+ if (io_multiplexing_out)
+ mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
+ else
+ writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
+ iobuf_out_cnt = 0;
+
+ if (flush_it_all && !defer_forwarding_messages)
+ msg_flush();
+}
+
+static void writefd(int fd, const char *buf, size_t len)
+{
+ if (fd == sock_f_out)
+ stats.total_written += len;
+
+ if (fd == write_batch_monitor_out) {
+ if ((size_t)write(batch_fd, buf, len) != len)
+ exit_cleanup(RERR_FILEIO);
+ }
+
+ if (!iobuf_out || fd != iobuf_f_out) {
+ writefd_unbuffered(fd, buf, len);
+ return;
+ }
+
+ while (len) {
+ int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
+ if (n > 0) {
+ memcpy(iobuf_out+iobuf_out_cnt, buf, n);
+ buf += n;
+ len -= n;
+ iobuf_out_cnt += n;
+ }
+
+ if (iobuf_out_cnt == IO_BUFFER_SIZE)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+void write_shortint(int f, unsigned short x)
+{
+ char b[2];
+ b[0] = (char)x;
+ b[1] = (char)(x >> 8);
+ writefd(f, b, 2);
+}
+
+void write_int(int f, int32 x)
+{
+ char b[4];
+ SIVAL(b, 0, x);
+ writefd(f, b, 4);
+}
+
+void write_varint(int f, int32 x)
+{
+ char b[5];
+ uchar bit;
+ int cnt = 4;
+
+ SIVAL(b, 1, x);
+
+ while (cnt > 1 && b[cnt] == 0)
+ cnt--;
+ bit = ((uchar)1<<(7-cnt+1));
+ if (CVAL(b, cnt) >= bit) {
+ cnt++;
+ *b = ~(bit-1);
+ } else if (cnt > 1)
+ *b = b[cnt] | ~(bit*2-1);
+ else
+ *b = b[cnt];
+
+ writefd(f, b, cnt);
+}
+
+void write_varlong(int f, int64 x, uchar min_bytes)
+{
+ char b[9];
+ uchar bit;
+ int cnt = 8;
+
+ SIVAL(b, 1, x);
+#if SIZEOF_INT64 >= 8
+ SIVAL(b, 5, x >> 32);
+#else
+ if (x <= 0x7FFFFFFF && x >= 0)
+ memset(b + 5, 0, 4);
+ else {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
+
+ while (cnt > min_bytes && b[cnt] == 0)
+ cnt--;
+ bit = ((uchar)1<<(7-cnt+min_bytes));
+ if (CVAL(b, cnt) >= bit) {
+ cnt++;
+ *b = ~(bit-1);
+ } else if (cnt > min_bytes)
+ *b = b[cnt] | ~(bit*2-1);
+ else
+ *b = b[cnt];
+
+ writefd(f, b, cnt);
+}
+
+/*
+ * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
+ * 64-bit types on this platform.
+ */