Matt McCutchen's Web Site
/
rsync
/
rsync.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Moved the reading of the final MSG_DONE message here from main.c
[rsync/rsync.git]
/
io.c
diff --git a/io.c b/io.c
index f3ea2a6..62880fb 100644 (file)
--- a/io.c
+++ b/io.c
@@ -46,6 +46,7 @@ extern int io_timeout;
extern int am_server;
extern int am_daemon;
extern int am_sender;
extern int am_server;
extern int am_daemon;
extern int am_sender;
+extern int am_generator;
extern int eol_nulls;
extern int checksum_seed;
extern int protocol_version;
extern int eol_nulls;
extern int checksum_seed;
extern int protocol_version;
@@ -162,17 +163,17 @@ void io_set_sock_fds(int f_in, int f_out)
sock_f_out = f_out;
}
sock_f_out = f_out;
}
-/** Setup the fd used to receive MSG_* messages. Only needed when
- * we're the generator because the sender and receiver both use the
- * multiplexed I/O setup. */
+/* Setup the fd used to receive MSG_* messages. Only needed during the
+ * early stages of being a local sender (up through the sending of the
+ * file list) or when we're the generator (to fetch the messages from
+ * the receiver). */
void set_msg_fd_in(int fd)
{
msg_fd_in = fd;
}
void set_msg_fd_in(int fd)
{
msg_fd_in = fd;
}
-/** Setup the fd used to send our MSG_* messages. Only needed when
- * we're the receiver because the generator and the sender both use
- * the multiplexed I/O setup. */
+/* Setup the fd used to send our MSG_* messages. Only needed when
+ * we're the receiver (to send our messages to the generator). */
void set_msg_fd_out(int fd)
{
msg_fd_out = fd;
void set_msg_fd_out(int fd)
{
msg_fd_out = fd;
@@ -205,8 +206,10 @@ void send_msg(enum msgcode code, char *buf, int len)
msg_list_push(NORMAL_FLUSH);
}
msg_list_push(NORMAL_FLUSH);
}
-/** Read a message from the MSG_* fd and dispatch it. This is only
- * called by the generator. */
+/* Read a message from the MSG_* fd and handle it. This is called either
+ * during the early stages of being a local sender (up through the sending
+ * of the file list) or when we're the generator (to fetch the messages
+ * from the receiver). */
static void read_msg_fd(void)
{
char buf[2048];
static void read_msg_fd(void)
{
char buf[2048];
@@ -226,14 +229,14 @@ static void read_msg_fd(void)
switch (tag) {
case MSG_DONE:
switch (tag) {
case MSG_DONE:
- if (len != 0) {
+ if (len != 0 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
redo_list_add(-1);
break;
case MSG_REDO:
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
redo_list_add(-1);
break;
case MSG_REDO:
- if (len != 4) {
+ if (len != 4 || !am_generator) {
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
rprintf(FERROR, "invalid message %d:%d\n", tag, len);
exit_cleanup(RERR_STREAMIO);
}
@@ -353,8 +356,8 @@ static void whine_about_eof(int fd)
exit_cleanup(0);
rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
exit_cleanup(0);
rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
- "(%.0f bytes read so far)\n",
- (double)stats.total_read);
+ "(%.0f bytes received so far) [%s]\n",
+ (double)stats.total_read, who_am_i());
exit_cleanup(RERR_STREAMIO);
}
exit_cleanup(RERR_STREAMIO);
}
@@ -510,7 +513,7 @@ static int read_timeout(int fd, char *buf, size_t len)
/* Don't write errors on a dead socket. */
if (fd == sock_f_in)
/* Don't write errors on a dead socket. */
if (fd == sock_f_in)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno, "read error");
exit_cleanup(RERR_STREAMIO);
}
rsyserr(FERROR, errno, "read error");
exit_cleanup(RERR_STREAMIO);
}
@@ -857,6 +860,11 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
if (msg_fd_in > maxfd)
maxfd = msg_fd_in;
}
if (msg_fd_in > maxfd)
maxfd = msg_fd_in;
}
+ if (fd != sock_f_out && iobuf_out_cnt && no_flush == 1) {
+ FD_SET(sock_f_out, &w_fds);
+ if (sock_f_out > maxfd)
+ maxfd = sock_f_out;
+ }
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
@@ -875,8 +883,14 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
read_msg_fd();
if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
read_msg_fd();
- if (!FD_ISSET(fd, &w_fds))
+ if (!FD_ISSET(fd, &w_fds)) {
+ if (fd != sock_f_out && iobuf_out_cnt) {
+ no_flush--;
+ io_flush(NORMAL_FLUSH);
+ no_flush++;
+ }
continue;
continue;
+ }
n = len - total;
if (bwlimit && n > bwlimit_writemax)
n = len - total;
if (bwlimit && n > bwlimit_writemax)
@@ -895,14 +909,14 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno,
rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
- (long)len, io_write_phase);
+ "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
+ (long)len, io_write_phase, who_am_i());
/* If the other side is sending us error messages, try
* to grab any messages they sent before they died. */
/* If the other side is sending us error messages, try
* to grab any messages they sent before they died. */
- while (fd == sock_f_out && am_sender) {
- io_timeout = 30;
+ while (fd == sock_f_out && io_multiplexing_in) {
+ io_timeout = select_timeout = 30;
readfd_unbuffered(sock_f_in, io_filesfrom_buf,
sizeof io_filesfrom_buf);
}
readfd_unbuffered(sock_f_in, io_filesfrom_buf,
sizeof io_filesfrom_buf);
}
@@ -1129,8 +1143,13 @@ int io_multiplex_write(enum msgcode code, char *buf, size_t len)
return 1;
}
return 1;
}
+void close_multiplexing_in(void)
+{
+ io_multiplexing_in = 0;
+}
+
/** Stop output multiplexing. */
/** Stop output multiplexing. */
-void io_multiplexing_close(void)
+void close_multiplexing_out(void)
{
io_multiplexing_out = 0;
}
{
io_multiplexing_out = 0;
}