int batch_fd = -1;
int msgdone_cnt = 0;
int forward_flist_data = 0;
+BOOL flist_receiving_enabled = False; /* Starts out False; set False on entry to the
+ * flist-receiving code and True once that code finishes.  The write path
+ * checks it before calling wait_for_receiver(). */
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
static flist_ndx_list redo_list, hlink_list;
+static void read_a_msg(void); /* forward declaration; called from the write path to drain pending input messages */
static void drain_multiplex_messages(void);
static void sleep_for_bwlimit(int bytes_written);
}
}
+static void slide_iobuf_in(size_t needed) /* Slide the pending input to the front of iobuf.in:
+ * the iobuf.in.len bytes found at offset iobuf.in.pos are moved to offset 0
+ * and iobuf.in.pos is reset to 0.  "needed" is used only in the debug
+ * message below; it does not affect how much is moved. */
+{
+ memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len); /* memmove, since source and destination can overlap */
+ if (DEBUG_GTE(IO, 4)) { /* logged before in.pos is reset so the old offset is what gets reported */
+ rprintf(FINFO,
+ "[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
+ who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
+ }
+ if (iobuf.raw_input_ends_before) /* a zero value is left untouched (presumably means "not set" -- confirm) */
+ iobuf.raw_input_ends_before -= iobuf.in.pos; /* shifted by the same distance as the data */
+ iobuf.in.pos = 0;
+}
+
/* Perform buffered input and output until specified conditions are met. When
* given a "needed" read requirement, we'll return without doing any I/O if the
* iobuf.in bytes are already available. When reading, we'll read as many
}
realloc_xbuf(&iobuf.in, new_size);
}
- if (iobuf.in.size - iobuf.in.pos < needed
- || (iobuf.in.len < needed && iobuf.in.len < 1024
- && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)) {
- memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len);
- if (DEBUG_GTE(IO, 4)) {
- rprintf(FINFO,
- "[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
- who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
- }
- if (iobuf.raw_input_ends_before)
- iobuf.raw_input_ends_before -= iobuf.in.pos;
- iobuf.in.pos = 0;
- }
+ if (iobuf.in.size - iobuf.in.pos < needed)
+ slide_iobuf_in(needed);
break;
case PIO_NEED_OUTROOM:
break;
}
+ if (iobuf.in.len < 1024 && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)
+ slide_iobuf_in(flags & PIO_NEED_INPUT ? needed : 0);
+
max_fd = -1;
FD_ZERO(&r_fds);
}
}
+ /* We need to help prevent deadlock by doing what reading
+ * we can whenever we are here trying to write. */
+ if (IN_MULTIPLEXED && !(flags & PIO_NEED_INPUT)) {
+ while (!iobuf.raw_input_ends_before && iobuf.in.len > 512)
+ read_a_msg();
+ if (flist_receiving_enabled && iobuf.in.len > 512)
+ wait_for_receiver(); /* generator only */
+ }
+
if (ff_forward_fd >= 0 && FD_ISSET(ff_forward_fd, &r_fds)) {
/* This can potentially flush all output and enable
* multiplexed output, so keep this last in the loop
}
break;
case MSG_ERROR_EXIT:
+ if (DEBUG_GTE(EXIT, 3))
+ rprintf(FINFO, "[%s] got MSG_ERROR_EXIT with %d bytes\n", who_am_i(), msg_bytes);
if (msg_bytes == 0) {
if (!am_sender && !am_generator) {
+ if (DEBUG_GTE(EXIT, 3)) {
+ rprintf(FINFO, "[%s] sending MSG_ERROR_EXIT (len 0)\n",
+ who_am_i());
+ }
send_msg(MSG_ERROR_EXIT, "", 0, 0);
io_flush(FULL_FLUSH);
}
data = perform_io(4, PIO_INPUT_AND_CONSUME);
val = IVAL(data, 0);
if (protocol_version >= 31) {
- if (am_generator)
+ if (am_generator) {
+ if (DEBUG_GTE(EXIT, 3)) {
+ rprintf(FINFO, "[%s] sending MSG_ERROR_EXIT with exit_code %d\n",
+ who_am_i(), val);
+ }
send_msg_int(MSG_ERROR_EXIT, val);
- else
+ } else {
+ if (DEBUG_GTE(EXIT, 3)) {
+ rprintf(FINFO, "[%s] sending MSG_ERROR_EXIT (len 0)\n",
+ who_am_i());
+ }
send_msg(MSG_ERROR_EXIT, "", 0, 0);
+ }
}
} else
goto invalid_msg;
}
} else {
struct file_list *flist;
+ flist_receiving_enabled = False;
if (DEBUG_GTE(FLIST, 2)) {
rprintf(FINFO, "[%s] receiving flist for dir %d\n",
who_am_i(), ndx);
if (preserve_hard_links)
match_hard_links(flist);
#endif
+ flist_receiving_enabled = True;
}
}
}