From 7dda3c6cc0fe747b08b8e18b5f1e7a8a58059822 Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Fri, 15 Dec 2006 03:14:31 +0000 Subject: [PATCH] Removing this patch for now, as I'm not considering using threads to replace the two receiver-side processes at the moment. --- threaded-receiver.diff | 1276 ---------------------------------------- 1 file changed, 1276 deletions(-) delete mode 100644 threaded-receiver.diff diff --git a/threaded-receiver.diff b/threaded-receiver.diff deleted file mode 100644 index 8129c4d..0000000 --- a/threaded-receiver.diff +++ /dev/null @@ -1,1276 +0,0 @@ -This patch changes the receiving side to have the receiving code use a thread -instead of a forked process. This extra thread does read from the socket, but -it sends any stdout/stderr messages to the generator (main thread) to output. - -** This is very new code. ** Yes, it passes the "make test" testsuite, but -there may still be some problems, especially in some of the untested features. -(For one thing, I haven't yet added code to properly handle any keep-alive -messages that arrive on the receiving side during the --delete-after phase!) - -This code just uses pthread.h directly, so configure changes will probably be -needed to make this compatible with more systems. I have also tested that -this code works fine using the GNU pth library without any code changes if -you configured it with --enable-syscall-soft --enable-pthread (you may need -to twiddle the Makefile options if you didn't install the library, though). - -NOTE: we still need to duplicate the partial_fname static in util.c! - -If you try this out, please send some email to wayned@samba.org or the rsync -mailing list with your results, build changes, bug reports, etc. Thanks! - -To use this patch, run these commands for a successful build: - - patch -p1 >= 1) && b; b--) {} - /* add a bit, subtract rollsum, round up. */ - s2length = (b + 1 - 32 + 7) / 8; /* --optimize in compiler-- */ -- s2length = MAX(s2length, csum_length); -+ s2length = MAX(s2length, GEN_csum_length); - s2length = MIN(s2length, SUM_LENGTH); - } - -@@ -553,7 +557,7 @@ static void generate_and_send_sums(int f - sum_sizes_sqroot(&sum, len); - write_sum_head(f_out, &sum); - -- if (append_mode > 0 && f_copy < 0) -+ if (GEN_append_mode > 0 && f_copy < 0) - return; - - if (len > 0) -@@ -572,7 +576,7 @@ static void generate_and_send_sums(int f - - if (f_copy >= 0) { - full_write(f_copy, map, n1); -- if (append_mode > 0) -+ if (GEN_append_mode > 0) - continue; - } - -@@ -1348,7 +1352,7 @@ static void recv_generator(char *fname, - return; - } - -- if (append_mode && st.st_size > file->length) -+ if (GEN_append_mode && st.st_size > file->length) - return; - - if (fnamecmp_type <= FNAMECMP_BASIS_DIR_HIGH) -@@ -1411,7 +1415,7 @@ static void recv_generator(char *fname, - goto notify_others; - } - -- if (inplace && make_backups && fnamecmp_type == FNAMECMP_FNAME) { -+ if (inplace && GEN_make_backups && fnamecmp_type == FNAMECMP_FNAME) { - if (!(backupptr = get_backup_name(fname))) { - close(fd); - return; -@@ -1502,9 +1506,12 @@ void generate_files(int f_out, struct fi - int save_ignore_existing = ignore_existing; - int save_ignore_non_existing = ignore_non_existing; - int save_do_progress = do_progress; -- int save_make_backups = make_backups; -+ int save_make_backups = GEN_make_backups = make_backups; - int dir_tweaking = !(list_only || local_name || dry_run); - -+ GEN_append_mode = append_mode; -+ GEN_csum_length = csum_length; -+ - if (protocol_version >= 29) { - itemizing = 1; - maybe_ATTRS_REPORT = stdout_format_has_i ? 0 : ATTRS_REPORT; -@@ -1532,7 +1539,7 @@ void generate_files(int f_out, struct fi - do_delete_pass(flist); - do_progress = 0; - -- if (append_mode || whole_file < 0) -+ if (GEN_append_mode || whole_file < 0) - whole_file = 0; - if (verbose >= 2) { - rprintf(FINFO, "delta-transmission %s\n", -@@ -1541,12 +1548,6 @@ void generate_files(int f_out, struct fi - : "enabled"); - } - -- /* Since we often fill up the outgoing socket and then just sit around -- * waiting for the other 2 processes to do their thing, we don't want -- * to exit on a timeout. If the data stops flowing, the receiver will -- * notice that and let us know via the redo pipe (or its closing). */ -- ignore_timeout = 1; -- - for (i = 0; i < flist->count; i++) { - struct file_struct *file = flist->files[i]; - -@@ -1590,23 +1591,34 @@ void generate_files(int f_out, struct fi - delete_in_dir(NULL, NULL, NULL, NULL); - - phase++; -- csum_length = SUM_LENGTH; -+ GEN_csum_length = SUM_LENGTH; /* csum_length is set by the receiver */ - max_size = min_size = ignore_existing = ignore_non_existing = 0; - update_only = always_checksum = size_only = 0; - ignore_times = 1; -- if (append_mode) /* resend w/o append mode */ -- append_mode = -1; /* ... but only longer files */ -- make_backups = 0; /* avoid a duplicate backup for inplace processing */ -+ if (GEN_append_mode) /* resend w/o append mode */ -+ GEN_append_mode = -1; /* ... but only longer files */ -+ GEN_make_backups = 0; /* avoid a duplicate backup for inplace processing */ - - if (verbose > 2) - rprintf(FINFO,"generate_files phase=%d\n",phase); - - write_int(f_out, -1); -+ io_flush(NORMAL_FLUSH); - - /* files can cycle through the system more than once - * to catch initial checksum errors */ -- while ((i = get_redo_num(itemizing, code)) != -1) { -- struct file_struct *file = flist->files[i]; -+ while (1) { -+ struct file_struct *file; -+ if (preserve_hard_links) -+ check_for_finished_hlinks(itemizing, code); -+ if ((i = get_redo_num()) < 0) { -+ if (i == -2) -+ break; -+ io_flush(NORMAL_FLUSH); -+ msleep(20); -+ continue; -+ } -+ file = flist->files[i]; - if (local_name) - strlcpy(fbuf, local_name, sizeof fbuf); - else -@@ -1618,27 +1630,43 @@ void generate_files(int f_out, struct fi - phase++; - ignore_non_existing = save_ignore_non_existing; - ignore_existing = save_ignore_existing; -- make_backups = save_make_backups; -+ GEN_make_backups = save_make_backups; - - if (verbose > 2) - rprintf(FINFO,"generate_files phase=%d\n",phase); - - write_int(f_out, -1); -+ io_flush(NORMAL_FLUSH); -+ - /* Reduce round-trip lag-time for a useless delay-updates phase. */ -- if (protocol_version >= 29 && !delay_updates) -+ if (protocol_version >= 29 && !delay_updates) { - write_int(f_out, -1); -+ io_flush(NORMAL_FLUSH); -+ } - -- /* Read MSG_DONE for the redo phase (and any prior messages). */ -- get_redo_num(itemizing, code); -+ /* Read end marker for the redo phase (and any prior messages). */ -+ while (1) { -+ if (preserve_hard_links) -+ check_for_finished_hlinks(itemizing, code); -+ if (get_redo_num() == -2) -+ break; -+ io_flush(NORMAL_FLUSH); -+ msleep(20); -+ } - - if (protocol_version >= 29) { - phase++; - if (verbose > 2) - rprintf(FINFO, "generate_files phase=%d\n", phase); -- if (delay_updates) -+ if (delay_updates) { - write_int(f_out, -1); -- /* Read MSG_DONE for delay-updates phase & prior messages. */ -- get_redo_num(itemizing, code); -+ io_flush(NORMAL_FLUSH); -+ } -+ /* Read end marker for delay-updates phase & prior messages. */ -+ while (get_redo_num() != -2) { -+ io_flush(NORMAL_FLUSH); -+ msleep(20); -+ } - } - - do_progress = save_do_progress; ---- old/io.c -+++ new/io.c -@@ -40,20 +40,17 @@ extern int allowed_lull; - extern int am_server; - extern int am_daemon; - extern int am_sender; --extern int am_generator; - extern int eol_nulls; - extern int read_batch; - extern int csum_length; - extern int checksum_seed; - extern int protocol_version; --extern int remove_source_files; - extern int preserve_hard_links; - extern char *filesfrom_host; - extern struct stats stats; - extern struct file_list *the_file_list; - - const char phase_unknown[] = "unknown"; --int ignore_timeout = 0; - int batch_fd = -1; - int batch_gen_fd = -1; - -@@ -61,7 +58,6 @@ int batch_gen_fd = -1; - int kluge_around_eof = 0; - - int msg_fd_in = -1; --int msg_fd_out = -1; - int sock_f_in = -1; - int sock_f_out = -1; - -@@ -88,27 +84,31 @@ static OFF_T active_bytecnt = 0; - static void read_loop(int fd, char *buf, size_t len); - - struct flist_ndx_item { -- struct flist_ndx_item *next; -+ volatile struct flist_ndx_item *next; - int ndx; - }; - - struct flist_ndx_list { -- struct flist_ndx_item *head, *tail; -+ volatile struct flist_ndx_item *head, *tail; -+ pthread_mutex_t mutex; - }; - --static struct flist_ndx_list redo_list, hlink_list; -+static struct flist_ndx_list redo_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; -+static struct flist_ndx_list hlink_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; - - struct msg_list_item { -- struct msg_list_item *next; -+ volatile struct msg_list_item *next; - int len; -+ enum msgcode code; - char buf[1]; - }; - - struct msg_list { -- struct msg_list_item *head, *tail; -+ volatile struct msg_list_item *head, *tail; -+ pthread_mutex_t mutex; - }; - --static struct msg_list msg2genr, msg2sndr; -+static struct msg_list msg_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }; - - static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) - { -@@ -118,27 +118,31 @@ static void flist_ndx_push(struct flist_ - out_of_memory("flist_ndx_push"); - item->next = NULL; - item->ndx = ndx; -+ pthread_mutex_lock(&redo_list.mutex); - if (lp->tail) - lp->tail->next = item; - else - lp->head = item; - lp->tail = item; -+ pthread_mutex_unlock(&redo_list.mutex); - } - - static int flist_ndx_pop(struct flist_ndx_list *lp) - { -- struct flist_ndx_item *next; -+ struct flist_ndx_item *head, *next; - int ndx; - - if (!lp->head) - return -1; - -- ndx = lp->head->ndx; -- next = lp->head->next; -- free(lp->head); -- lp->head = next; -- if (!next) -+ pthread_mutex_lock(&hlink_list.mutex); -+ head = (struct flist_ndx_item *)lp->head; -+ next = (struct flist_ndx_item *)head->next; -+ ndx = head->ndx; -+ if (!(lp->head = next)) - lp->tail = NULL; -+ pthread_mutex_unlock(&hlink_list.mutex); -+ free(head); - - return ndx; - } -@@ -147,7 +151,7 @@ static void check_timeout(void) - { - time_t t; - -- if (!io_timeout || ignore_timeout) -+ if (!io_timeout) - return; - - if (!last_io_in) { -@@ -188,44 +192,38 @@ void set_io_timeout(int secs) - - /* Setup the fd used to receive MSG_* messages. Only needed during the - * early stages of being a local sender (up through the sending of the -- * file list) or when we're the generator (to fetch the messages from -- * the receiver). */ -+ * file list). */ - void set_msg_fd_in(int fd) - { - msg_fd_in = fd; - } - --/* Setup the fd used to send our MSG_* messages. Only needed when -- * we're the receiver (to send our messages to the generator). */ --void set_msg_fd_out(int fd) --{ -- msg_fd_out = fd; -- set_nonblocking(msg_fd_out); --} -- - /* Add a message to the pending MSG_* list. */ --static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) -+static void msg_list_add(int code, const char *buf, int len) - { - struct msg_list_item *m; -- int sz = len + 4 + sizeof m[0] - 1; -+ int sz = len + sizeof m[0] - 1; - -+ assert(am_receiver()); - if (!(m = (struct msg_list_item *)new_array(char, sz))) - out_of_memory("msg_list_add"); - m->next = NULL; -- m->len = len + 4; -- SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); -- memcpy(m->buf + 4, buf, len); -- if (lst->tail) -- lst->tail->next = m; -+ m->len = len; -+ m->code = code; -+ memcpy(m->buf, buf, len); -+ -+ pthread_mutex_lock(&msg_list.mutex); -+ if (msg_list.tail) -+ msg_list.tail->next = m; - else -- lst->head = m; -- lst->tail = m; -+ msg_list.head = m; -+ msg_list.tail = m; -+ pthread_mutex_unlock(&msg_list.mutex); - } - --/* Read a message from the MSG_* fd and handle it. This is called either -+/* Read a message from the MSG_* fd and handle it. This is only called - * during the early stages of being a local sender (up through the sending -- * of the file list) or when we're the generator (to fetch the messages -- * from the receiver). */ -+ * of the file list). */ - static void read_msg_fd(void) - { - char buf[2048]; -@@ -244,51 +242,6 @@ static void read_msg_fd(void) - tag = (tag >> 24) - MPLEX_BASE; - - switch (tag) { -- case MSG_DONE: -- if (len != 0 || !am_generator) { -- rprintf(FERROR, "invalid message %d:%d\n", tag, len); -- exit_cleanup(RERR_STREAMIO); -- } -- flist_ndx_push(&redo_list, -1); -- break; -- case MSG_REDO: -- if (len != 4 || !am_generator) { -- rprintf(FERROR, "invalid message %d:%d\n", tag, len); -- exit_cleanup(RERR_STREAMIO); -- } -- read_loop(fd, buf, 4); -- if (remove_source_files) -- decrement_active_files(IVAL(buf,0)); -- flist_ndx_push(&redo_list, IVAL(buf,0)); -- break; -- case MSG_DELETED: -- if (len >= (int)sizeof buf || !am_generator) { -- rprintf(FERROR, "invalid message %d:%d\n", tag, len); -- exit_cleanup(RERR_STREAMIO); -- } -- read_loop(fd, buf, len); -- send_msg(MSG_DELETED, buf, len); -- break; -- case MSG_SUCCESS: -- if (len != 4 || !am_generator) { -- rprintf(FERROR, "invalid message %d:%d\n", tag, len); -- exit_cleanup(RERR_STREAMIO); -- } -- read_loop(fd, buf, len); -- if (remove_source_files) { -- decrement_active_files(IVAL(buf,0)); -- send_msg(MSG_SUCCESS, buf, len); -- } -- if (preserve_hard_links) -- flist_ndx_push(&hlink_list, IVAL(buf,0)); -- break; -- case MSG_SOCKERR: -- if (!am_generator) { -- rprintf(FERROR, "invalid message %d:%d\n", tag, len); -- exit_cleanup(RERR_STREAMIO); -- } -- close_multiplexing_out(); -- /* FALL THROUGH */ - case MSG_INFO: - case MSG_ERROR: - case MSG_LOG: -@@ -332,75 +285,80 @@ void decrement_active_files(int ndx) - active_bytecnt -= the_file_list->files[ndx]->length; - } - --/* Try to push messages off the list onto the wire. If we leave with more -+/* Try to pop messages off the list onto the wire. If we leave with more - * to do, return 0. On error, return -1. If everything flushed, return 1. -- * This is only active in the receiver. */ --static int msg2genr_flush(int flush_it_all) -+ * This is only called by the generator. */ -+static void msg_list_flush(void) - { -- static int written = 0; -- struct timeval tv; -- fd_set fds; -+ assert(am_generator()); - -- if (msg_fd_out < 0) -- return -1; -+ if (defer_forwarding_messages) -+ return; - -- while (msg2genr.head) { -- struct msg_list_item *m = msg2genr.head; -- int n = write(msg_fd_out, m->buf + written, m->len - written); -- if (n < 0) { -- if (errno == EINTR) -- continue; -- if (errno != EWOULDBLOCK && errno != EAGAIN) -- return -1; -- if (!flush_it_all) -- return 0; -- FD_ZERO(&fds); -- FD_SET(msg_fd_out, &fds); -- tv.tv_sec = select_timeout; -- tv.tv_usec = 0; -- if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) -- check_timeout(); -- } else if ((written += n) == m->len) { -- msg2genr.head = m->next; -- if (!msg2genr.head) -- msg2genr.tail = NULL; -- free(m); -- written = 0; -+ no_flush++; -+ defer_forwarding_messages = 1; -+ while (msg_list.head) { -+ struct msg_list_item *m = (struct msg_list_item *)msg_list.head; -+ pthread_mutex_lock(&msg_list.mutex); -+ if (!(msg_list.head = m->next)) -+ msg_list.tail = NULL; -+ pthread_mutex_unlock(&msg_list.mutex); -+ switch (m->code) { -+ case MSG_SOCKERR: -+ close_multiplexing_out(); -+ /* FALL THROUGH */ -+ case MSG_INFO: -+ case MSG_ERROR: -+ case MSG_LOG: -+ rwrite(m->code, m->buf, m->len); -+ break; -+ default: -+ io_multiplex_write(m->code, m->buf, m->len); -+ break; - } -+ free(m); - } -- return 1; -+ defer_forwarding_messages = 0; -+ no_flush--; - } - - int send_msg(enum msgcode code, const char *buf, int len) - { -- if (msg_fd_out < 0) { -+ if (!am_receiver()) { - if (!defer_forwarding_messages) - return io_multiplex_write(code, buf, len); - if (!io_multiplexing_out) - return 0; -- msg_list_add(&msg2sndr, code, buf, len); -- return 1; - } -- msg_list_add(&msg2genr, code, buf, len); -- msg2genr_flush(NORMAL_FLUSH); -+ msg_list_add(code, buf, len); - return 1; - } - --int get_redo_num(int itemizing, enum logcode code) -+/* This is only used by the receiver. */ -+void push_redo_num(int ndx) - { -- while (1) { -- if (hlink_list.head) -- check_for_finished_hlinks(itemizing, code); -- if (redo_list.head) -- break; -- read_msg_fd(); -- } -+ assert(am_receiver()); -+ flist_ndx_push(&redo_list, ndx); -+} - -+/* This is only used by the generator. */ -+int get_redo_num(void) -+{ -+ assert(am_generator()); - return flist_ndx_pop(&redo_list); - } - -+/* This is only used by the receiver. */ -+void push_hlink_num(int ndx) -+{ -+ assert(am_receiver()); -+ flist_ndx_push(&hlink_list, ndx); -+} -+ -+/* This is only used by the generator. */ - int get_hlink_num(void) - { -+ assert(am_generator()); - return flist_ndx_pop(&hlink_list); - } - -@@ -480,11 +438,6 @@ static int read_timeout(int fd, char *bu - FD_ZERO(&r_fds); - FD_ZERO(&w_fds); - FD_SET(fd, &r_fds); -- if (msg2genr.head) { -- FD_SET(msg_fd_out, &w_fds); -- if (msg_fd_out > maxfd) -- maxfd = msg_fd_out; -- } - if (io_filesfrom_f_out >= 0) { - int new_fd; - if (io_filesfrom_buflen == 0) { -@@ -517,9 +470,6 @@ static int read_timeout(int fd, char *bu - continue; - } - -- if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) -- msg2genr_flush(NORMAL_FLUSH); -- - if (io_filesfrom_f_out >= 0) { - if (io_filesfrom_buflen) { - if (FD_ISSET(io_filesfrom_f_out, &w_fds)) { -@@ -847,6 +797,8 @@ static void readfd(int fd, char *buffer, - } - - if (fd == write_batch_monitor_in) { -+ if (am_generator()) -+ rprintf(FINFO, "writing %d bytes to batch file from generator\n", total); - if ((size_t)write(batch_fd, buffer, total) != total) - exit_cleanup(RERR_FILEIO); - } -@@ -1115,7 +1067,6 @@ static void writefd_unbuffered(int fd, c - * to grab any messages they sent before they died. */ - while (fd == sock_f_out && io_multiplexing_in) { - set_io_timeout(30); -- ignore_timeout = 0; - readfd_unbuffered(sock_f_in, io_filesfrom_buf, - sizeof io_filesfrom_buf); - } -@@ -1126,7 +1077,7 @@ static void writefd_unbuffered(int fd, c - defer_forwarding_messages = 1; - - if (fd == sock_f_out) { -- if (io_timeout || am_generator) -+ if (io_timeout || am_generator()) - last_io_out = time(NULL); - sleep_for_bwlimit(cnt); - } -@@ -1136,23 +1087,6 @@ static void writefd_unbuffered(int fd, c - no_flush--; - } - --static void msg2sndr_flush(void) --{ -- if (defer_forwarding_messages) -- return; -- -- while (msg2sndr.head && io_multiplexing_out) { -- struct msg_list_item *m = msg2sndr.head; -- if (!(msg2sndr.head = m->next)) -- msg2sndr.tail = NULL; -- stats.total_written += m->len; -- defer_forwarding_messages = 1; -- writefd_unbuffered(sock_f_out, m->buf, m->len); -- defer_forwarding_messages = 0; -- free(m); -- } --} -- - /** - * Write an message to a multiplexed stream. If this fails then rsync - * exits. -@@ -1178,14 +1112,15 @@ static void mplex_write(enum msgcode cod - defer_forwarding_messages = 1; - writefd_unbuffered(sock_f_out, buf, len); - defer_forwarding_messages = 0; -- msg2sndr_flush(); -+ if (am_generator()) -+ msg_list_flush(); - } - } - --void io_flush(int flush_it_all) -+void io_flush(UNUSED(int flush_it_all)) - { -- msg2genr_flush(flush_it_all); -- msg2sndr_flush(); -+ if (am_generator()) -+ msg_list_flush(); - - if (!iobuf_out_cnt || no_flush) - return; -@@ -1199,11 +1134,6 @@ void io_flush(int flush_it_all) - - static void writefd(int fd, const char *buf, size_t len) - { -- if (fd == msg_fd_out) { -- rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); -- exit_cleanup(RERR_PROTOCOL); -- } -- - if (fd == sock_f_out) - stats.total_written += len; - -@@ -1409,9 +1339,3 @@ void start_write_batch(int fd) - else - write_batch_monitor_in = fd; - } -- --void stop_write_batch(void) --{ -- write_batch_monitor_out = -1; -- write_batch_monitor_in = -1; --} ---- old/log.c -+++ new/log.c -@@ -33,7 +33,6 @@ extern int am_sender; - extern int local_server; - extern int quiet; - extern int module_id; --extern int msg_fd_out; - extern int allow_8bit_chars; - extern int protocol_version; - extern int preserve_times; -@@ -75,7 +74,6 @@ struct { - { RERR_IPC , "error in IPC code" }, - { RERR_CRASHED , "sibling process crashed" }, - { RERR_TERMINATED , "sibling process terminated abnormally" }, -- { RERR_SIGNAL1 , "received SIGUSR1" }, - { RERR_SIGNAL , "received SIGINT, SIGTERM, or SIGHUP" }, - { RERR_WAITCHILD , "waitpid() failed" }, - { RERR_MALLOC , "error allocating core memory buffers" }, -@@ -241,8 +239,8 @@ void rwrite(enum logcode code, const cha - if (len < 0) - exit_cleanup(RERR_MESSAGEIO); - -- if (am_server && msg_fd_out >= 0) { -- /* Pass the message to our sibling. */ -+ if (am_receiver()) { -+ /* Pass the message to the generator thread. */ - send_msg((enum msgcode)code, buf, len); - return; - } ---- old/main.c -+++ new/main.c -@@ -32,7 +32,6 @@ extern int list_only; - extern int am_root; - extern int am_server; - extern int am_sender; --extern int am_generator; - extern int am_daemon; - extern int blocking_io; - extern int remove_source_files; -@@ -96,9 +95,20 @@ struct pid_status { - - static time_t starttime, endtime; - static int64 total_read, total_written; -+static pthread_t receiver_tid; - - static void show_malloc_stats(void); - -+int am_generator() -+{ -+ return receiver_tid != 0 && pthread_self() != receiver_tid; -+} -+ -+int am_receiver() -+{ -+ return receiver_tid != 0 && pthread_self() == receiver_tid; -+} -+ - /* Works like waitpid(), but if we already harvested the child pid in our - * remember_children(), we succeed instead of returning an error. */ - pid_t wait_process(pid_t pid, int *status_ptr, int flags) -@@ -175,7 +185,7 @@ static void handle_stats(int f) - show_flist_stats(); - } - -- if (am_generator) -+ if (am_generator()) - return; - - if (am_daemon) { -@@ -684,12 +694,30 @@ static void do_server_sender(int f_in, i - exit_cleanup(0); - } - -+struct thread_args { -+ struct file_list *flist; -+ char *local_name; -+ int f_in; -+}; -+ -+static void *start_receiver_thread(void *arg) -+{ -+ static int exit_code; -+ struct thread_args *ta = (struct thread_args *)arg; -+ -+ recv_files(ta->f_in, ta->flist, ta->local_name); -+ handle_stats(ta->f_in); -+ -+ push_redo_num(-2); -+ -+ exit_code = log_got_error ? RERR_PARTIAL : 0; -+ return &exit_code; -+} - - static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name) - { -- int pid; -- int exit_code = 0; -- int error_pipe[2]; -+ void *value_ptr; -+ struct thread_args args; - - /* The receiving side mustn't obey this, or an existing symlink that - * points to an identical file won't be replaced by the referent. */ -@@ -698,70 +726,16 @@ static int do_recv(int f_in,int f_out,st - if (preserve_hard_links) - init_hard_links(); - -- if (fd_pair(error_pipe) < 0) { -- rsyserr(FERROR, errno, "pipe failed in do_recv"); -- exit_cleanup(RERR_IPC); -- } -- -- io_flush(NORMAL_FLUSH); -- -- if ((pid = do_fork()) == -1) { -- rsyserr(FERROR, errno, "fork failed in do_recv"); -+ args.f_in = f_in; -+ args.flist = flist; -+ args.local_name = local_name; -+ if (pthread_create(&receiver_tid, NULL, start_receiver_thread, &args) < 0) { -+ rsyserr(FERROR, errno, "pthread_create failed in do_recv"); - exit_cleanup(RERR_IPC); - } - -- if (pid == 0) { -- close(error_pipe[0]); -- if (f_in != f_out) -- close(f_out); -- -- /* we can't let two processes write to the socket at one time */ -- close_multiplexing_out(); -- -- /* set place to send errors */ -- set_msg_fd_out(error_pipe[1]); -- -- recv_files(f_in, flist, local_name); -- io_flush(FULL_FLUSH); -- handle_stats(f_in); -- -- send_msg(MSG_DONE, "", 0); -- io_flush(FULL_FLUSH); -- -- /* Handle any keep-alive packets from the post-processing work -- * that the generator does. */ -- if (protocol_version >= 29) { -- kluge_around_eof = -1; -- -- /* This should only get stopped via a USR2 signal. */ -- while (read_int(f_in) == flist->count -- && read_shortint(f_in) == ITEM_IS_NEW) {} -- -- rprintf(FERROR, "Invalid packet at end of run [%s]\n", -- who_am_i()); -- exit_cleanup(RERR_PROTOCOL); -- } -- -- /* Finally, we go to sleep until our parent kills us with a -- * USR2 signal. We sleep for a short time, as on some OSes -- * a signal won't interrupt a sleep! */ -- while (1) -- msleep(20); -- } -- -- am_generator = 1; -- close_multiplexing_in(); -- if (write_batch && !am_server) -- stop_write_batch(); -- -- close(error_pipe[1]); -- if (f_in != f_out) -- close(f_in); -- - io_start_buffering_out(); - -- set_msg_fd_in(error_pipe[0]); -- - generate_files(f_out, flist, local_name); - - handle_stats(-1); -@@ -772,10 +746,13 @@ static int do_recv(int f_in,int f_out,st - } - io_flush(FULL_FLUSH); - -- set_msg_fd_in(-1); -- kill(pid, SIGUSR2); -- wait_process_with_flush(pid, &exit_code); -- return exit_code; -+ pthread_join(receiver_tid, &value_ptr); -+ if (!am_server) -+ output_summary(); -+ -+ close_all(); -+ -+ return *(int*)value_ptr; - } - - -@@ -1177,22 +1154,6 @@ static int start_client(int argc, char * - return ret; - } - -- --static RETSIGTYPE sigusr1_handler(UNUSED(int val)) --{ -- exit_cleanup(RERR_SIGNAL1); --} -- --static RETSIGTYPE sigusr2_handler(UNUSED(int val)) --{ -- if (!am_server) -- output_summary(); -- close_all(); -- if (log_got_error) -- _exit(RERR_PARTIAL); -- _exit(0); --} -- - RETSIGTYPE remember_children(UNUSED(int val)) - { - #ifdef WNOHANG -@@ -1284,8 +1245,6 @@ int main(int argc,char *argv[]) - # endif - sigact.sa_flags = SA_NOCLDSTOP; - #endif -- SIGACTMASK(SIGUSR1, sigusr1_handler); -- SIGACTMASK(SIGUSR2, sigusr2_handler); - SIGACTMASK(SIGCHLD, remember_children); - #ifdef MAINTAINER_MODE - SIGACTMASK(SIGSEGV, rsync_panic_handler); ---- old/match.c -+++ new/match.c -@@ -23,7 +23,7 @@ - #include "rsync.h" - - extern int verbose; --extern int do_progress; -+extern int recv_progress; - extern int checksum_seed; - extern int append_mode; - -@@ -113,7 +113,7 @@ static void matched(int f, struct sum_st - else - last_match = offset; - -- if (buf && do_progress) -+ if (buf && recv_progress) - show_progress(last_match, buf->file_size); - } - -@@ -317,7 +317,7 @@ void match_sums(int f, struct sum_struct - if (append_mode) { - OFF_T j = 0; - for (j = CHUNK_SIZE; j < s->flength; j += CHUNK_SIZE) { -- if (buf && do_progress) -+ if (buf && recv_progress) - show_progress(last_match, buf->file_size); - sum_update(map_ptr(buf, last_match, CHUNK_SIZE), - CHUNK_SIZE); -@@ -325,7 +325,7 @@ void match_sums(int f, struct sum_struct - } - if (last_match < s->flength) { - int32 len = (int32)(s->flength - last_match); -- if (buf && do_progress) -+ if (buf && recv_progress) - show_progress(last_match, buf->file_size); - sum_update(map_ptr(buf, last_match, len), len); - last_match = s->flength; ---- old/options.c -+++ new/options.c -@@ -74,7 +74,6 @@ int def_compress_level = Z_DEFAULT_COMPR - int am_root = 0; - int am_server = 0; - int am_sender = 0; --int am_generator = 0; - int am_starting_up = 1; - int relative_paths = -1; - int implied_dirs = 1; -@@ -95,6 +94,7 @@ int am_daemon = 0; - int daemon_over_rsh = 0; - int do_stats = 0; - int do_progress = 0; -+int recv_progress = 0; - int keep_partial = 0; - int safe_symlinks = 0; - int copy_unsafe_links = 0; -@@ -1302,6 +1302,7 @@ int parse_arguments(int *argc, const cha - - if (do_progress && !verbose && !log_before_transfer && !am_server) - verbose = 1; -+ recv_progress = do_progress; - - if (dry_run) - do_xfers = 0; ---- old/pipe.c -+++ new/pipe.c -@@ -58,7 +58,7 @@ pid_t piped_child(char **command, int *f - exit_cleanup(RERR_IPC); - } - -- pid = do_fork(); -+ pid = fork(); - if (pid == -1) { - rsyserr(FERROR, errno, "fork"); - exit_cleanup(RERR_IPC); -@@ -122,7 +122,7 @@ pid_t local_child(int argc, char **argv, - exit_cleanup(RERR_IPC); - } - -- pid = do_fork(); -+ pid = fork(); - if (pid == -1) { - rsyserr(FERROR, errno, "fork"); - exit_cleanup(RERR_IPC); ---- old/receiver.c -+++ new/receiver.c -@@ -25,7 +25,7 @@ - extern int verbose; - extern int do_xfers; - extern int am_server; --extern int do_progress; -+extern int recv_progress; - extern int log_before_transfer; - extern int stdout_format_has_i; - extern int logfile_format_has_i; -@@ -157,7 +157,7 @@ static int receive_data(int f_in, char * - if (sum.remainder) - sum.flength -= sum.blength - sum.remainder; - for (j = CHUNK_SIZE; j < sum.flength; j += CHUNK_SIZE) { -- if (do_progress) -+ if (recv_progress) - show_progress(offset, total_size); - sum_update(map_ptr(mapbuf, offset, CHUNK_SIZE), - CHUNK_SIZE); -@@ -165,7 +165,7 @@ static int receive_data(int f_in, char * - } - if (offset < sum.flength) { - int32 len = (int32)(sum.flength - offset); -- if (do_progress) -+ if (recv_progress) - show_progress(offset, total_size); - sum_update(map_ptr(mapbuf, offset, len), len); - offset = sum.flength; -@@ -178,7 +178,7 @@ static int receive_data(int f_in, char * - } - - while ((i = recv_token(f_in, &data)) != 0) { -- if (do_progress) -+ if (recv_progress) - show_progress(offset, total_size); - - if (i > 0) { -@@ -248,7 +248,7 @@ static int receive_data(int f_in, char * - ftruncate(fd, offset); - #endif - -- if (do_progress) -+ if (recv_progress) - end_progress(total_size); - - if (fd != -1 && offset > 0 && sparse_end(fd) != 0) { -@@ -299,12 +299,12 @@ static void handle_delayed_updates(struc - "rename failed for %s (from %s)", - full_fname(fname), partialptr); - } else { -- if (remove_source_files -- || (preserve_hard_links -- && file->link_u.links)) { -+ if (remove_source_files) { - SIVAL(numbuf, 0, i); - send_msg(MSG_SUCCESS,numbuf,4); - } -+ if (preserve_hard_links && file->link_u.links) -+ push_hlink_num(i); - handle_partial_dir(partialptr, PDIR_DELETE); - } - } -@@ -355,11 +355,6 @@ int recv_files(int f_in, struct file_lis - if (verbose > 2) - rprintf(FINFO,"recv_files(%d) starting\n",flist->count); - -- if (flist->hlink_pool) { -- pool_destroy(flist->hlink_pool); -- flist->hlink_pool = NULL; -- } -- - if (delay_updates) - delayed_bits = bitbag_create(flist->count); - -@@ -382,7 +377,7 @@ int recv_files(int f_in, struct file_lis - rprintf(FINFO, "recv_files phase=%d\n", phase); - if (phase == 2 && delay_updates) - handle_delayed_updates(flist, local_name); -- send_msg(MSG_DONE, "", 0); -+ push_redo_num(-2); - if (keep_partial && !partial_dir) - make_backups = 0; /* prevents double backup */ - if (append_mode) { -@@ -607,7 +602,7 @@ int recv_files(int f_in, struct file_lis - /* log the transfer */ - if (log_before_transfer) - log_item(FCLIENT, file, &initial_stats, iflags, NULL); -- else if (!am_server && verbose && do_progress) -+ else if (!am_server && verbose && recv_progress) - rprintf(FINFO, "%s\n", fname); - - /* recv file data */ -@@ -654,11 +649,13 @@ int recv_files(int f_in, struct file_lis - cleanup_disable(); - - if (recv_ok > 0) { -- if (remove_source_files -- || (preserve_hard_links && file->link_u.links)) { -+ if (remove_source_files) { -+ decrement_active_files(i); - SIVAL(numbuf, 0, i); - send_msg(MSG_SUCCESS, numbuf, 4); - } -+ if (preserve_hard_links && file->link_u.links) -+ push_hlink_num(i); - } else if (!recv_ok) { - enum logcode msgtype = phase || read_batch ? FERROR : FINFO; - if (msgtype == FERROR || verbose) { -@@ -681,8 +678,8 @@ int recv_files(int f_in, struct file_lis - errstr, fname, keptstr, redostr); - } - if (!phase) { -- SIVAL(numbuf, 0, i); -- send_msg(MSG_REDO, numbuf, 4); -+ decrement_active_files(i); -+ push_redo_num(i); - } - } - } ---- old/rsync.c -+++ new/rsync.c -@@ -39,7 +39,6 @@ extern int omit_dir_times; - extern int am_root; - extern int am_server; - extern int am_sender; --extern int am_generator; - extern int am_starting_up; - extern int allow_8bit_chars; - extern int preserve_uid; -@@ -305,5 +304,5 @@ const char *who_am_i(void) - { - if (am_starting_up) - return am_server ? "server" : "client"; -- return am_sender ? "sender" : am_generator ? "generator" : "receiver"; -+ return am_sender ? "sender" : am_generator() ? "generator" : "receiver"; - } ---- old/rsync.h -+++ new/rsync.h -@@ -170,10 +170,8 @@ enum msgcode { - MSG_DATA=0, /* raw data on the multiplexed stream */ - MSG_ERROR=FERROR, MSG_INFO=FINFO, /* remote logging */ - MSG_LOG=FLOG, MSG_SOCKERR=FSOCKERR, /* sibling logging */ -- MSG_REDO=9, /* reprocess indicated flist index */ - MSG_SUCCESS=100,/* successfully updated indicated flist index */ - MSG_DELETED=101,/* successfully deleted a file on receiving side */ -- MSG_DONE=86 /* current phase is done */ - }; - - #include "errcode.h" -@@ -330,6 +328,7 @@ enum msgcode { - #endif - - #include -+#include - - #include "lib/pool_alloc.h" - ---- old/util.c -+++ new/util.c -@@ -417,49 +417,6 @@ int robust_rename(const char *from, cons - return -1; - } - --static pid_t all_pids[10]; --static int num_pids; -- --/** Fork and record the pid of the child. **/ --pid_t do_fork(void) --{ -- pid_t newpid = fork(); -- -- if (newpid != 0 && newpid != -1) { -- all_pids[num_pids++] = newpid; -- } -- return newpid; --} -- --/** -- * Kill all children. -- * -- * @todo It would be kind of nice to make sure that they are actually -- * all our children before we kill them, because their pids may have -- * been recycled by some other process. Perhaps when we wait for a -- * child, we should remove it from this array. Alternatively we could -- * perhaps use process groups, but I think that would not work on -- * ancient Unix versions that don't support them. -- **/ --void kill_all(int sig) --{ -- int i; -- -- for (i = 0; i < num_pids; i++) { -- /* Let's just be a little careful where we -- * point that gun, hey? See kill(2) for the -- * magic caused by negative values. */ -- pid_t p = all_pids[i]; -- -- if (p == getpid()) -- continue; -- if (p <= 0) -- continue; -- -- kill(p, sig); -- } --} -- - /** Turn a user name into a uid */ - int name_to_uid(const char *name, uid_t *uid) - { -- 2.34.1