extern int am_server;
extern int am_daemon;
extern int am_sender;
+extern int am_generator;
extern int eol_nulls;
+extern int csum_length;
extern int checksum_seed;
extern int protocol_version;
extern char *remote_filesfrom_file;
sock_f_out = f_out;
}
-/** Setup the fd used to receive MSG_* messages. Only needed when
- * we're the generator because the sender and receiver both use the
- * multiplexed I/O setup. */
+/* Set up the fd used to receive MSG_* messages by recording it in
+ * msg_fd_in.  Only needed during the early stages of being a local
+ * sender (up through the sending of the file list) or when we're the
+ * generator (to fetch the messages from the receiver). */
void set_msg_fd_in(int fd)
{
msg_fd_in = fd;
}
-/** Setup the fd used to send our MSG_* messages. Only needed when
- * we're the receiver because the generator and the sender both use
- * the multiplexed I/O setup. */
+/* Set up the fd used to send our MSG_* messages.  Only needed when
+ * we're the receiver (to send our messages to the generator).
+ *
+ * After recording the fd in msg_fd_out this calls
+ * msg_list_push(NORMAL_FLUSH); presumably that writes out any messages
+ * queued before the fd was available -- TODO confirm against
+ * msg_list_push(). */
void set_msg_fd_out(int fd)
{
msg_fd_out = fd;
msg_list_push(NORMAL_FLUSH);
}
-/** Read a message from the MSG_* fd and dispatch it. This is only
- * called by the generator. */
+/* Read a message from the MSG_* fd and handle it. This is called either
+ * during the early stages of being a local sender (up through the sending
+ * of the file list) or when we're the generator (to fetch the messages
+ * from the receiver). */
static void read_msg_fd(void)
{
char buf[2048];
switch (tag) {
case MSG_DONE:
- if (len != 0) {
+ if (len != 0 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
redo_list_add(-1);
break;
case MSG_REDO:
- if (len != 4) {
+ if (len != 4 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
exit_cleanup(0);
rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
- "(%.0f bytes read so far)\n",
- (double)stats.total_read);
+ "(%.0f bytes received so far) [%s]\n",
+ (double)stats.total_read, who_am_i());
exit_cleanup(RERR_STREAMIO);
}
/* Don't write errors on a dead socket. */
if (fd == sock_f_in)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno, "read error");
exit_cleanup(RERR_STREAMIO);
}
case MSG_INFO:
case MSG_ERROR:
if (remaining >= sizeof line) {
- rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
- tag, (long)remaining);
+ rprintf(FERROR,
+ "[%s] multiplexing overflow %d:%ld\n\n",
+ who_am_i(), tag, (long)remaining);
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, line, remaining);
remaining = 0;
break;
default:
- rprintf(FERROR, "unexpected tag %d\n", tag);
+ rprintf(FERROR, "[%s] unexpected tag %d\n",
+ who_am_i(), tag);
exit_cleanup(RERR_STREAMIO);
}
}
if ((int32)ret != (int32)0xffffffff)
return ret;
-#ifdef NO_INT64
- rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
-#else
+#ifdef INT64_IS_OFF_T
+ if (sizeof (int64) < 8) {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
readfd(f,b,8);
ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
-#endif
return ret;
}
return c;
}
+/* Populate a sum_struct with values from the socket.  This is
+ * called by both the sender and the receiver.
+ *
+ * Reads from f, in order: the block count, the block length, the
+ * checksum length (taken from csum_length instead of the wire when
+ * protocol_version < 27) and the remainder length.  Every value is
+ * supplied by the peer, so each one is range-checked; an out-of-range
+ * value is reported with rprintf(FERROR, ...) and followed by
+ * exit_cleanup(RERR_PROTOCOL). */
+void read_sum_head(int f, struct sum_struct *sum)
+{
+	sum->count = read_int(f);
+	/* A negative block count is never valid; reject it here like the
+	 * other header fields instead of passing it on to the callers.
+	 * NOTE(review): assumes count is a signed type, as blength and
+	 * remainder appear to be -- confirm against struct sum_struct. */
+	if (sum->count < 0) {
+		rprintf(FERROR, "Invalid checksum count %ld\n",
+			(long)sum->count);
+		exit_cleanup(RERR_PROTOCOL);
+	}
+	sum->blength = read_int(f);
+	if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
+		rprintf(FERROR, "Invalid block length %ld\n",
+			(long)sum->blength);
+		exit_cleanup(RERR_PROTOCOL);
+	}
+	/* Older protocols do not transmit the checksum length. */
+	sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
+	if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
+		rprintf(FERROR, "Invalid checksum length %d\n", sum->s2length);
+		exit_cleanup(RERR_PROTOCOL);
+	}
+	sum->remainder = read_int(f);
+	/* The remainder can never exceed the block length. */
+	if (sum->remainder < 0 || sum->remainder > sum->blength) {
+		rprintf(FERROR, "Invalid remainder length %ld\n",
+			(long)sum->remainder);
+		exit_cleanup(RERR_PROTOCOL);
+	}
+}
+
/**
* Sleep after writing to limit I/O bandwidth usage.
if (msg_fd_in > maxfd)
maxfd = msg_fd_in;
}
+ if (fd != sock_f_out && iobuf_out_cnt && no_flush == 1) {
+ FD_SET(sock_f_out, &w_fds);
+ if (sock_f_out > maxfd)
+ maxfd = sock_f_out;
+ }
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
read_msg_fd();
- if (!FD_ISSET(fd, &w_fds))
+ if (!FD_ISSET(fd, &w_fds)) {
+ if (fd != sock_f_out && iobuf_out_cnt) {
+ no_flush--;
+ io_flush(NORMAL_FLUSH);
+ no_flush++;
+ }
continue;
+ }
n = len - total;
if (bwlimit && n > bwlimit_writemax)
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
- (long)len, io_write_phase);
+ "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
+ (long)len, io_write_phase, who_am_i());
+ /* If the other side is sending us error messages, try
+ * to grab any messages they sent before they died. */
+ while (fd == sock_f_out && io_multiplexing_in) {
+ io_timeout = select_timeout = 30;
+ readfd_unbuffered(sock_f_in, io_filesfrom_buf,
+ sizeof io_filesfrom_buf);
+ }
exit_cleanup(RERR_STREAMIO);
}
return;
}
-#ifdef NO_INT64
- rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
-#else
+#ifdef INT64_IS_OFF_T
+ if (sizeof (int64) < 8) {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
+
write_int(f, (int32)0xFFFFFFFF);
SIVAL(b,0,(x&0xFFFFFFFF));
SIVAL(b,4,((x>>32)&0xFFFFFFFF));
writefd(f,b,8);
-#endif
}
void write_buf(int f,char *buf,size_t len)
return 1;
}
+/** Stop input multiplexing by clearing io_multiplexing_in. */
+void close_multiplexing_in(void)
+{
+io_multiplexing_in = 0;
+}
+
/** Stop output multiplexing. */
-void io_multiplexing_close(void)
+void close_multiplexing_out(void)
{
+/* The socket-failure paths call this before reporting the error, so
+ * the report is not written back across the dead stream. */
io_multiplexing_out = 0;
}
void start_write_batch(int fd)
{
+ write_stream_flags(batch_fd);
+
/* Some communication has already taken place, but we don't
* enable batch writing until here so that we can write a
* canonical record of the communication even though the