int batch_fd = -1;
int msgdone_cnt = 0;
int forward_flist_data = 0;
+BOOL flist_receiving_enabled = False; /* set to False while a file list is being received and back to True afterwards; checked before calling wait_for_receiver() */
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
static flist_ndx_list redo_list, hlink_list;
+static void read_a_msg(void);
static void drain_multiplex_messages(void);
static void sleep_for_bwlimit(int bytes_written);
}
}
+static void slide_iobuf_in(size_t needed) /* Move the iobuf.in.len pending bytes at iobuf.in.pos down to offset 0 of iobuf.in.buf; "needed" is only reported in the debug message. */
+{
+ memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len); /* memmove, since source and destination can overlap */
+ if (DEBUG_GTE(IO, 4)) {
+ rprintf(FINFO,
+ "[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
+ who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
+ }
+ if (iobuf.raw_input_ends_before) /* a zero value is left untouched */
+ iobuf.raw_input_ends_before -= iobuf.in.pos; /* shift this offset by the same distance the data just moved */
+ iobuf.in.pos = 0; /* reset last: the statements above still read the old position */
+}
+
/* Perform buffered input and output until specified conditions are met. When
* given a "needed" read requirement, we'll return without doing any I/O if the
* iobuf.in bytes are already available. When reading, we'll read as many
}
realloc_xbuf(&iobuf.in, new_size);
}
- if (iobuf.in.size - iobuf.in.pos < needed
- || (iobuf.in.len < needed && iobuf.in.len < 1024
- && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)) {
- memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len);
- if (DEBUG_GTE(IO, 4)) {
- rprintf(FINFO,
- "[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
- who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
- }
- if (iobuf.raw_input_ends_before)
- iobuf.raw_input_ends_before -= iobuf.in.pos;
- iobuf.in.pos = 0;
- }
+ if (iobuf.in.size - iobuf.in.pos < needed)
+ slide_iobuf_in(needed);
break;
case PIO_NEED_OUTROOM:
break;
}
+ if (iobuf.in.len < 1024 && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)
+ slide_iobuf_in(flags & PIO_NEED_INPUT ? needed : 0);
+
max_fd = -1;
FD_ZERO(&r_fds);
}
}
+ /* We need to help prevent deadlock by doing what reading
+ * we can whenever we are here trying to write. */
+ if (IN_MULTIPLEXED && !(flags & PIO_NEED_INPUT)) {
+ while (!iobuf.raw_input_ends_before && iobuf.in.len > 512)
+ read_a_msg();
+ if (flist_receiving_enabled && iobuf.in.len > 512)
+ wait_for_receiver(); /* generator only */
+ }
+
if (ff_forward_fd >= 0 && FD_ISSET(ff_forward_fd, &r_fds)) {
/* This can potentially flush all output and enable
* multiplexed output, so keep this last in the loop
}
} else {
struct file_list *flist;
+ flist_receiving_enabled = False;
if (DEBUG_GTE(FLIST, 2)) {
rprintf(FINFO, "[%s] receiving flist for dir %d\n",
who_am_i(), ndx);
if (preserve_hard_links)
match_hard_links(flist);
#endif
+ flist_receiving_enabled = True;
}
}
}