X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/41cfde6be388364f9bf7dfa9a532625b1f660df7..eae7165c797a9546b50aed0efbc70bae7880bf0b:/io.c diff --git a/io.c b/io.c index 50eca536..1cfcd722 100644 --- a/io.c +++ b/io.c @@ -46,6 +46,7 @@ extern int io_timeout; extern int am_server; extern int am_daemon; extern int am_sender; +extern int am_generator; extern int eol_nulls; extern int checksum_seed; extern int protocol_version; @@ -162,17 +163,17 @@ void io_set_sock_fds(int f_in, int f_out) sock_f_out = f_out; } -/** Setup the fd used to receive MSG_* messages. Only needed when - * we're the generator because the sender and receiver both use the - * multiplexed I/O setup. */ +/* Setup the fd used to receive MSG_* messages. Only needed during the + * early stages of being a local sender (up through the sending of the + * file list) or when we're the generator (to fetch the messages from + * the receiver). */ void set_msg_fd_in(int fd) { msg_fd_in = fd; } -/** Setup the fd used to send our MSG_* messages. Only needed when - * we're the receiver because the generator and the sender both use - * the multiplexed I/O setup. */ +/* Setup the fd used to send our MSG_* messages. Only needed when + * we're the receiver (to send our messages to the generator). */ void set_msg_fd_out(int fd) { msg_fd_out = fd; @@ -205,8 +206,10 @@ void send_msg(enum msgcode code, char *buf, int len) msg_list_push(NORMAL_FLUSH); } -/** Read a message from the MSG_* fd and dispatch it. This is only - * called by the generator. */ +/* Read a message from the MSG_* fd and handle it. This is called either + * during the early stages of being a local sender (up through the sending + * of the file list) or when we're the generator (to fetch the messages + * from the receiver). */ static void read_msg_fd(void) { char buf[2048]; @@ -226,14 +229,14 @@ static void read_msg_fd(void) switch (tag) { case MSG_DONE: - if (len != 0) { + if (len != 0 || !am_generator) { rprintf(FERROR, "invalid message %d:%d\n", tag, len); exit_cleanup(RERR_STREAMIO); } redo_list_add(-1); break; case MSG_REDO: - if (len != 4) { + if (len != 4 || !am_generator) { rprintf(FERROR, "invalid message %d:%d\n", tag, len); exit_cleanup(RERR_STREAMIO); } @@ -676,8 +679,9 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) case MSG_INFO: case MSG_ERROR: if (remaining >= sizeof line) { - rprintf(FERROR, "multiplexing overflow %d:%ld\n\n", - tag, (long)remaining); + rprintf(FERROR, + "[%s] multiplexing overflow %d:%ld\n\n", + who_am_i(), tag, (long)remaining); exit_cleanup(RERR_STREAMIO); } read_loop(fd, line, remaining); @@ -685,7 +689,8 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) remaining = 0; break; default: - rprintf(FERROR, "unexpected tag %d\n", tag); + rprintf(FERROR, "[%s] unexpected tag %d\n", + who_am_i(), tag); exit_cleanup(RERR_STREAMIO); } } @@ -913,7 +918,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) /* If the other side is sending us error messages, try * to grab any messages they sent before they died. */ while (fd == sock_f_out && io_multiplexing_in) { - io_timeout = 30; + io_timeout = select_timeout = 30; readfd_unbuffered(sock_f_in, io_filesfrom_buf, sizeof io_filesfrom_buf); }