+#include "ifuncs.h"
+#include "inums.h"
+
+/** If no timeout is specified then use a 60 second select timeout */
+#define SELECT_TIMEOUT 60
+
+extern int bwlimit;
+extern size_t bwlimit_writemax;
+extern int io_timeout;
+extern int am_server;
+extern int am_sender;
+extern int am_receiver;
+extern int am_generator;
+extern int msgs2stderr;
+extern int inc_recurse;
+extern int io_error;
+extern int eol_nulls;
+extern int flist_eof;
+extern int file_total;
+extern int file_old_total;
+extern int list_only;
+extern int read_batch;
+extern int compat_flags;
+extern int protect_args;
+extern int checksum_seed;
+extern int protocol_version;
+extern int remove_source_files;
+extern int preserve_hard_links;
+extern BOOL extra_flist_sending_enabled;
+extern struct stats stats;
+extern struct file_list *cur_flist;
+#ifdef ICONV_OPTION
+extern int filesfrom_convert;
+extern iconv_t ic_send, ic_recv;
+#endif
+
+int csum_length = SHORT_SUM_LENGTH; /* initial value */
+int allowed_lull = 0;
+int batch_fd = -1;
+int msgdone_cnt = 0;
+int forward_flist_data = 0;
+BOOL flist_receiving_enabled = False;
+
+/* Ignore an EOF error if non-zero. See whine_about_eof(). */
+int kluge_around_eof = 0;
+
+int sock_f_in = -1;
+int sock_f_out = -1;
+
+int64 total_data_read = 0;
+int64 total_data_written = 0;
+
+static struct {
+ xbuf in, out, msg;
+ int in_fd;
+ int out_fd; /* Both "out" and "msg" go to this fd. */
+ int in_multiplexed;
+ unsigned out_empty_len;
+ size_t raw_data_header_pos; /* in the out xbuf */
+ size_t raw_flushing_ends_before; /* in the out xbuf */
+ size_t raw_input_ends_before; /* in the in xbuf */
+} iobuf = { .in_fd = -1, .out_fd = -1 };
+
+static time_t last_io_in;
+static time_t last_io_out;
+
+static int write_batch_monitor_in = -1;
+static int write_batch_monitor_out = -1;
+
+static int ff_forward_fd = -1;
+static int ff_reenable_multiplex = -1;
+static char ff_lastchar = '\0';
+static xbuf ff_xb = EMPTY_XBUF;
+#ifdef ICONV_OPTION
+static xbuf iconv_buf = EMPTY_XBUF;
+#endif
+static int select_timeout = SELECT_TIMEOUT;
+static int active_filecnt = 0;
+static OFF_T active_bytecnt = 0;
+static int first_message = 1;
+
+static char int_byte_extra[64] = {
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
+ 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
+};
+
+/* Our I/O buffers are sized with no bits on in the lowest byte of the "size"
+ * (indeed, our rounding of sizes in 1024-byte units assures more than this).
+ * This allows the code that is storing bytes near the physical end of a
+ * circular buffer to temporarily reduce the buffer's size (in order to make
+ * some storing idioms easier), while also making it simple to restore the
+ * buffer's actual size when the buffer's "pos" wraps around to the start (we
+ * just round the buffer's size up again). */
+
+#define IOBUF_WAS_REDUCED(siz) ((siz) & 0xFF)
+#define IOBUF_RESTORE_SIZE(siz) (((siz) | 0xFF) + 1)
+
+#define IN_MULTIPLEXED (iobuf.in_multiplexed != 0)
+#define IN_MULTIPLEXED_AND_READY (iobuf.in_multiplexed > 0)
+#define OUT_MULTIPLEXED (iobuf.out_empty_len != 0)
+
+#define PIO_NEED_INPUT (1<<0) /* The *_NEED_* flags are mutually exclusive. */
+#define PIO_NEED_OUTROOM (1<<1)
+#define PIO_NEED_MSGROOM (1<<2)
+
+#define PIO_CONSUME_INPUT (1<<4) /* Must becombined with PIO_NEED_INPUT. */
+
+#define PIO_INPUT_AND_CONSUME (PIO_NEED_INPUT | PIO_CONSUME_INPUT)
+#define PIO_NEED_FLAGS (PIO_NEED_INPUT | PIO_NEED_OUTROOM | PIO_NEED_MSGROOM)
+
+#define REMOTE_OPTION_ERROR "rsync: on remote machine: -"
+#define REMOTE_OPTION_ERROR2 ": unknown option"
+
+#define FILESFROM_BUFLEN 2048
+
+enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };
+
+static flist_ndx_list redo_list, hlink_list;
+
+static void read_a_msg(void);
+static void drain_multiplex_messages(void);
+static void sleep_for_bwlimit(int bytes_written);
+
+static void check_timeout(BOOL allow_keepalive)
+{
+ time_t t, chk;
+
+ /* On the receiving side, the generator is now the one that decides
+ * when a timeout has occurred. When it is sifting through a lot of
+ * files looking for work, it will be sending keep-alive messages to
+ * the sender, and even though the receiver won't be sending/receiving
+ * anything (not even keep-alive messages), the successful writes to
+ * the sender will keep things going. If the receiver is actively
+ * receiving data, it will ensure that the generator knows that it is
+ * not idle by sending the generator keep-alive messages (since the
+ * generator might be blocked trying to send checksums, it needs to
+ * know that the receiver is active). Thus, as long as one or the
+ * other is successfully doing work, the generator will not timeout. */
+ if (!io_timeout)
+ return;
+
+ t = time(NULL);
+
+ if (allow_keepalive) {
+ /* This may put data into iobuf.msg w/o flushing. */
+ maybe_send_keepalive(t, 0);
+ }
+
+ if (!last_io_in)
+ last_io_in = t;
+
+ if (am_receiver)
+ return;
+
+ chk = MAX(last_io_out, last_io_in);
+ if (t - chk >= io_timeout) {
+ if (am_server)
+ msgs2stderr = 1;
+ rprintf(FERROR, "[%s] io timeout after %d seconds -- exiting\n",
+ who_am_i(), (int)(t-chk));
+ exit_cleanup(RERR_TIMEOUT);
+ }
+}
+
+/* It's almost always an error to get an EOF when we're trying to read from the
+ * network, because the protocol is (for the most part) self-terminating.
+ *
+ * There is one case for the receiver when it is at the end of the transfer
+ * (hanging around reading any keep-alive packets that might come its way): if
+ * the sender dies before the generator's kill-signal comes through, we can end
+ * up here needing to loop until the kill-signal arrives. In this situation,
+ * kluge_around_eof will be < 0.
+ *
+ * There is another case for older protocol versions (< 24) where the module
+ * listing was not terminated, so we must ignore an EOF error in that case and
+ * exit. In this situation, kluge_around_eof will be > 0. */
+static NORETURN void whine_about_eof(BOOL allow_kluge)
+{
+ if (kluge_around_eof && allow_kluge) {
+ int i;
+ if (kluge_around_eof > 0)
+ exit_cleanup(0);
+ /* If we're still here after 10 seconds, exit with an error. */
+ for (i = 10*1000/20; i--; )
+ msleep(20);
+ }
+
+ rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
+ "(%s bytes received so far) [%s]\n",
+ big_num(stats.total_read), who_am_i());
+
+ exit_cleanup(RERR_STREAMIO);
+}
+
+/* Do a safe read, handling any needed looping and error handling.
+ * Returns the count of the bytes read, which will only be different
+ * from "len" if we encountered an EOF. This routine is not used on
+ * the socket except very early in the transfer. */
+static size_t safe_read(int fd, char *buf, size_t len)
+{
+ size_t got;
+ int n;
+
+ assert(fd != iobuf.in_fd);
+
+ n = read(fd, buf, len);
+ if ((size_t)n == len || n == 0) {
+ if (DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] safe_read(%d)=%ld\n", who_am_i(), fd, (long)n);
+ return n;
+ }
+ if (n < 0) {
+ if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
+ read_failed:
+ rsyserr(FERROR, errno, "safe_read failed to read %ld bytes [%s]",
+ (long)len, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }
+ got = 0;
+ } else
+ got = n;
+
+ while (1) {
+ struct timeval tv;
+ fd_set r_fds, e_fds;
+ int cnt;
+
+ FD_ZERO(&r_fds);
+ FD_SET(fd, &r_fds);
+ FD_ZERO(&e_fds);
+ FD_SET(fd, &e_fds);
+ tv.tv_sec = select_timeout;
+ tv.tv_usec = 0;
+
+ cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv);
+ if (cnt <= 0) {
+ if (cnt < 0 && errno == EBADF) {
+ rsyserr(FERROR, errno, "safe_read select failed [%s]",
+ who_am_i());
+ exit_cleanup(RERR_FILEIO);
+ }
+ if (io_timeout)
+ maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH);
+ continue;
+ }
+
+ /*if (FD_ISSET(fd, &e_fds))
+ rprintf(FINFO, "select exception on fd %d\n", fd); */
+
+ if (FD_ISSET(fd, &r_fds)) {
+ n = read(fd, buf + got, len - got);
+ if (DEBUG_GTE(IO, 2))
+ rprintf(FINFO, "[%s] safe_read(%d)=%ld\n", who_am_i(), fd, (long)n);
+ if (n == 0)
+ break;
+ if (n < 0) {
+ if (errno == EINTR)
+ continue;
+ goto read_failed;
+ }
+ if ((got += (size_t)n) == len)
+ break;
+ }
+ }
+
+ return got;
+}