static void read_msg_fd(void)
{
char buf[2048];
-@@ -244,58 +242,6 @@ static void read_msg_fd(void)
+@@ -244,51 +242,6 @@ static void read_msg_fd(void)
tag = (tag >> 24) - MPLEX_BASE;
switch (tag) {
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, buf, len);
-- if (defer_forwarding_messages)
-- msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
-- else
-- io_multiplex_write(MSG_DELETED, buf, len);
+- send_msg(MSG_DELETED, buf, len);
- break;
- case MSG_SUCCESS:
- if (len != 4 || !am_generator) {
- read_loop(fd, buf, len);
- if (remove_sent_files) {
- decrement_active_files(IVAL(buf,0));
-- if (defer_forwarding_messages)
-- msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
-- else
-- io_multiplex_write(MSG_SUCCESS, buf, len);
+- send_msg(MSG_SUCCESS, buf, len);
- }
- if (preserve_hard_links)
- flist_ndx_push(&hlink_list, IVAL(buf,0));
- exit_cleanup(RERR_STREAMIO);
- }
- close_multiplexing_out();
-- defer_forwarding_messages = 0;
- /* FALL THROUGH */
case MSG_INFO:
case MSG_ERROR:
case MSG_LOG:
-@@ -304,11 +250,7 @@ static void read_msg_fd(void)
- if (n >= sizeof buf)
- n = sizeof buf - 1;
- read_loop(fd, buf, n);
-- if (am_generator && am_server
-- && defer_forwarding_messages && tag != MSG_LOG)
-- msg_list_add(&msg2sndr, tag, buf, n);
-- else
-- rwrite((enum logcode)tag, buf, n);
-+ rwrite((enum logcode)tag, buf, n);
- len -= n;
- }
- break;
-@@ -343,70 +285,76 @@ void decrement_active_files(int ndx)
+@@ -332,75 +285,80 @@ void decrement_active_files(int ndx)
active_bytecnt -= the_file_list->files[ndx]->length;
}
+ no_flush--;
}
- void send_msg(enum msgcode code, char *buf, int len)
+ int send_msg(enum msgcode code, char *buf, int len)
{
- if (msg_fd_out < 0) {
-+ if (am_receiver())
-+ msg_list_add(code, buf, len);
-+ else
- io_multiplex_write(code, buf, len);
-- return;
-- }
++ if (!am_receiver()) {
+ if (!defer_forwarding_messages)
+ return io_multiplex_write(code, buf, len);
+ if (!io_multiplexing_out)
+ return 0;
+- msg_list_add(&msg2sndr, code, buf, len);
+- return 1;
+ }
- msg_list_add(&msg2genr, code, buf, len);
- msg2genr_flush(NORMAL_FLUSH);
++ msg_list_add(code, buf, len);
+ return 1;
}
-int get_redo_num(int itemizing, enum logcode code)
return flist_ndx_pop(&hlink_list);
}
-@@ -486,11 +434,6 @@ static int read_timeout(int fd, char *bu
+@@ -480,11 +438,6 @@ static int read_timeout(int fd, char *bu
FD_ZERO(&r_fds);
FD_ZERO(&w_fds);
FD_SET(fd, &r_fds);
if (io_filesfrom_f_out >= 0) {
int new_fd;
if (io_filesfrom_buflen == 0) {
-@@ -523,9 +466,6 @@ static int read_timeout(int fd, char *bu
+@@ -517,9 +470,6 @@ static int read_timeout(int fd, char *bu
continue;
}
if (io_filesfrom_f_out >= 0) {
if (io_filesfrom_buflen) {
if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
-@@ -847,6 +787,8 @@ static void readfd(int fd, char *buffer,
+@@ -841,6 +791,8 @@ static void readfd(int fd, char *buffer,
}
if (fd == write_batch_monitor_in) {
if ((size_t)write(batch_fd, buffer, total) != total)
exit_cleanup(RERR_FILEIO);
}
-@@ -1108,7 +1050,6 @@ static void writefd_unbuffered(int fd,ch
+@@ -1102,7 +1054,6 @@ static void writefd_unbuffered(int fd,ch
* to grab any messages they sent before they died. */
while (fd == sock_f_out && io_multiplexing_in) {
set_io_timeout(30);
readfd_unbuffered(sock_f_in, io_filesfrom_buf,
sizeof io_filesfrom_buf);
}
-@@ -1119,7 +1060,7 @@ static void writefd_unbuffered(int fd,ch
+@@ -1113,7 +1064,7 @@ static void writefd_unbuffered(int fd,ch
defer_forwarding_messages = 1;
if (fd == sock_f_out) {
last_io_out = time(NULL);
sleep_for_bwlimit(cnt);
}
-@@ -1129,32 +1070,6 @@ static void writefd_unbuffered(int fd,ch
+@@ -1123,23 +1074,6 @@ static void writefd_unbuffered(int fd,ch
no_flush--;
}
-
- while (msg2sndr.head && io_multiplexing_out) {
- struct msg_list_item *m = msg2sndr.head;
-- int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
- if (!(msg2sndr.head = m->next))
- msg2sndr.tail = NULL;
+- stats.total_written += m->len;
- defer_forwarding_messages = 1;
-- switch (tag) {
-- case MSG_INFO:
-- case MSG_ERROR:
-- rwrite((enum logcode)tag, m->buf + 4, m->len - 4);
-- break;
-- default:
-- stats.total_written += m->len;
-- writefd_unbuffered(sock_f_out, m->buf, m->len);
-- break;
-- }
+- writefd_unbuffered(sock_f_out, m->buf, m->len);
- defer_forwarding_messages = 0;
- free(m);
- }
/**
* Write an message to a multiplexed stream. If this fails then rsync
* exits.
-@@ -1180,14 +1095,15 @@ static void mplex_write(enum msgcode cod
+@@ -1165,14 +1099,15 @@ static void mplex_write(enum msgcode cod
defer_forwarding_messages = 1;
writefd_unbuffered(sock_f_out, buf, len);
defer_forwarding_messages = 0;
if (!iobuf_out_cnt || no_flush)
return;
-@@ -1201,11 +1117,6 @@ void io_flush(int flush_it_all)
+@@ -1186,11 +1121,6 @@ void io_flush(int flush_it_all)
static void writefd(int fd,char *buf,size_t len)
{
if (fd == sock_f_out)
stats.total_written += len;
-@@ -1411,9 +1322,3 @@ void start_write_batch(int fd)
+@@ -1396,9 +1326,3 @@ void start_write_batch(int fd)
else
write_batch_monitor_in = fd;
}