int batch_fd = -1;
int batch_gen_fd = -1;
-/**
- * The connection might be dropped at some point; perhaps because the
- * remote instance crashed. Just giving the offset on the stream is
- * not very helpful. So instead we try to make io_phase_name point to
- * something useful.
- *
- * For buffered/multiplexed I/O these names will be somewhat
- * approximate; perhaps for ease of support we would rather make the
- * buffer always flush when a single application-level I/O finishes.
- *
- * @todo Perhaps we want some simple stack functionality, but there's
- * no need to overdo it.
- **/
-const char *io_write_phase = phase_unknown;
-const char *io_read_phase = phase_unknown;
-
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
}
break;
default:
- rprintf(FERROR, "unknown message %d:%d\n", tag, len);
+ rprintf(FERROR, "unknown message %d:%d [%s]\n",
+ tag, len, who_am_i());
exit_cleanup(RERR_STREAMIO);
}
size_t n, total = 0;
fd_set w_fds, r_fds;
int maxfd, count, cnt, using_r_fds;
+ int defer_save = defer_forwarding_messages;
struct timeval tv;
no_flush++;
if (!FD_ISSET(fd, &w_fds))
continue;
- if (msg2sndr.head && !defer_forwarding_messages) {
- struct msg_list_item *m = msg2sndr.head;
- int code = *((uchar*)m->buf+3) - MPLEX_BASE;
- if (!(msg2sndr.head = m->next))
- msg2sndr.tail = NULL;
- defer_forwarding_messages = 1;
- io_multiplex_write(code, m->buf+4, m->len-4);
- defer_forwarding_messages = 0;
- free(m);
- continue;
- }
-
n = len - total;
if (bwlimit_writemax && n > bwlimit_writemax)
n = bwlimit_writemax;
if (fd == sock_f_out)
close_multiplexing_out();
rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
- (long)len, io_write_phase, who_am_i());
+ "writefd_unbuffered failed to write %ld bytes [%s]",
+ (long)len, who_am_i());
/* If the other side is sending us error messages, try
* to grab any messages they sent before they died. */
while (fd == sock_f_out && io_multiplexing_in) {
sleep_for_bwlimit(cnt);
}
}
- defer_forwarding_messages = 0;
+ defer_forwarding_messages = defer_save;
no_flush--;
}
+static void msg2sndr_flush(void)
+{
+ if (defer_forwarding_messages)
+ return;
+
+ while (msg2sndr.head && io_multiplexing_out) {
+ struct msg_list_item *m = msg2sndr.head;
+ if (!(msg2sndr.head = m->next))
+ msg2sndr.tail = NULL;
+ stats.total_written += m->len;
+ defer_forwarding_messages = 1;
+ writefd_unbuffered(sock_f_out, m->buf, m->len);
+ defer_forwarding_messages = 0;
+ free(m);
+ }
+}
+
/**
* Write an message to a multiplexed stream. If this fails then rsync
* exits.
len -= n;
buf += n;
- if (len)
+ if (len) {
+ defer_forwarding_messages = 1;
writefd_unbuffered(sock_f_out, buf, len);
+ defer_forwarding_messages = 0;
+ msg2sndr_flush();
+ }
}
void io_flush(int flush_it_all)
{
msg2genr_flush(flush_it_all);
+ msg2sndr_flush();
if (!iobuf_out_cnt || no_flush)
return;
writefd(f,b,4);
}
-void write_int_named(int f, int32 x, const char *phase)
-{
- io_write_phase = phase;
- write_int(f, x);
- io_write_phase = phase_unknown;
-}
-
/*
* Note: int64 may actually be a 32-bit type if ./configure couldn't find any
* 64-bit types on this platform.