int ignore_timeout = 0;
int batch_fd = -1;
int msgdone_cnt = 0;
-int check_for_io_err = 0;
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
#ifdef ICONV_OPTION
static xbuf iconv_buf = EMPTY_XBUF;
#endif
-static int defer_forwarding_messages = 0, defer_forwarding_keep = 0;
+static int defer_forwarding_messages = 0, keep_defer_forwarding = 0; /* keep_defer_forwarding is a counter (++/-- around a write), not a 0/1 flag */
static int select_timeout = SELECT_TIMEOUT;
static int active_filecnt = 0;
static OFF_T active_bytecnt = 0;
static void got_flist_entry_status(enum festatus status, const char *buf)
{
int ndx = IVAL(buf, 0);
- struct file_list *flist = flist_for_ndx(ndx);
-
- assert(flist != NULL);
+ struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status");
if (remove_source_files) {
active_filecnt--;
lst->tail = m;
}
+static inline int flush_a_msg(int fd) /* Dequeue the head of msg_queue, write it to fd, and return its payload length. */
+{
+ struct msg_list_item *m = msg_queue.head; /* dereferenced unchecked: callers test msg_queue.head first */
+ int len = IVAL(m->buf, 0) & 0xFFFFFF; /* low 24 bits of the value IVAL reads at the start of the buffer */
+ int tag = *((uchar*)m->buf+3) - MPLEX_BASE; /* byte at offset 3, less MPLEX_BASE */
+
+ if (!(msg_queue.head = m->next)) /* unlink the head; an emptied queue also clears the tail */
+ msg_queue.tail = NULL;
+
+ defer_forwarding_messages++; /* held non-zero for the duration of the write below */
+ mplex_write(fd, tag, m->buf + 4, len, m->convert); /* payload starts 4 bytes into the buffer */
+ defer_forwarding_messages--;
+
+ free(m);
+
+ return len;
+}
+
static void msg_flush(void)
{
if (am_generator) {
- while (msg_queue.head && io_multiplexing_out) {
- struct msg_list_item *m = msg_queue.head;
- int len = IVAL(m->buf, 0) & 0xFFFFFF;
- int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
- if (!(msg_queue.head = m->next))
- msg_queue.tail = NULL;
- stats.total_written += len + 4;
- defer_forwarding_messages++;
- mplex_write(sock_f_out, tag, m->buf + 4, len, m->convert);
- defer_forwarding_messages--;
- free(m);
- }
+ while (msg_queue.head && io_multiplexing_out) /* generator: drain to sock_f_out, but only while output multiplexing is on */
+ stats.total_written += flush_a_msg(sock_f_out) + 4; /* returned payload length plus the 4 bytes that precede the payload */
} else {
- while (msg_queue.head) {
- struct msg_list_item *m = msg_queue.head;
- int len = IVAL(m->buf, 0) & 0xFFFFFF;
- int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
- if (!(msg_queue.head = m->next))
- msg_queue.tail = NULL;
- defer_forwarding_messages++;
- mplex_write(msg_fd_out, tag, m->buf + 4, len, m->convert);
- defer_forwarding_messages--;
- free(m);
- }
+ while (msg_queue.head) /* non-generator: drain the whole queue to msg_fd_out */
+ (void)flush_a_msg(msg_fd_out); /* length discarded: stats.total_written is not updated on this path */
}
}
len = tag & 0xFFFFFF;
tag = (tag >> 24) - MPLEX_BASE;
- check_for_io_err = 0;
-
switch (tag) {
case MSG_DONE:
if (len < 0 || len > 1 || !am_generator) {
}
flist = recv_file_list(fd);
flist->parent_ndx = IVAL(buf,0);
- /* If the sender is going to send us an MSG_IO_ERROR value, it
- * will always be the very next message following MSG_FLIST. */
- check_for_io_err = 1;
#ifdef SUPPORT_HARD_LINKS
if (preserve_hard_links)
match_hard_links(flist);
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- defer_forwarding_keep = 1; /* defer_forwarding_messages++ on return */
+ keep_defer_forwarding++; /* defer_forwarding_messages++ on return */
writefd_unbuffered(fd, buffer, n+4);
- defer_forwarding_keep = 0;
+ keep_defer_forwarding--;
if (len > n)
writefd_unbuffered(fd, buf+n, len-n);
void wait_for_receiver(void)
{
- if (iobuf_out_cnt)
- io_flush(NORMAL_FLUSH);
- else
- read_msg_fd();
+ if (io_flush(FULL_FLUSH)) /* non-zero return: io_flush() wrote something, so return without reading */
+ return;
+ read_msg_fd(); /* reached only when io_flush() reported that it flushed nothing */
}
int get_redo_num(void)
msg_bytes = tag & 0xFFFFFF;
tag = (tag >> 24) - MPLEX_BASE;
- check_for_io_err = 0;
-
switch (tag) {
case MSG_DATA:
if (msg_bytes > iobuf_in_siz) {
}
no_flush--;
- defer_inc -= defer_forwarding_keep;
+ if (keep_defer_forwarding)
+ defer_inc--;
if (!(defer_forwarding_messages -= defer_inc) && !no_flush)
msg_flush();
}
-void io_flush(int flush_it_all)
+int io_flush(int flush_it_all) /* Write out pending output; returns 1 if anything was flushed, else 0. */
{
- if (!iobuf_out_cnt || no_flush)
- return;
+ int flushed_something = 0;
- if (io_multiplexing_out)
- mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
- else
- writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
- iobuf_out_cnt = 0;
+ if (no_flush) /* while no_flush is non-zero, write nothing and report 0 */
+ return 0;
+
+ if (iobuf_out_cnt) { /* buffered output is pending */
+ if (io_multiplexing_out)
+ mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
+ else
+ writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
+ iobuf_out_cnt = 0;
+ flushed_something = 1;
+ }
- if (flush_it_all && !defer_forwarding_messages)
+ if (flush_it_all && !defer_forwarding_messages && msg_queue.head) { /* drain queued messages only on request, when not deferred, and when the queue is non-empty */
msg_flush();
+ flushed_something = 1;
+ }
+
+ return flushed_something;
}
static void writefd(int fd, const char *buf, size_t len)