static int io_filesfrom_buflen;
static size_t contiguous_write_len = 0;
static int select_timeout = SELECT_TIMEOUT;
+static int active_filecnt = 0;
+static OFF_T active_bytecnt = 0;
static void read_loop(int fd, char *buf, size_t len);
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, buf, 4);
+ if (remove_sent_files)
+ decrement_active_files(IVAL(buf,0));
flist_ndx_push(&redo_list, IVAL(buf,0));
break;
case MSG_DELETED:
exit_cleanup(RERR_STREAMIO);
}
read_loop(fd, buf, len);
- if (remove_sent_files)
+ if (remove_sent_files) {
+ decrement_active_files(IVAL(buf,0));
io_multiplex_write(MSG_SUCCESS, buf, len);
+ }
if (preserve_hard_links)
flist_ndx_push(&hlink_list, IVAL(buf,0));
break;
+ case MSG_SOCKERR:
+ if (!am_generator) {
+ rprintf(FERROR, "invalid message %d:%d\n", tag, len);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ close_multiplexing_out();
+ /* FALL THROUGH */
case MSG_INFO:
case MSG_ERROR:
case MSG_LOG:
msg_fd_in = fd;
}
+/* This is used by the generator to limit how many file transfers can
+ * be active at once when --remove-sent-files is specified. Without
+ * this, sender-side deletions were mostly happening at the end.
+ *
+ * Loops, calling read_msg_fd() each time around, for as long as the
+ * active-file count is at or above the current cap. Once below the
+ * cap, file "ndx" is counted as active and its length is added to
+ * the active byte total.
+ *
+ * ndx: index into the_file_list->files of the file being counted.
+ * itemizing, code: only passed through to check_for_finished_hlinks(). */
+void increment_active_files(int ndx, int itemizing, enum logcode code)
+{
+ /* The cap is 10 active files once at least 128*1024 bytes are
+ * active, otherwise 50. TODO: tune these limits? */
+ while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
+ if (hlink_list.head) /* only when the hard-link list is non-empty */
+ check_for_finished_hlinks(itemizing, code);
+ read_msg_fd(); /* NOTE(review): presumably this is what leads to
+ * decrement_active_files() being called, which
+ * lets the loop end -- confirm against read_msg_fd() */
+ }
+
+ active_filecnt++; /* one more transfer in flight */
+ active_bytecnt += the_file_list->files[ndx]->length; /* add this file's length to the active total */
+}
+
+void decrement_active_files(int ndx)
+{ /* Reverses the accounting done by increment_active_files() for the
+ * file at index "ndx" of the_file_list->files. No bounds or
+ * underflow checks are made here. */
+ active_filecnt--; /* one fewer transfer in flight */
+ active_bytecnt -= the_file_list->files[ndx]->length; /* remove this file's length from the active total */
+}
+
/* Try to push messages off the list onto the wire. If we leave with more
* to do, return 0. On error, return -1. If everything flushed, return 1.
* This is only active in the receiver. */
continue;
/* Don't write errors on a dead socket. */
- if (fd == sock_f_in)
+ if (fd == sock_f_in) {
close_multiplexing_out();
- rsyserr(FERROR, errno, "read error");
+ rsyserr(FSOCKERR, errno, "read error");
+ } else
+ rsyserr(FERROR, errno, "read error");
exit_cleanup(RERR_STREAMIO);
}
if (msg_bytes >= sizeof line)
goto overflow;
read_loop(fd, line, msg_bytes);
- line[msg_bytes] = '\0';
/* A directory name was sent with the trailing null */
if (msg_bytes > 0 && !line[msg_bytes-1])
log_delete(line, S_IFDIR);
- else
+ else {
+ line[msg_bytes] = '\0';
log_delete(line, S_IFREG);
+ }
break;
case MSG_SUCCESS:
if (msg_bytes != 4) {
#define ONE_SEC 1000000L /* # of microseconds in a second */
- if (!bwlimit)
+ if (!bwlimit_writemax)
return;
total_written += bytes_written;
continue;
n = len - total;
- if (bwlimit && n > bwlimit_writemax)
+ if (bwlimit_writemax && n > bwlimit_writemax)
n = bwlimit_writemax;
cnt = write(fd, buf + total, n);
**/
static void mplex_write(enum msgcode code, char *buf, size_t len)
{
- char buffer[BIGPATHBUFLEN];
+ char buffer[1024];
size_t n = len;
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
contiguous_write_len = len + 4;
if (n > sizeof buffer - 4)
- n = sizeof buffer - 4;
+ n = 0;
+ else
+ memcpy(buffer + 4, buf, n);
- memcpy(&buffer[4], buf, n);
writefd_unbuffered(sock_f_out, buffer, n+4);
len -= n;