extern int am_daemon;
extern int am_sender;
extern int am_generator;
+extern int incremental;
+extern int io_error;
extern int eol_nulls;
+extern int flist_eof;
extern int read_batch;
extern int csum_length;
extern int checksum_seed;
extern int preserve_hard_links;
extern char *filesfrom_host;
extern struct stats stats;
-extern struct file_list *the_file_list;
+extern struct file_list *cur_flist, *first_flist;
const char phase_unknown[] = "unknown";
int ignore_timeout = 0;
int batch_fd = -1;
-int batch_gen_fd = -1;
+int done_cnt = 0;
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
int sock_f_in = -1;
int sock_f_out = -1;
+static int iobuf_f_in = -1;
+static char *iobuf_in;
+static size_t iobuf_in_siz;
+static size_t iobuf_in_ndx;
+static size_t iobuf_in_remaining;
+
+static int iobuf_f_out = -1;
+static char *iobuf_out;
+static int iobuf_out_cnt;
+
+int flist_forward_from = -1;
+
static int io_multiplexing_out;
static int io_multiplexing_in;
static time_t last_io_in;
5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
};
-static void read_loop(int fd, char *buf, size_t len);
+static int readfd_unbuffered(int fd, char *buf, size_t len);
+static void writefd(int fd, const char *buf, size_t len);
+static void writefd_unbuffered(int fd, const char *buf, size_t len);
+static void decrement_active_files(int ndx);
+static void decrement_flist_in_progress(int ndx, int redo);
struct flist_ndx_item {
struct flist_ndx_item *next;
{
char buf[2048];
size_t n;
+ struct file_list *flist;
int fd = msg_fd_in;
int tag, len;
* to this routine from writefd_unbuffered(). */
msg_fd_in = -1;
- read_loop(fd, buf, 4);
+ readfd_unbuffered(fd, buf, 4);
tag = IVAL(buf, 0);
len = tag & 0xFFFFFF;
switch (tag) {
case MSG_DONE:
if (len != 0 || !am_generator) {
- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
+ invalid_msg:
+ rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
+ tag, len, who_am_i(),
+ incremental ? "/incremental" : "");
exit_cleanup(RERR_STREAMIO);
}
- flist_ndx_push(&redo_list, -1);
+ done_cnt++;
break;
case MSG_REDO:
- if (len != 4 || !am_generator) {
- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, buf, 4);
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, 4);
if (remove_source_files)
decrement_active_files(IVAL(buf,0));
flist_ndx_push(&redo_list, IVAL(buf,0));
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 1);
+ break;
+ case MSG_FLIST:
+ if (len != 4 || !am_generator || !incremental)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, 4);
+ /* Read extra file list from receiver. */
+ assert(iobuf_in != NULL);
+ assert(iobuf_f_in == fd);
+ flist = recv_file_list(fd);
+ flist->parent_ndx = IVAL(buf,0);
+ break;
+ case MSG_FLIST_EOF:
+ if (len != 0 || !am_generator || !incremental)
+ goto invalid_msg;
+ flist_eof = 1;
break;
case MSG_DELETED:
- if (len >= (int)sizeof buf || !am_generator) {
- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, buf, len);
+ if (len >= (int)sizeof buf || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
send_msg(MSG_DELETED, buf, len);
break;
case MSG_SUCCESS:
- if (len != 4 || !am_generator) {
- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, buf, len);
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
if (remove_source_files) {
decrement_active_files(IVAL(buf,0));
send_msg(MSG_SUCCESS, buf, len);
}
if (preserve_hard_links)
flist_ndx_push(&hlink_list, IVAL(buf,0));
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
+ break;
+ case MSG_NO_SEND:
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
break;
case MSG_SOCKERR:
case MSG_CLIENT:
- if (!am_generator) {
- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
- exit_cleanup(RERR_STREAMIO);
- }
+ if (!am_generator)
+ goto invalid_msg;
if (tag == MSG_SOCKERR)
- close_multiplexing_out();
+ io_end_multiplex_out();
/* FALL THROUGH */
case MSG_INFO:
case MSG_ERROR:
n = len;
if (n >= sizeof buf)
n = sizeof buf - 1;
- read_loop(fd, buf, n);
+ readfd_unbuffered(fd, buf, n);
rwrite((enum logcode)tag, buf, n);
len -= n;
}
}
active_filecnt++;
- active_bytecnt += F_LENGTH(the_file_list->files[ndx]);
+ active_bytecnt += F_LENGTH(cur_flist->files[ndx]);
}
-void decrement_active_files(int ndx)
+static void decrement_active_files(int ndx)
{
+ struct file_list *flist = flist_for_ndx(ndx);
+ assert(flist != NULL);
active_filecnt--;
- active_bytecnt -= F_LENGTH(the_file_list->files[ndx]);
+ active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
+}
+
+static void decrement_flist_in_progress(int ndx, int redo)
+{
+ struct file_list *flist = cur_flist ? cur_flist : first_flist;
+
+ while (ndx < flist->ndx_start) {
+ if (flist == first_flist) {
+ invalid_ndx:
+ rprintf(FERROR,
+ "Invalid file index: %d (%d - %d) [%s]\n",
+ ndx, first_flist->ndx_start,
+ first_flist->prev->ndx_start + first_flist->prev->count - 1,
+ who_am_i());
+ exit_cleanup(RERR_PROTOCOL);
+ }
+ flist = flist->prev;
+ }
+ while (ndx >= flist->ndx_start + flist->count) {
+ if (!(flist = flist->next))
+ goto invalid_ndx;
+ }
+
+ flist->in_progress--;
+ if (redo)
+ flist->to_redo++;
}
/* Try to push messages off the list onto the wire. If we leave with more
* to do, return 0. On error, return -1. If everything flushed, return 1.
* This is only active in the receiver. */
-static int msg2genr_flush(int flush_it_all)
+static int msg2genr_flush(void)
{
- static int written = 0;
- struct timeval tv;
- fd_set fds;
-
- if (msg_fd_out < 0)
+ if (msg_fd_out < 0 || no_flush)
return -1;
+ no_flush++;
while (msg2genr.head) {
struct msg_list_item *m = msg2genr.head;
- int n = write(msg_fd_out, m->buf + written, m->len - written);
- if (n < 0) {
- if (errno == EINTR)
- continue;
- if (errno != EWOULDBLOCK && errno != EAGAIN)
- return -1;
- if (!flush_it_all)
- return 0;
- FD_ZERO(&fds);
- FD_SET(msg_fd_out, &fds);
- tv.tv_sec = select_timeout;
- tv.tv_usec = 0;
- if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
- check_timeout();
- } else if ((written += n) == m->len) {
- msg2genr.head = m->next;
- if (!msg2genr.head)
- msg2genr.tail = NULL;
- free(m);
- written = 0;
- }
+ writefd(msg_fd_out, m->buf, m->len);
+ msg2genr.head = m->next;
+ if (!msg2genr.head)
+ msg2genr.tail = NULL;
+ free(m);
}
+ if (iobuf_out_cnt) {
+ writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
+ iobuf_out_cnt = 0;
+ }
+ no_flush--;
return 1;
}
return 1;
}
msg_list_add(&msg2genr, code, buf, len);
- msg2genr_flush(NORMAL_FLUSH);
+ msg2genr_flush();
return 1;
}
send_msg(code, numbuf, 4);
}
-int get_redo_num(int itemizing, enum logcode code)
+void wait_for_receiver(void)
{
- while (1) {
-#ifdef SUPPORT_HARD_LINKS
- if (hlink_list.head)
- check_for_finished_hlinks(itemizing, code);
-#endif
- if (redo_list.head)
- break;
- read_msg_fd();
- }
+ read_msg_fd();
+}
+int get_redo_num(void)
+{
return flist_ndx_pop(&redo_list);
}
}
if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
- msg2genr_flush(NORMAL_FLUSH);
+ msg2genr_flush();
if (io_filesfrom_f_out >= 0) {
if (io_filesfrom_buflen) {
/* Don't write errors on a dead socket. */
if (fd == sock_f_in) {
- close_multiplexing_out();
+ io_end_multiplex_out();
rsyserr(FSOCKERR, errno, "read error");
} else
rsyserr(FERROR, errno, "read error");
return s - fname;
}
-static char *iobuf_out;
-static int iobuf_out_cnt;
-
-void io_start_buffering_out(void)
+int io_start_buffering_out(int f_out)
{
- if (iobuf_out)
- return;
+ if (iobuf_out) {
+ assert(f_out == iobuf_f_out);
+ return 0;
+ }
if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
out_of_memory("io_start_buffering_out");
iobuf_out_cnt = 0;
+ iobuf_f_out = f_out;
+ return 1;
}
-static char *iobuf_in;
-static size_t iobuf_in_siz;
-
-void io_start_buffering_in(void)
+int io_start_buffering_in(int f_in)
{
- if (iobuf_in)
- return;
+ if (iobuf_in) {
+ assert(f_in == iobuf_f_in);
+ return 0;
+ }
iobuf_in_siz = 2 * IO_BUFFER_SIZE;
if (!(iobuf_in = new_array(char, iobuf_in_siz)))
out_of_memory("io_start_buffering_in");
+ iobuf_f_in = f_in;
+ return 1;
}
-void io_end_buffering(void)
+void io_end_buffering_in(void)
{
- io_flush(NORMAL_FLUSH);
- if (!io_multiplexing_out) {
- free(iobuf_out);
- iobuf_out = NULL;
- }
+ if (!iobuf_in)
+ return;
+ free(iobuf_in);
+ iobuf_in = NULL;
+ iobuf_in_ndx = 0;
+ iobuf_in_remaining = 0;
+ iobuf_f_in = -1;
+}
+
+void io_end_buffering_out(void)
+{
+ if (!iobuf_out)
+ return;
+ io_flush(FULL_FLUSH);
+ free(iobuf_out);
+ iobuf_out = NULL;
+ iobuf_f_out = -1;
}
void maybe_flush_socket(void)
if (!iobuf_out || !iobuf_out_cnt) {
if (protocol_version < 29)
return; /* there's nothing we can do */
- write_int(sock_f_out, the_file_list->count);
- write_shortint(sock_f_out, ITEM_IS_NEW);
+ if (protocol_version >= 30)
+ send_msg(MSG_NOOP, "", 0);
+ else {
+ write_int(sock_f_out, cur_flist->count);
+ write_shortint(sock_f_out, ITEM_IS_NEW);
+ }
}
if (iobuf_out)
io_flush(NORMAL_FLUSH);
}
}
+void start_flist_forward(int f_in)
+{
+ assert(iobuf_out != NULL);
+ assert(iobuf_f_out == msg_fd_out);
+ flist_forward_from = f_in;
+}
+
+void stop_flist_forward()
+{
+ io_flush(NORMAL_FLUSH);
+ flist_forward_from = -1;
+}
+
/**
* Continue trying to read len bytes - don't return until len has been
* read.
*/
static int readfd_unbuffered(int fd, char *buf, size_t len)
{
- static size_t remaining;
- static size_t iobuf_in_ndx;
size_t msg_bytes;
int tag, cnt = 0;
char line[BIGPATHBUFLEN];
- if (!iobuf_in || fd != sock_f_in)
+ if (!iobuf_in || fd != iobuf_f_in)
return read_timeout(fd, buf, len);
- if (!io_multiplexing_in && remaining == 0) {
- remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+ if (!io_multiplexing_in && iobuf_in_remaining == 0) {
+ iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
iobuf_in_ndx = 0;
}
while (cnt == 0) {
- if (remaining) {
- len = MIN(len, remaining);
+ if (iobuf_in_remaining) {
+ len = MIN(len, iobuf_in_remaining);
memcpy(buf, iobuf_in + iobuf_in_ndx, len);
iobuf_in_ndx += len;
- remaining -= len;
+ iobuf_in_remaining -= len;
cnt = len;
break;
}
iobuf_in_siz = msg_bytes;
}
read_loop(fd, iobuf_in, msg_bytes);
- remaining = msg_bytes;
+ iobuf_in_remaining = msg_bytes;
iobuf_in_ndx = 0;
break;
+ case MSG_NOOP:
+ if (am_sender)
+ maybe_send_keepalive();
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ read_loop(fd, line, msg_bytes);
+ io_error |= IVAL(line, 0);
+ break;
case MSG_DELETED:
if (msg_bytes >= sizeof line)
goto overflow;
break;
case MSG_SUCCESS:
if (msg_bytes != 4) {
+ invalid_msg:
rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
tag, (long)msg_bytes, who_am_i());
exit_cleanup(RERR_STREAMIO);
read_loop(fd, line, msg_bytes);
successful_send(IVAL(line, 0));
break;
+ case MSG_NO_SEND:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ read_loop(fd, line, msg_bytes);
+ send_msg_int(MSG_NO_SEND, IVAL(line, 0));
+ break;
case MSG_INFO:
case MSG_ERROR:
if (msg_bytes >= sizeof line) {
}
}
- if (remaining == 0)
+ if (iobuf_in_remaining == 0)
io_flush(NORMAL_FLUSH);
return cnt;
exit_cleanup(RERR_FILEIO);
}
+ if (fd == flist_forward_from)
+ writefd(iobuf_f_out, buffer, total);
+
if (fd == sock_f_in)
stats.total_read += total;
}
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
- close_multiplexing_out();
+ io_end_multiplex_out();
rsyserr(FERROR, errno,
"writefd_unbuffered failed to write %ld bytes [%s]",
(long)len, who_am_i());
}
}
-void io_flush(int flush_it_all)
+void io_flush(UNUSED(int flush_it_all))
{
- msg2genr_flush(flush_it_all);
+ msg2genr_flush();
msg2sndr_flush();
if (!iobuf_out_cnt || no_flush)
if (io_multiplexing_out)
mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
else
- writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
+ writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
iobuf_out_cnt = 0;
}
static void writefd(int fd, const char *buf, size_t len)
{
- if (fd == msg_fd_out) {
- rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
- exit_cleanup(RERR_PROTOCOL);
- }
-
if (fd == sock_f_out)
stats.total_written += len;
exit_cleanup(RERR_FILEIO);
}
- if (!iobuf_out || fd != sock_f_out) {
+ if (!iobuf_out || fd != iobuf_f_out) {
writefd_unbuffered(fd, buf, len);
return;
}
void io_start_multiplex_out(void)
{
io_flush(NORMAL_FLUSH);
- io_start_buffering_out();
+ io_start_buffering_out(sock_f_out);
io_multiplexing_out = 1;
}
void io_start_multiplex_in(void)
{
io_flush(NORMAL_FLUSH);
- io_start_buffering_in();
+ io_start_buffering_in(sock_f_in);
io_multiplexing_in = 1;
}
{
if (!io_multiplexing_out)
return 0;
-
io_flush(NORMAL_FLUSH);
stats.total_written += (len+4);
mplex_write(code, buf, len);
return 1;
}
-void close_multiplexing_in(void)
+void io_end_multiplex_in(void)
{
io_multiplexing_in = 0;
+ io_end_buffering_in();
}
/** Stop output multiplexing. */
-void close_multiplexing_out(void)
+void io_end_multiplex_out(void)
{
io_multiplexing_out = 0;
+ io_end_buffering_out();
}
void start_write_batch(int fd)