From 8ef246e0b5a3aa9944dab4275353c50aab9f955c Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Thu, 28 Dec 2006 07:54:27 +0000 Subject: [PATCH] - Handle the new incremental-recursion mode. - Changed some function names to make them more consistent. --- io.c | 325 ++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 210 insertions(+), 115 deletions(-) diff --git a/io.c b/io.c index d5d117fa..3ad91472 100644 --- a/io.c +++ b/io.c @@ -41,7 +41,10 @@ extern int am_server; extern int am_daemon; extern int am_sender; extern int am_generator; +extern int incremental; +extern int io_error; extern int eol_nulls; +extern int flist_eof; extern int read_batch; extern int csum_length; extern int checksum_seed; @@ -50,12 +53,12 @@ extern int remove_source_files; extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; -extern struct file_list *the_file_list; +extern struct file_list *cur_flist, *first_flist; const char phase_unknown[] = "unknown"; int ignore_timeout = 0; int batch_fd = -1; -int batch_gen_fd = -1; +int done_cnt = 0; /* Ignore an EOF error if non-zero. See whine_about_eof(). */ int kluge_around_eof = 0; @@ -65,6 +68,18 @@ int msg_fd_out = -1; int sock_f_in = -1; int sock_f_out = -1; +static int iobuf_f_in = -1; +static char *iobuf_in; +static size_t iobuf_in_siz; +static size_t iobuf_in_ndx; +static size_t iobuf_in_remaining; + +static int iobuf_f_out = -1; +static char *iobuf_out; +static int iobuf_out_cnt; + +int flist_forward_from = -1; + static int io_multiplexing_out; static int io_multiplexing_in; static time_t last_io_in; @@ -92,7 +107,11 @@ static char int_byte_cnt[64] = { 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */ }; -static void read_loop(int fd, char *buf, size_t len); +static int readfd_unbuffered(int fd, char *buf, size_t len); +static void writefd(int fd, const char *buf, size_t len); +static void writefd_unbuffered(int fd, const char *buf, size_t len); +static void decrement_active_files(int ndx); +static void decrement_flist_in_progress(int ndx, int redo); struct flist_ndx_item { struct flist_ndx_item *next; @@ -237,6 +256,7 @@ static void read_msg_fd(void) { char buf[2048]; size_t n; + struct file_list *flist; int fd = msg_fd_in; int tag, len; @@ -244,7 +264,7 @@ static void read_msg_fd(void) * to this routine from writefd_unbuffered(). */ msg_fd_in = -1; - read_loop(fd, buf, 4); + readfd_unbuffered(fd, buf, 4); tag = IVAL(buf, 0); len = tag & 0xFFFFFF; @@ -253,50 +273,71 @@ static void read_msg_fd(void) switch (tag) { case MSG_DONE: if (len != 0 || !am_generator) { - rprintf(FERROR, "invalid message %d:%d\n", tag, len); + invalid_msg: + rprintf(FERROR, "invalid message %d:%d [%s%s]\n", + tag, len, who_am_i(), + incremental ? "/incremental" : ""); exit_cleanup(RERR_STREAMIO); } - flist_ndx_push(&redo_list, -1); + done_cnt++; break; case MSG_REDO: - if (len != 4 || !am_generator) { - rprintf(FERROR, "invalid message %d:%d\n", tag, len); - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, buf, 4); + if (len != 4 || !am_generator) + goto invalid_msg; + readfd_unbuffered(fd, buf, 4); if (remove_source_files) decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); + if (incremental) + decrement_flist_in_progress(IVAL(buf,0), 1); + break; + case MSG_FLIST: + if (len != 4 || !am_generator || !incremental) + goto invalid_msg; + readfd_unbuffered(fd, buf, 4); + /* Read extra file list from receiver. */ + assert(iobuf_in != NULL); + assert(iobuf_f_in == fd); + flist = recv_file_list(fd); + flist->parent_ndx = IVAL(buf,0); + break; + case MSG_FLIST_EOF: + if (len != 0 || !am_generator || !incremental) + goto invalid_msg; + flist_eof = 1; break; case MSG_DELETED: - if (len >= (int)sizeof buf || !am_generator) { - rprintf(FERROR, "invalid message %d:%d\n", tag, len); - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, buf, len); + if (len >= (int)sizeof buf || !am_generator) + goto invalid_msg; + readfd_unbuffered(fd, buf, len); send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: - if (len != 4 || !am_generator) { - rprintf(FERROR, "invalid message %d:%d\n", tag, len); - exit_cleanup(RERR_STREAMIO); - } - read_loop(fd, buf, len); + if (len != 4 || !am_generator) + goto invalid_msg; + readfd_unbuffered(fd, buf, len); if (remove_source_files) { decrement_active_files(IVAL(buf,0)); send_msg(MSG_SUCCESS, buf, len); } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); + if (incremental) + decrement_flist_in_progress(IVAL(buf,0), 0); + break; + case MSG_NO_SEND: + if (len != 4 || !am_generator) + goto invalid_msg; + readfd_unbuffered(fd, buf, len); + if (incremental) + decrement_flist_in_progress(IVAL(buf,0), 0); break; case MSG_SOCKERR: case MSG_CLIENT: - if (!am_generator) { - rprintf(FERROR, "invalid message %d:%d\n", tag, len); - exit_cleanup(RERR_STREAMIO); - } + if (!am_generator) + goto invalid_msg; if (tag == MSG_SOCKERR) - close_multiplexing_out(); + io_end_multiplex_out(); /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: @@ -305,7 +346,7 @@ static void read_msg_fd(void) n = len; if (n >= sizeof buf) n = sizeof buf - 1; - read_loop(fd, buf, n); + readfd_unbuffered(fd, buf, n); rwrite((enum logcode)tag, buf, n); len -= n; } @@ -334,51 +375,65 @@ void increment_active_files(int ndx, int itemizing, enum logcode code) } active_filecnt++; - active_bytecnt += F_LENGTH(the_file_list->files[ndx]); + active_bytecnt += F_LENGTH(cur_flist->files[ndx]); } -void decrement_active_files(int ndx) +static void decrement_active_files(int ndx) { + struct file_list *flist = flist_for_ndx(ndx); + assert(flist != NULL); active_filecnt--; - active_bytecnt -= F_LENGTH(the_file_list->files[ndx]); + active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]); +} + +static void decrement_flist_in_progress(int ndx, int redo) +{ + struct file_list *flist = cur_flist ? cur_flist : first_flist; + + while (ndx < flist->ndx_start) { + if (flist == first_flist) { + invalid_ndx: + rprintf(FERROR, + "Invalid file index: %d (%d - %d) [%s]\n", + ndx, first_flist->ndx_start, + first_flist->prev->ndx_start + first_flist->prev->count - 1, + who_am_i()); + exit_cleanup(RERR_PROTOCOL); + } + flist = flist->prev; + } + while (ndx >= flist->ndx_start + flist->count) { + if (!(flist = flist->next)) + goto invalid_ndx; + } + + flist->in_progress--; + if (redo) + flist->to_redo++; } /* Try to push messages off the list onto the wire. If we leave with more * to do, return 0. On error, return -1. If everything flushed, return 1. * This is only active in the receiver. */ -static int msg2genr_flush(int flush_it_all) +static int msg2genr_flush(void) { - static int written = 0; - struct timeval tv; - fd_set fds; - - if (msg_fd_out < 0) + if (msg_fd_out < 0 || no_flush) return -1; + no_flush++; while (msg2genr.head) { struct msg_list_item *m = msg2genr.head; - int n = write(msg_fd_out, m->buf + written, m->len - written); - if (n < 0) { - if (errno == EINTR) - continue; - if (errno != EWOULDBLOCK && errno != EAGAIN) - return -1; - if (!flush_it_all) - return 0; - FD_ZERO(&fds); - FD_SET(msg_fd_out, &fds); - tv.tv_sec = select_timeout; - tv.tv_usec = 0; - if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) - check_timeout(); - } else if ((written += n) == m->len) { - msg2genr.head = m->next; - if (!msg2genr.head) - msg2genr.tail = NULL; - free(m); - written = 0; - } + writefd(msg_fd_out, m->buf, m->len); + msg2genr.head = m->next; + if (!msg2genr.head) + msg2genr.tail = NULL; + free(m); } + if (iobuf_out_cnt) { + writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt); + iobuf_out_cnt = 0; + } + no_flush--; return 1; } @@ -393,7 +448,7 @@ int send_msg(enum msgcode code, const char *buf, int len) return 1; } msg_list_add(&msg2genr, code, buf, len); - msg2genr_flush(NORMAL_FLUSH); + msg2genr_flush(); return 1; } @@ -404,18 +459,13 @@ void send_msg_int(enum msgcode code, int num) send_msg(code, numbuf, 4); } -int get_redo_num(int itemizing, enum logcode code) +void wait_for_receiver(void) { - while (1) { -#ifdef SUPPORT_HARD_LINKS - if (hlink_list.head) - check_for_finished_hlinks(itemizing, code); -#endif - if (redo_list.head) - break; - read_msg_fd(); - } + read_msg_fd(); +} +int get_redo_num(void) +{ return flist_ndx_pop(&redo_list); } @@ -538,7 +588,7 @@ static int read_timeout(int fd, char *buf, size_t len) } if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) - msg2genr_flush(NORMAL_FLUSH); + msg2genr_flush(); if (io_filesfrom_f_out >= 0) { if (io_filesfrom_buflen) { @@ -617,7 +667,7 @@ static int read_timeout(int fd, char *buf, size_t len) /* Don't write errors on a dead socket. */ if (fd == sock_f_in) { - close_multiplexing_out(); + io_end_multiplex_out(); rsyserr(FSOCKERR, errno, "read error"); } else rsyserr(FERROR, errno, "read error"); @@ -688,37 +738,51 @@ int read_filesfrom_line(int fd, char *fname) return s - fname; } -static char *iobuf_out; -static int iobuf_out_cnt; - -void io_start_buffering_out(void) +int io_start_buffering_out(int f_out) { - if (iobuf_out) - return; + if (iobuf_out) { + assert(f_out == iobuf_f_out); + return 0; + } if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE))) out_of_memory("io_start_buffering_out"); iobuf_out_cnt = 0; + iobuf_f_out = f_out; + return 1; } -static char *iobuf_in; -static size_t iobuf_in_siz; - -void io_start_buffering_in(void) +int io_start_buffering_in(int f_in) { - if (iobuf_in) - return; + if (iobuf_in) { + assert(f_in == iobuf_f_in); + return 0; + } iobuf_in_siz = 2 * IO_BUFFER_SIZE; if (!(iobuf_in = new_array(char, iobuf_in_siz))) out_of_memory("io_start_buffering_in"); + iobuf_f_in = f_in; + return 1; } -void io_end_buffering(void) +void io_end_buffering_in(void) { - io_flush(NORMAL_FLUSH); - if (!io_multiplexing_out) { - free(iobuf_out); - iobuf_out = NULL; - } + if (!iobuf_in) + return; + free(iobuf_in); + iobuf_in = NULL; + iobuf_in_ndx = 0; + iobuf_in_remaining = 0; + iobuf_f_in = -1; +} + +void io_end_buffering_out(void) +{ + if (!iobuf_out) + return; + io_flush(FULL_FLUSH); + free(iobuf_out); + iobuf_out = NULL; + iobuf_f_out = -1; } void maybe_flush_socket(void) @@ -733,14 +797,31 @@ void maybe_send_keepalive(void) if (!iobuf_out || !iobuf_out_cnt) { if (protocol_version < 29) return; /* there's nothing we can do */ - write_int(sock_f_out, the_file_list->count); - write_shortint(sock_f_out, ITEM_IS_NEW); + if (protocol_version >= 30) + send_msg(MSG_NOOP, "", 0); + else { + write_int(sock_f_out, cur_flist->count); + write_shortint(sock_f_out, ITEM_IS_NEW); + } } if (iobuf_out) io_flush(NORMAL_FLUSH); } } +void start_flist_forward(int f_in) +{ + assert(iobuf_out != NULL); + assert(iobuf_f_out == msg_fd_out); + flist_forward_from = f_in; +} + +void stop_flist_forward() +{ + io_flush(NORMAL_FLUSH); + flist_forward_from = -1; +} + /** * Continue trying to read len bytes - don't return until len has been * read. @@ -763,26 +844,24 @@ static void read_loop(int fd, char *buf, size_t len) */ static int readfd_unbuffered(int fd, char *buf, size_t len) { - static size_t remaining; - static size_t iobuf_in_ndx; size_t msg_bytes; int tag, cnt = 0; char line[BIGPATHBUFLEN]; - if (!iobuf_in || fd != sock_f_in) + if (!iobuf_in || fd != iobuf_f_in) return read_timeout(fd, buf, len); - if (!io_multiplexing_in && remaining == 0) { - remaining = read_timeout(fd, iobuf_in, iobuf_in_siz); + if (!io_multiplexing_in && iobuf_in_remaining == 0) { + iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz); iobuf_in_ndx = 0; } while (cnt == 0) { - if (remaining) { - len = MIN(len, remaining); + if (iobuf_in_remaining) { + len = MIN(len, iobuf_in_remaining); memcpy(buf, iobuf_in + iobuf_in_ndx, len); iobuf_in_ndx += len; - remaining -= len; + iobuf_in_remaining -= len; cnt = len; break; } @@ -802,9 +881,19 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) iobuf_in_siz = msg_bytes; } read_loop(fd, iobuf_in, msg_bytes); - remaining = msg_bytes; + iobuf_in_remaining = msg_bytes; iobuf_in_ndx = 0; break; + case MSG_NOOP: + if (am_sender) + maybe_send_keepalive(); + break; + case MSG_IO_ERROR: + if (msg_bytes != 4) + goto invalid_msg; + read_loop(fd, line, msg_bytes); + io_error |= IVAL(line, 0); + break; case MSG_DELETED: if (msg_bytes >= sizeof line) goto overflow; @@ -819,6 +908,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) break; case MSG_SUCCESS: if (msg_bytes != 4) { + invalid_msg: rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n", tag, (long)msg_bytes, who_am_i()); exit_cleanup(RERR_STREAMIO); @@ -826,6 +916,12 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) read_loop(fd, line, msg_bytes); successful_send(IVAL(line, 0)); break; + case MSG_NO_SEND: + if (msg_bytes != 4) + goto invalid_msg; + read_loop(fd, line, msg_bytes); + send_msg_int(MSG_NO_SEND, IVAL(line, 0)); + break; case MSG_INFO: case MSG_ERROR: if (msg_bytes >= sizeof line) { @@ -845,7 +941,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) } } - if (remaining == 0) + if (iobuf_in_remaining == 0) io_flush(NORMAL_FLUSH); return cnt; @@ -871,6 +967,9 @@ static void readfd(int fd, char *buffer, size_t N) exit_cleanup(RERR_FILEIO); } + if (fd == flist_forward_from) + writefd(iobuf_f_out, buffer, total); + if (fd == sock_f_in) stats.total_read += total; } @@ -1171,7 +1270,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) /* Don't try to write errors back across the stream. */ if (fd == sock_f_out) - close_multiplexing_out(); + io_end_multiplex_out(); rsyserr(FERROR, errno, "writefd_unbuffered failed to write %ld bytes [%s]", (long)len, who_am_i()); @@ -1246,9 +1345,9 @@ static void mplex_write(enum msgcode code, const char *buf, size_t len) } } -void io_flush(int flush_it_all) +void io_flush(UNUSED(int flush_it_all)) { - msg2genr_flush(flush_it_all); + msg2genr_flush(); msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) @@ -1257,17 +1356,12 @@ void io_flush(int flush_it_all) if (io_multiplexing_out) mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt); else - writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt); + writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt); iobuf_out_cnt = 0; } static void writefd(int fd, const char *buf, size_t len) { - if (fd == msg_fd_out) { - rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); - exit_cleanup(RERR_PROTOCOL); - } - if (fd == sock_f_out) stats.total_written += len; @@ -1276,7 +1370,7 @@ static void writefd(int fd, const char *buf, size_t len) exit_cleanup(RERR_FILEIO); } - if (!iobuf_out || fd != sock_f_out) { + if (!iobuf_out || fd != iobuf_f_out) { writefd_unbuffered(fd, buf, len); return; } @@ -1499,7 +1593,7 @@ void io_printf(int fd, const char *format, ...) void io_start_multiplex_out(void) { io_flush(NORMAL_FLUSH); - io_start_buffering_out(); + io_start_buffering_out(sock_f_out); io_multiplexing_out = 1; } @@ -1507,7 +1601,7 @@ void io_start_multiplex_out(void) void io_start_multiplex_in(void) { io_flush(NORMAL_FLUSH); - io_start_buffering_in(); + io_start_buffering_in(sock_f_in); io_multiplexing_in = 1; } @@ -1516,22 +1610,23 @@ int io_multiplex_write(enum msgcode code, const char *buf, size_t len) { if (!io_multiplexing_out) return 0; - io_flush(NORMAL_FLUSH); stats.total_written += (len+4); mplex_write(code, buf, len); return 1; } -void close_multiplexing_in(void) +void io_end_multiplex_in(void) { io_multiplexing_in = 0; + io_end_buffering_in(); } /** Stop output multiplexing. */ -void close_multiplexing_out(void) +void io_end_multiplex_out(void) { io_multiplexing_out = 0; + io_end_buffering_out(); } void start_write_batch(int fd) -- 2.34.1