Fixed failing hunks.
[rsync/rsync-patches.git] / threaded-receiver.diff
CommitLineData
3ca259e2
WD
1This patch changes the receiving side to have the receiving code use a thread
2instead of a forked process. This extra thread does read from the socket, but
3it sends any stdout/stderr messages to the generator (main thread) to output.
4
5** This is very new code. ** Yes, it passes the "make test" testsuite, but
6there may still be some problems, especially in some of the untested features.
7(For one thing, I haven't yet added code to properly handle any keep-alive
8messages that arrive on the receiving side during the --delete-after phase!)
9
10This code just uses pthread.h directly, so configure changes will probably be
11needed to make this compatible with more systems. I have also tested that
12this code works fine using the GNU pth library without any code changes if
13you configured it with --enable-syscall-soft --enable-pthread (you may need
14to twiddle the Makefile options if you didn't install the library, though).
15
16If you try this out, please send some email to wayned@samba.org or the rsync
17mailing list with your results, build changes, bug reports, etc. Thanks!
18
19Be sure to run "make proto" before running "make".
20
21--- orig/Makefile.in 2005-11-07 04:29:00
22+++ Makefile.in 2005-12-10 18:35:39
23@@ -6,7 +6,7 @@ exec_prefix=@exec_prefix@
24 bindir=@bindir@
25 mandir=@mandir@
26
27-LIBS=@LIBS@
28+LIBS=@LIBS@ -lpthread
29 CC=@CC@
30 CFLAGS=@CFLAGS@
31 CPPFLAGS=@CPPFLAGS@
32--- orig/cleanup.c 2005-11-10 16:58:36
33+++ cleanup.c 2005-12-08 23:17:08
34@@ -94,9 +94,6 @@ void _exit_cleanup(int code, const char
35 }
36 inside_cleanup++;
37
38- signal(SIGUSR1, SIG_IGN);
39- signal(SIGUSR2, SIG_IGN);
40-
41 if (verbose > 3) {
42 rprintf(FINFO,"_exit_cleanup(code=%d, file=%s, line=%d): entered\n",
43 code, safe_fname(file), line);
44@@ -127,8 +124,6 @@ void _exit_cleanup(int code, const char
45 io_flush(FULL_FLUSH);
46 if (cleanup_fname)
47 do_unlink(cleanup_fname);
48- if (code)
49- kill_all(SIGUSR1);
50 if (cleanup_pid && cleanup_pid == getpid()) {
51 char *pidf = lp_pid_file();
52 if (pidf && *pidf)
8a0123e5
WD
53--- orig/errcode.h 2005-12-16 23:48:43
54+++ errcode.h 2005-12-16 23:50:02
55@@ -37,7 +37,6 @@
56 #define RERR_CRASHED 15 /* sibling crashed */
57 #define RERR_TERMINATED 16 /* sibling terminated abnormally */
58
59-#define RERR_SIGNAL1 19 /* status returned when sent SIGUSR1 */
60 #define RERR_SIGNAL 20 /* status returned when sent SIGINT, SIGTERM, SIGHUP */
61 #define RERR_WAITCHILD 21 /* some error returned by waitpid() */
62 #define RERR_MALLOC 22 /* error allocating core memory buffers */
14824301 63--- orig/generator.c 2005-12-16 04:03:06
3ca259e2
WD
64+++ generator.c 2005-12-08 23:17:08
65@@ -65,7 +65,6 @@ extern OFF_T min_size;
66 extern int io_error;
67 extern int allowed_lull;
68 extern int sock_f_out;
69-extern int ignore_timeout;
70 extern int protocol_version;
71 extern int fuzzy_basis;
72 extern int always_checksum;
14824301
WD
73@@ -96,6 +95,11 @@ static int deletion_count = 0; /* used t
74 static int can_link_symlinks = 1; /* start out optimistic */
75 static int can_link_devices = 1;
3ca259e2
WD
76
77+/* These vars are local copies so that the receiver can use the originals. */
78+static int GEN_append_mode;
79+static int GEN_make_backups;
80+static int GEN_csum_length;
81+
82 /* For calling delete_file() */
83 #define DEL_FORCE_RECURSE (1<<1) /* recurse even w/o --force */
84 #define DEL_TERSE (1<<3)
14824301 85@@ -445,8 +449,8 @@ static void sum_sizes_sqroot(struct sum_
3ca259e2
WD
86 }
87
88 if (protocol_version < 27) {
89- s2length = csum_length;
90- } else if (csum_length == SUM_LENGTH) {
91+ s2length = GEN_csum_length;
92+ } else if (GEN_csum_length == SUM_LENGTH) {
93 s2length = SUM_LENGTH;
94 } else {
95 int32 c;
14824301 96@@ -456,7 +460,7 @@ static void sum_sizes_sqroot(struct sum_
3ca259e2
WD
97 for (c = blength; c >>= 1 && b; b--) {}
98 /* add a bit, subtract rollsum, round up. */
99 s2length = (b + 1 - 32 + 7) / 8; /* --optimize in compiler-- */
100- s2length = MAX(s2length, csum_length);
101+ s2length = MAX(s2length, GEN_csum_length);
102 s2length = MIN(s2length, SUM_LENGTH);
103 }
104
14824301 105@@ -490,7 +494,7 @@ static void generate_and_send_sums(int f
3ca259e2
WD
106 sum_sizes_sqroot(&sum, len);
107 write_sum_head(f_out, &sum);
108
109- if (append_mode > 0 && f_copy < 0)
110+ if (GEN_append_mode > 0 && f_copy < 0)
111 return;
112
113 if (len > 0)
14824301 114@@ -509,7 +513,7 @@ static void generate_and_send_sums(int f
3ca259e2
WD
115
116 if (f_copy >= 0) {
117 full_write(f_copy, map, n1);
118- if (append_mode > 0)
119+ if (GEN_append_mode > 0)
120 continue;
121 }
122
14824301 123@@ -1143,7 +1147,7 @@ static void recv_generator(char *fname,
3ca259e2
WD
124 return;
125 }
126
127- if (append_mode && st.st_size > file->length)
128+ if (GEN_append_mode && st.st_size > file->length)
129 return;
130
14824301
WD
131 if (fnamecmp_type <= FNAMECMP_BASIS_DIR_HIGH)
132@@ -1198,7 +1202,7 @@ static void recv_generator(char *fname,
3ca259e2
WD
133 goto notify_others;
134 }
135
136- if (inplace && make_backups && fnamecmp_type == FNAMECMP_FNAME) {
137+ if (inplace && GEN_make_backups && fnamecmp_type == FNAMECMP_FNAME) {
138 if (!(backupptr = get_backup_name(fname))) {
139 close(fd);
140 return;
14824301 141@@ -1288,7 +1292,10 @@ void generate_files(int f_out, struct fi
3ca259e2
WD
142 int save_ignore_existing = ignore_existing;
143 int save_ignore_non_existing = ignore_non_existing;
144 int save_do_progress = do_progress;
145- int save_make_backups = make_backups;
146+ int save_make_backups = GEN_make_backups = make_backups;
147+
148+ GEN_append_mode = append_mode;
149+ GEN_csum_length = csum_length;
150
151 if (protocol_version >= 29) {
152 itemizing = 1;
14824301 153@@ -1317,7 +1324,7 @@ void generate_files(int f_out, struct fi
3ca259e2
WD
154 do_delete_pass(flist);
155 do_progress = 0;
156
157- if (append_mode || whole_file < 0)
158+ if (GEN_append_mode || whole_file < 0)
159 whole_file = 0;
160 if (verbose >= 2) {
161 rprintf(FINFO, "delta-transmission %s\n",
14824301 162@@ -1326,12 +1333,6 @@ void generate_files(int f_out, struct fi
3ca259e2
WD
163 : "enabled");
164 }
165
166- /* Since we often fill up the outgoing socket and then just sit around
167- * waiting for the other 2 processes to do their thing, we don't want
168- * to exit on a timeout. If the data stops flowing, the receiver will
169- * notice that and let us know via the redo pipe (or its closing). */
170- ignore_timeout = 1;
171-
172 for (i = 0; i < flist->count; i++) {
173 struct file_struct *file = flist->files[i];
174
14824301 175@@ -1375,23 +1376,34 @@ void generate_files(int f_out, struct fi
3ca259e2
WD
176 delete_in_dir(NULL, NULL, NULL);
177
178 phase++;
179- csum_length = SUM_LENGTH;
180+ GEN_csum_length = SUM_LENGTH; /* csum_length is set by the receiver */
181 max_size = min_size = ignore_existing = ignore_non_existing = 0;
182 update_only = always_checksum = size_only = 0;
183 ignore_times = 1;
184- if (append_mode) /* resend w/o append mode */
185- append_mode = -1; /* ... but only longer files */
186- make_backups = 0; /* avoid a duplicate backup for inplace processing */
187+ if (GEN_append_mode) /* resend w/o append mode */
188+ GEN_append_mode = -1; /* ... but only longer files */
189+ GEN_make_backups = 0; /* avoid a duplicate backup for inplace processing */
190
191 if (verbose > 2)
192 rprintf(FINFO,"generate_files phase=%d\n",phase);
193
194 write_int(f_out, -1);
195+ io_flush(NORMAL_FLUSH);
196
197 /* files can cycle through the system more than once
198 * to catch initial checksum errors */
199- while ((i = get_redo_num(itemizing, code)) != -1) {
200- struct file_struct *file = flist->files[i];
201+ while (1) {
202+ struct file_struct *file;
203+ if (preserve_hard_links)
204+ check_for_finished_hlinks(itemizing, code);
205+ if ((i = get_redo_num()) < 0) {
206+ if (i == -2)
207+ break;
208+ io_flush(NORMAL_FLUSH);
209+ msleep(20);
210+ continue;
211+ }
212+ file = flist->files[i];
213 if (local_name)
214 strlcpy(fbuf, local_name, sizeof fbuf);
215 else
14824301 216@@ -1403,27 +1415,43 @@ void generate_files(int f_out, struct fi
3ca259e2
WD
217 phase++;
218 ignore_non_existing = save_ignore_non_existing;
219 ignore_existing = save_ignore_existing;
220- make_backups = save_make_backups;
221+ GEN_make_backups = save_make_backups;
222
223 if (verbose > 2)
224 rprintf(FINFO,"generate_files phase=%d\n",phase);
225
226 write_int(f_out, -1);
227+ io_flush(NORMAL_FLUSH);
228+
229 /* Reduce round-trip lag-time for a useless delay-updates phase. */
230- if (protocol_version >= 29 && !delay_updates)
231+ if (protocol_version >= 29 && !delay_updates) {
232 write_int(f_out, -1);
233+ io_flush(NORMAL_FLUSH);
234+ }
235
236- /* Read MSG_DONE for the redo phase (and any prior messages). */
237- get_redo_num(itemizing, code);
238+ /* Read end marker for the redo phase (and any prior messages). */
239+ while (1) {
240+ if (preserve_hard_links)
241+ check_for_finished_hlinks(itemizing, code);
242+ if (get_redo_num() == -2)
243+ break;
244+ io_flush(NORMAL_FLUSH);
245+ msleep(20);
246+ }
247
248 if (protocol_version >= 29) {
249 phase++;
250 if (verbose > 2)
251 rprintf(FINFO, "generate_files phase=%d\n", phase);
252- if (delay_updates)
253+ if (delay_updates) {
254 write_int(f_out, -1);
255- /* Read MSG_DONE for delay-updates phase & prior messages. */
256- get_redo_num(itemizing, code);
257+ io_flush(NORMAL_FLUSH);
258+ }
259+ /* Read end marker for delay-updates phase & prior messages. */
260+ while (get_redo_num() != -2) {
261+ io_flush(NORMAL_FLUSH);
262+ msleep(20);
263+ }
264 }
265
266 do_progress = save_do_progress;
267--- orig/io.c 2005-12-08 21:19:31
268+++ io.c 2005-12-10 19:03:08
269@@ -47,7 +47,6 @@ extern int allowed_lull;
270 extern int am_server;
271 extern int am_daemon;
272 extern int am_sender;
273-extern int am_generator;
274 extern int eol_nulls;
275 extern int read_batch;
276 extern int csum_length;
277@@ -60,7 +59,6 @@ extern struct stats stats;
278 extern struct file_list *the_file_list;
279
280 const char phase_unknown[] = "unknown";
281-int ignore_timeout = 0;
282 int batch_fd = -1;
283 int batch_gen_fd = -1;
284
285@@ -84,7 +82,6 @@ const char *io_read_phase = phase_unknow
286 int kluge_around_eof = 0;
287
288 int msg_fd_in = -1;
289-int msg_fd_out = -1;
290 int sock_f_in = -1;
291 int sock_f_out = -1;
292
293@@ -109,27 +106,32 @@ static int select_timeout = SELECT_TIMEO
294 static void read_loop(int fd, char *buf, size_t len);
295
296 struct flist_ndx_item {
297- struct flist_ndx_item *next;
298+ volatile struct flist_ndx_item *next;
299 int ndx;
300 };
301
302 struct flist_ndx_list {
303- struct flist_ndx_item *head, *tail;
304+ volatile struct flist_ndx_item *head, *tail;
305+ pthread_mutex_t mutex;
306 };
307
308-static struct flist_ndx_list redo_list, hlink_list;
309+static struct flist_ndx_list redo_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER };
310+static struct flist_ndx_list hlink_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER };
311
312 struct msg_list_item {
313- struct msg_list_item *next;
314+ volatile struct msg_list_item *next;
315+ pthread_mutex_t mutex;
316 char *buf;
317 int len;
318+ enum msgcode code;
319 };
320
321 struct msg_list {
322- struct msg_list_item *head, *tail;
323+ volatile struct msg_list_item *head, *tail;
324+ pthread_mutex_t mutex;
325 };
326
327-static struct msg_list msg_list;
328+static struct msg_list msg_list = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER };
329
330 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
331 {
332@@ -139,27 +141,31 @@ static void flist_ndx_push(struct flist_
333 out_of_memory("flist_ndx_push");
334 item->next = NULL;
335 item->ndx = ndx;
336+ pthread_mutex_lock(&redo_list.mutex);
337 if (lp->tail)
338 lp->tail->next = item;
339 else
340 lp->head = item;
341 lp->tail = item;
342+ pthread_mutex_unlock(&redo_list.mutex);
343 }
344
345 static int flist_ndx_pop(struct flist_ndx_list *lp)
346 {
347- struct flist_ndx_item *next;
348+ struct flist_ndx_item *head, *next;
349 int ndx;
350
351 if (!lp->head)
352 return -1;
353
354- ndx = lp->head->ndx;
355- next = lp->head->next;
356- free(lp->head);
357- lp->head = next;
358- if (!next)
359+ pthread_mutex_lock(&hlink_list.mutex);
360+ head = (struct flist_ndx_item *)lp->head;
361+ next = (struct flist_ndx_item *)head->next;
362+ ndx = head->ndx;
363+ if (!(lp->head = next))
364 lp->tail = NULL;
365+ pthread_mutex_unlock(&hlink_list.mutex);
366+ free(head);
367
368 return ndx;
369 }
370@@ -168,7 +174,7 @@ static void check_timeout(void)
371 {
372 time_t t;
373
374- if (!io_timeout || ignore_timeout)
375+ if (!io_timeout)
376 return;
377
378 if (!last_io_in) {
379@@ -209,45 +215,40 @@ void set_io_timeout(int secs)
380
381 /* Setup the fd used to receive MSG_* messages. Only needed during the
382 * early stages of being a local sender (up through the sending of the
383- * file list) or when we're the generator (to fetch the messages from
384- * the receiver). */
385+ * file list). */
386 void set_msg_fd_in(int fd)
387 {
388 msg_fd_in = fd;
389 }
390
391-/* Setup the fd used to send our MSG_* messages. Only needed when
392- * we're the receiver (to send our messages to the generator). */
393-void set_msg_fd_out(int fd)
394-{
395- msg_fd_out = fd;
396- set_nonblocking(msg_fd_out);
397-}
398-
399 /* Add a message to the pending MSG_* list. */
400 static void msg_list_add(int code, char *buf, int len)
401 {
402 struct msg_list_item *ml;
403
404+ assert(am_receiver());
405 if (!(ml = new(struct msg_list_item)))
406 out_of_memory("msg_list_add");
407 ml->next = NULL;
408- if (!(ml->buf = new_array(char, len+4)))
409+ /* NOTE: the "+ 1" allows rwrite() to use the buf! */
410+ if (!(ml->buf = new_array(char, len + 1)))
411 out_of_memory("msg_list_add");
412- SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
413- memcpy(ml->buf+4, buf, len);
414- ml->len = len+4;
415+ memcpy(ml->buf, buf, len);
416+ ml->len = len;
417+ ml->code = code;
418+
419+ pthread_mutex_lock(&msg_list.mutex);
420 if (msg_list.tail)
421 msg_list.tail->next = ml;
422 else
423 msg_list.head = ml;
424 msg_list.tail = ml;
425+ pthread_mutex_unlock(&msg_list.mutex);
426 }
427
428-/* Read a message from the MSG_* fd and handle it. This is called either
429+/* Read a message from the MSG_* fd and handle it. This is only called
430 * during the early stages of being a local sender (up through the sending
431- * of the file list) or when we're the generator (to fetch the messages
432- * from the receiver). */
433+ * of the file list). */
434 static void read_msg_fd(void)
435 {
436 char buf[2048];
437@@ -266,40 +267,6 @@ static void read_msg_fd(void)
438 tag = (tag >> 24) - MPLEX_BASE;
439
440 switch (tag) {
441- case MSG_DONE:
442- if (len != 0 || !am_generator) {
443- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
444- exit_cleanup(RERR_STREAMIO);
445- }
446- flist_ndx_push(&redo_list, -1);
447- break;
448- case MSG_REDO:
449- if (len != 4 || !am_generator) {
450- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
451- exit_cleanup(RERR_STREAMIO);
452- }
453- read_loop(fd, buf, 4);
454- flist_ndx_push(&redo_list, IVAL(buf,0));
455- break;
456- case MSG_DELETED:
457- if (len >= (int)sizeof buf || !am_generator) {
458- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
459- exit_cleanup(RERR_STREAMIO);
460- }
461- read_loop(fd, buf, len);
462- io_multiplex_write(MSG_DELETED, buf, len);
463- break;
464- case MSG_SUCCESS:
465- if (len != 4 || !am_generator) {
466- rprintf(FERROR, "invalid message %d:%d\n", tag, len);
467- exit_cleanup(RERR_STREAMIO);
468- }
469- read_loop(fd, buf, len);
470- if (remove_sent_files)
471- io_multiplex_write(MSG_SUCCESS, buf, len);
472- if (preserve_hard_links)
473- flist_ndx_push(&hlink_list, IVAL(buf,0));
474- break;
475 case MSG_INFO:
476 case MSG_ERROR:
477 case MSG_LOG:
478@@ -320,71 +287,72 @@ static void read_msg_fd(void)
479 msg_fd_in = fd;
480 }
481
482-/* Try to push messages off the list onto the wire. If we leave with more
483+/* Try to pop messages off the list onto the wire. If we leave with more
484 * to do, return 0. On error, return -1. If everything flushed, return 1.
485- * This is only active in the receiver. */
486+ * This is only called by the generator. */
487 static int msg_list_flush(int flush_it_all)
488 {
489- static int written = 0;
490- struct timeval tv;
491- fd_set fds;
492-
493- if (msg_fd_out < 0)
494- return -1;
495-
496+ assert(am_generator());
497+ no_flush++;
498 while (msg_list.head) {
499- struct msg_list_item *ml = msg_list.head;
500- int n = write(msg_fd_out, ml->buf + written, ml->len - written);
501- if (n < 0) {
502- if (errno == EINTR)
503- continue;
504- if (errno != EWOULDBLOCK && errno != EAGAIN)
505- return -1;
506- if (!flush_it_all)
507- return 0;
508- FD_ZERO(&fds);
509- FD_SET(msg_fd_out, &fds);
510- tv.tv_sec = select_timeout;
511- tv.tv_usec = 0;
512- if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
513- check_timeout();
514- } else if ((written += n) == ml->len) {
515- free(ml->buf);
516- msg_list.head = ml->next;
517- if (!msg_list.head)
518- msg_list.tail = NULL;
519- free(ml);
520- written = 0;
521+ struct msg_list_item *ml = (struct msg_list_item *)msg_list.head;
522+ switch (ml->code) {
523+ case MSG_INFO:
524+ case MSG_ERROR:
525+ case MSG_LOG:
526+ rwrite(ml->code, ml->buf, ml->len);
527+ break;
528+ default:
529+ io_multiplex_write(ml->code, ml->buf, ml->len);
530+ break;
531 }
532+ pthread_mutex_lock(&msg_list.mutex);
533+ if (!(msg_list.head = ml->next))
534+ msg_list.tail = NULL;
535+ pthread_mutex_unlock(&msg_list.mutex);
536+ free(ml->buf);
537+ free(ml);
538+ if (!flush_it_all)
539+ break;
540 }
541+ no_flush--;
542+
543 return 1;
544 }
545
546 void send_msg(enum msgcode code, char *buf, int len)
547 {
548- if (msg_fd_out < 0) {
549+ if (am_receiver())
550+ msg_list_add(code, buf, len);
551+ else
552 io_multiplex_write(code, buf, len);
553- return;
554- }
555- msg_list_add(code, buf, len);
556- msg_list_flush(NORMAL_FLUSH);
557 }
558
559-int get_redo_num(int itemizing, enum logcode code)
560+/* This is only used by the receiver. */
561+void push_redo_num(int ndx)
562 {
563- while (1) {
564- if (hlink_list.head)
565- check_for_finished_hlinks(itemizing, code);
566- if (redo_list.head)
567- break;
568- read_msg_fd();
569- }
570+ assert(am_receiver());
571+ flist_ndx_push(&redo_list, ndx);
572+}
573
574+/* This is only used by the generator. */
575+int get_redo_num(void)
576+{
577+ assert(am_generator());
578 return flist_ndx_pop(&redo_list);
579 }
580
581+/* This is only used by the receiver. */
582+void push_hlink_num(int ndx)
583+{
584+ assert(am_receiver());
585+ flist_ndx_push(&hlink_list, ndx);
586+}
587+
588+/* This is only used by the generator. */
589 int get_hlink_num(void)
590 {
591+ assert(am_generator());
592 return flist_ndx_pop(&hlink_list);
593 }
594
595@@ -465,11 +433,6 @@ static int read_timeout(int fd, char *bu
596 FD_ZERO(&r_fds);
597 FD_ZERO(&w_fds);
598 FD_SET(fd, &r_fds);
599- if (msg_list.head) {
600- FD_SET(msg_fd_out, &w_fds);
601- if (msg_fd_out > maxfd)
602- maxfd = msg_fd_out;
603- }
604 if (io_filesfrom_f_out >= 0) {
605 int new_fd;
606 if (io_filesfrom_buflen == 0) {
607@@ -502,9 +465,6 @@ static int read_timeout(int fd, char *bu
608 continue;
609 }
610
611- if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds))
612- msg_list_flush(NORMAL_FLUSH);
613-
614 if (io_filesfrom_f_out >= 0) {
615 if (io_filesfrom_buflen) {
616 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
617@@ -832,6 +792,8 @@ static void readfd(int fd, char *buffer,
618 }
619
620 if (fd == write_batch_monitor_in) {
621+ if (am_generator())
622+ rprintf(FINFO, "writing %d bytes to batch file from generator\n", total);
623 if ((size_t)write(batch_fd, buffer, total) != total)
624 exit_cleanup(RERR_FILEIO);
625 }
626@@ -1091,7 +1053,6 @@ static void writefd_unbuffered(int fd,ch
627 * to grab any messages they sent before they died. */
628 while (fd == sock_f_out && io_multiplexing_in) {
629 set_io_timeout(30);
630- ignore_timeout = 0;
631 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
632 sizeof io_filesfrom_buf);
633 }
634@@ -1101,7 +1062,7 @@ static void writefd_unbuffered(int fd,ch
635 total += ret;
636
637 if (fd == sock_f_out) {
638- if (io_timeout || am_generator)
639+ if (io_timeout || am_generator())
640 last_io_out = time(NULL);
641 sleep_for_bwlimit(ret);
642 }
643@@ -1126,7 +1087,7 @@ static void mplex_write(enum msgcode cod
644 * cause output to occur down the socket. Setting contiguous_write_len
645 * prevents the reading of msg_fd_in once we actually start to write
646 * this sequence of data (though we might read it before the start). */
647- if (am_generator && msg_fd_in >= 0)
648+ if (am_generator() && msg_fd_in >= 0)
649 contiguous_write_len = len + 4;
650
651 if (n > sizeof buffer - 4)
652@@ -1141,33 +1102,31 @@ static void mplex_write(enum msgcode cod
653 if (len)
654 writefd_unbuffered(sock_f_out, buf, len);
655
656- if (am_generator && msg_fd_in >= 0)
657+ if (am_generator() && msg_fd_in >= 0)
658 contiguous_write_len = 0;
659 }
660
661
662 void io_flush(int flush_it_all)
663 {
664- msg_list_flush(flush_it_all);
665-
666- if (!iobuf_out_cnt || no_flush)
667+ if (no_flush)
668 return;
669
670- if (io_multiplexing_out)
671- mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
672- else
673- writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
674- iobuf_out_cnt = 0;
675+ if (iobuf_out_cnt) {
676+ if (io_multiplexing_out)
677+ mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
678+ else
679+ writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
680+ iobuf_out_cnt = 0;
681+ }
682+
683+ if (am_generator())
684+ msg_list_flush(flush_it_all);
685 }
686
687
688 static void writefd(int fd,char *buf,size_t len)
689 {
690- if (fd == msg_fd_out) {
691- rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
692- exit_cleanup(RERR_PROTOCOL);
693- }
694-
695 if (fd == sock_f_out)
696 stats.total_written += len;
697
698@@ -1387,9 +1346,3 @@ void start_write_batch(int fd)
699 else
700 write_batch_monitor_in = fd;
701 }
702-
703-void stop_write_batch(void)
704-{
705- write_batch_monitor_out = -1;
706- write_batch_monitor_in = -1;
707-}
8a0123e5
WD
708--- orig/log.c 2005-12-16 23:48:44
709+++ log.c 2005-12-16 23:49:57
3ca259e2
WD
710@@ -35,7 +35,6 @@ extern int am_sender;
711 extern int local_server;
712 extern int quiet;
713 extern int module_id;
714-extern int msg_fd_out;
715 extern int protocol_version;
716 extern int preserve_times;
14824301 717 extern int log_format_has_i;
8a0123e5 718@@ -68,7 +67,6 @@ struct {
3ca259e2
WD
719 { RERR_IPC , "error in IPC code" },
720 { RERR_CRASHED , "sibling process crashed" },
721 { RERR_TERMINATED , "sibling process terminated abnormally" },
8a0123e5
WD
722- { RERR_SIGNAL1 , "received SIGUSR1" },
723 { RERR_SIGNAL , "received SIGINT, SIGTERM, or SIGHUP" },
3ca259e2
WD
724 { RERR_WAITCHILD , "waitpid() failed" },
725 { RERR_MALLOC , "error allocating core memory buffers" },
8a0123e5 726@@ -206,8 +204,8 @@ void rwrite(enum logcode code, char *buf
3ca259e2
WD
727
728 buf[len] = 0;
729
730- if (am_server && msg_fd_out >= 0) {
731- /* Pass the message to our sibling. */
732+ if (am_receiver()) {
733+ /* Pass the message to the generator thread. */
734 send_msg((enum msgcode)code, buf, len);
735 return;
736 }
8a0123e5
WD
737--- orig/main.c 2005-12-16 23:48:44
738+++ main.c 2005-12-16 23:50:33
3ca259e2
WD
739@@ -30,7 +30,6 @@ extern int list_only;
740 extern int am_root;
741 extern int am_server;
742 extern int am_sender;
743-extern int am_generator;
744 extern int am_daemon;
745 extern int blocking_io;
746 extern int remove_sent_files;
747@@ -75,9 +74,20 @@ struct pid_status {
748
749 static time_t starttime, endtime;
750 static int64 total_read, total_written;
751+static pthread_t receiver_tid;
752
753 static void show_malloc_stats(void);
754
755+int am_generator()
756+{
757+ return receiver_tid != 0 && pthread_self() != receiver_tid;
758+}
759+
760+int am_receiver()
761+{
762+ return receiver_tid != 0 && pthread_self() == receiver_tid;
763+}
764+
765 /* Works like waitpid(), but if we already harvested the child pid in our
766 * sigchld_handler(), we succeed instead of returning an error. */
767 pid_t wait_process(pid_t pid, int *status_ptr, int flags)
768@@ -154,7 +164,7 @@ static void handle_stats(int f)
769 show_flist_stats();
770 }
771
772- if (am_generator)
773+ if (am_generator())
774 return;
775
776 if (am_daemon) {
777@@ -558,12 +568,30 @@ static void do_server_sender(int f_in, i
778 exit_cleanup(0);
779 }
780
781+struct thread_args {
782+ struct file_list *flist;
783+ char *local_name;
784+ int f_in;
785+};
786+
787+static void *start_receiver_thread(void *arg)
788+{
789+ static int exit_code;
790+ struct thread_args *ta = (struct thread_args *)arg;
791+
792+ recv_files(ta->f_in, ta->flist, ta->local_name);
793+ handle_stats(ta->f_in);
794+
795+ push_redo_num(-2);
796+
797+ exit_code = log_got_error ? RERR_PARTIAL : 0;
798+ return &exit_code;
799+}
800
801 static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
802 {
803- int pid;
804- int exit_code = 0;
805- int error_pipe[2];
806+ void *value_ptr;
807+ struct thread_args args;
808
809 /* The receiving side mustn't obey this, or an existing symlink that
810 * points to an identical file won't be replaced by the referent. */
811@@ -572,70 +600,16 @@ static int do_recv(int f_in,int f_out,st
812 if (preserve_hard_links)
813 init_hard_links();
814
815- if (fd_pair(error_pipe) < 0) {
816- rsyserr(FERROR, errno, "pipe failed in do_recv");
817+ args.f_in = f_in;
818+ args.flist = flist;
819+ args.local_name = local_name;
820+ if (pthread_create(&receiver_tid, NULL, start_receiver_thread, &args) < 0) {
821+ rsyserr(FERROR, errno, "pthread_create failed in do_recv");
822 exit_cleanup(RERR_IPC);
823 }
824
825- io_flush(NORMAL_FLUSH);
826-
827- if ((pid = do_fork()) == -1) {
828- rsyserr(FERROR, errno, "fork failed in do_recv");
829- exit_cleanup(RERR_IPC);
830- }
831-
832- if (pid == 0) {
833- close(error_pipe[0]);
834- if (f_in != f_out)
835- close(f_out);
836-
837- /* we can't let two processes write to the socket at one time */
838- close_multiplexing_out();
839-
840- /* set place to send errors */
841- set_msg_fd_out(error_pipe[1]);
842-
843- recv_files(f_in, flist, local_name);
844- io_flush(FULL_FLUSH);
845- handle_stats(f_in);
846-
847- send_msg(MSG_DONE, "", 0);
848- io_flush(FULL_FLUSH);
849-
850- /* Handle any keep-alive packets from the post-processing work
851- * that the generator does. */
852- if (protocol_version >= 29) {
853- kluge_around_eof = -1;
854-
855- /* This should only get stopped via a USR2 signal. */
856- while (read_int(f_in) == flist->count
857- && read_shortint(f_in) == ITEM_IS_NEW) {}
858-
859- rprintf(FERROR, "Invalid packet at end of run [%s]\n",
860- who_am_i());
861- exit_cleanup(RERR_PROTOCOL);
862- }
863-
864- /* Finally, we go to sleep until our parent kills us with a
865- * USR2 signal. We sleep for a short time, as on some OSes
866- * a signal won't interrupt a sleep! */
867- while (1)
868- msleep(20);
869- }
870-
871- am_generator = 1;
872- close_multiplexing_in();
873- if (write_batch && !am_server)
874- stop_write_batch();
875-
876- close(error_pipe[1]);
877- if (f_in != f_out)
878- close(f_in);
879-
880 io_start_buffering_out();
881
882- set_msg_fd_in(error_pipe[0]);
883-
884 generate_files(f_out, flist, local_name);
885
886 handle_stats(-1);
887@@ -646,10 +620,13 @@ static int do_recv(int f_in,int f_out,st
888 }
889 io_flush(FULL_FLUSH);
890
891- set_msg_fd_in(-1);
892- kill(pid, SIGUSR2);
893- wait_process_with_flush(pid, &exit_code);
894- return exit_code;
895+ pthread_join(receiver_tid, &value_ptr);
896+ if (!am_server)
897+ output_summary();
898+
899+ close_all();
900+
901+ return *(int*)value_ptr;
902 }
903
904
905@@ -1011,22 +988,6 @@ static int start_client(int argc, char *
906 return ret;
907 }
908
909-
910-static RETSIGTYPE sigusr1_handler(UNUSED(int val))
911-{
8a0123e5 912- exit_cleanup(RERR_SIGNAL1);
3ca259e2
WD
913-}
914-
915-static RETSIGTYPE sigusr2_handler(UNUSED(int val))
916-{
917- if (!am_server)
918- output_summary();
919- close_all();
920- if (log_got_error)
921- _exit(RERR_PARTIAL);
922- _exit(0);
923-}
924-
925 static RETSIGTYPE sigchld_handler(UNUSED(int val))
926 {
927 #ifdef WNOHANG
928@@ -1109,8 +1070,6 @@ int main(int argc,char *argv[])
929 int orig_argc = argc;
930 char **orig_argv = argv;
931
932- signal(SIGUSR1, sigusr1_handler);
933- signal(SIGUSR2, sigusr2_handler);
934 signal(SIGCHLD, sigchld_handler);
935 #ifdef MAINTAINER_MODE
936 signal(SIGSEGV, rsync_panic_handler);
937--- orig/match.c 2005-11-10 16:58:36
938+++ match.c 2005-12-08 23:17:09
939@@ -21,7 +21,7 @@
940
941 extern int verbose;
942 extern int am_server;
943-extern int do_progress;
944+extern int recv_progress;
945 extern int checksum_seed;
946 extern int append_mode;
947
948@@ -133,7 +133,7 @@ static void matched(int f, struct sum_st
949 else
950 last_match = offset;
951
952- if (buf && do_progress)
953+ if (buf && recv_progress)
954 show_progress(last_match, buf->file_size);
955 }
956
957@@ -333,7 +333,7 @@ void match_sums(int f, struct sum_struct
958 if (append_mode) {
959 OFF_T j = 0;
960 for (j = CHUNK_SIZE; j < s->flength; j += CHUNK_SIZE) {
961- if (buf && do_progress)
962+ if (buf && recv_progress)
963 show_progress(last_match, buf->file_size);
964 sum_update(map_ptr(buf, last_match, CHUNK_SIZE),
965 CHUNK_SIZE);
966@@ -341,7 +341,7 @@ void match_sums(int f, struct sum_struct
967 }
968 if (last_match < s->flength) {
969 int32 len = s->flength - last_match;
970- if (buf && do_progress)
971+ if (buf && recv_progress)
972 show_progress(last_match, buf->file_size);
973 sum_update(map_ptr(buf, last_match, len), len);
974 last_match = s->flength;
8a0123e5 975--- orig/options.c 2005-12-16 23:48:44
3ca259e2
WD
976+++ options.c 2005-12-08 23:17:09
977@@ -69,7 +69,6 @@ int def_compress_level = Z_DEFAULT_COMPR
978 int am_root = 0;
979 int am_server = 0;
980 int am_sender = 0;
981-int am_generator = 0;
982 int am_starting_up = 1;
983 int orig_umask = 0;
984 int relative_paths = -1;
985@@ -89,6 +88,7 @@ int am_daemon = 0;
986 int daemon_over_rsh = 0;
987 int do_stats = 0;
988 int do_progress = 0;
989+int recv_progress = 0;
990 int keep_partial = 0;
991 int safe_symlinks = 0;
992 int copy_unsafe_links = 0;
8a0123e5 993@@ -1236,6 +1236,7 @@ int parse_arguments(int *argc, const cha
3ca259e2
WD
994 if ((do_progress || dry_run) && !verbose && !log_before_transfer
995 && !am_server)
996 verbose = 1;
997+ recv_progress = do_progress;
998
999 if (dry_run)
1000 do_xfers = 0;
1001--- orig/pipe.c 2005-10-24 21:04:45
1002+++ pipe.c 2005-12-08 23:17:09
1003@@ -55,7 +55,7 @@ pid_t piped_child(char **command, int *f
1004 exit_cleanup(RERR_IPC);
1005 }
1006
1007- pid = do_fork();
1008+ pid = fork();
1009 if (pid == -1) {
1010 rsyserr(FERROR, errno, "fork");
1011 exit_cleanup(RERR_IPC);
1012@@ -117,7 +117,7 @@ pid_t local_child(int argc, char **argv,
1013 exit_cleanup(RERR_IPC);
1014 }
1015
1016- pid = do_fork();
1017+ pid = fork();
1018 if (pid == -1) {
1019 rsyserr(FERROR, errno, "fork");
1020 exit_cleanup(RERR_IPC);
1021--- orig/receiver.c 2005-11-10 16:58:36
1022+++ receiver.c 2005-12-08 23:17:10
1023@@ -24,7 +24,7 @@ extern int verbose;
1024 extern int do_xfers;
1025 extern int am_daemon;
1026 extern int am_server;
1027-extern int do_progress;
1028+extern int recv_progress;
1029 extern int log_before_transfer;
1030 extern int log_format_has_i;
1031 extern int daemon_log_format_has_i;
1032@@ -220,7 +220,7 @@ static int receive_data(int f_in, char *
1033 if (sum.remainder)
1034 sum.flength -= sum.blength - sum.remainder;
1035 for (j = CHUNK_SIZE; j < sum.flength; j += CHUNK_SIZE) {
1036- if (do_progress)
1037+ if (recv_progress)
1038 show_progress(offset, total_size);
1039 sum_update(map_ptr(mapbuf, offset, CHUNK_SIZE),
1040 CHUNK_SIZE);
1041@@ -228,7 +228,7 @@ static int receive_data(int f_in, char *
1042 }
1043 if (offset < sum.flength) {
1044 int32 len = sum.flength - offset;
1045- if (do_progress)
1046+ if (recv_progress)
1047 show_progress(offset, total_size);
1048 sum_update(map_ptr(mapbuf, offset, len), len);
1049 offset = sum.flength;
1050@@ -241,7 +241,7 @@ static int receive_data(int f_in, char *
1051 }
1052
1053 while ((i = recv_token(f_in, &data)) != 0) {
1054- if (do_progress)
1055+ if (recv_progress)
1056 show_progress(offset, total_size);
1057
1058 if (i > 0) {
1059@@ -309,7 +309,7 @@ static int receive_data(int f_in, char *
1060 ftruncate(fd, offset);
1061 #endif
1062
1063- if (do_progress)
1064+ if (recv_progress)
1065 end_progress(total_size);
1066
1067 if (fd != -1 && offset > 0 && sparse_end(fd) != 0) {
1068@@ -362,14 +362,13 @@ static void handle_delayed_updates(struc
1069 full_fname(fname),
1070 safe_fname(partialptr));
1071 } else {
1072- if (remove_sent_files
1073- || (preserve_hard_links
1074- && file->link_u.links)) {
1075+ if (remove_sent_files) {
1076 SIVAL(numbuf, 0, i);
1077 send_msg(MSG_SUCCESS,numbuf,4);
1078 }
1079- handle_partial_dir(partialptr,
1080- PDIR_DELETE);
1081+ if (preserve_hard_links && file->link_u.links)
1082+ push_hlink_num(i);
1083+ handle_partial_dir(partialptr, PDIR_DELETE);
1084 }
1085 }
1086 }
1087@@ -419,11 +418,6 @@ int recv_files(int f_in, struct file_lis
1088 if (verbose > 2)
1089 rprintf(FINFO,"recv_files(%d) starting\n",flist->count);
1090
1091- if (flist->hlink_pool) {
1092- pool_destroy(flist->hlink_pool);
1093- flist->hlink_pool = NULL;
1094- }
1095-
1096 if (delay_updates)
1097 init_delayed_bits(flist->count);
1098
1099@@ -444,7 +438,7 @@ int recv_files(int f_in, struct file_lis
1100 rprintf(FINFO, "recv_files phase=%d\n", phase);
1101 if (phase == 2 && delay_updates)
1102 handle_delayed_updates(flist, local_name);
1103- send_msg(MSG_DONE, "", 0);
1104+ push_redo_num(-2);
1105 if (keep_partial && !partial_dir)
1106 make_backups = 0; /* prevents double backup */
1107 if (append_mode) {
1108@@ -665,7 +659,7 @@ int recv_files(int f_in, struct file_lis
1109 /* log the transfer */
1110 if (log_before_transfer)
1111 log_item(file, &initial_stats, iflags, NULL);
1112- else if (!am_server && verbose && do_progress)
1113+ else if (!am_server && verbose && recv_progress)
1114 rprintf(FINFO, "%s\n", safe_fname(fname));
1115
1116 /* recv file data */
1117@@ -705,11 +699,12 @@ int recv_files(int f_in, struct file_lis
1118 cleanup_disable();
1119
1120 if (recv_ok > 0) {
1121- if (remove_sent_files
1122- || (preserve_hard_links && file->link_u.links)) {
1123+ if (remove_sent_files) {
1124 SIVAL(numbuf, 0, i);
1125 send_msg(MSG_SUCCESS, numbuf, 4);
1126 }
1127+ if (preserve_hard_links && file->link_u.links)
1128+ push_hlink_num(i);
1129 } else if (!recv_ok) {
1130 int msgtype = phase || read_batch ? FERROR : FINFO;
1131 if (msgtype == FERROR || verbose) {
1132@@ -732,10 +727,8 @@ int recv_files(int f_in, struct file_lis
1133 errstr, safe_fname(fname),
1134 keptstr, redostr);
1135 }
1136- if (!phase) {
1137- SIVAL(numbuf, 0, i);
1138- send_msg(MSG_REDO, numbuf, 4);
1139- }
1140+ if (!phase)
1141+ push_redo_num(i);
1142 }
1143 }
1144 make_backups = save_make_backups;
1145--- orig/rsync.c 2005-07-27 23:31:12
1146+++ rsync.c 2005-12-08 23:17:10
1147@@ -30,7 +30,6 @@ extern int omit_dir_times;
1148 extern int am_root;
1149 extern int am_server;
1150 extern int am_sender;
1151-extern int am_generator;
1152 extern int am_starting_up;
1153 extern int preserve_uid;
1154 extern int preserve_gid;
1155@@ -210,5 +209,5 @@ const char *who_am_i(void)
1156 {
1157 if (am_starting_up)
1158 return am_server ? "server" : "client";
1159- return am_sender ? "sender" : am_generator ? "generator" : "receiver";
1160+ return am_sender ? "sender" : am_generator() ? "generator" : "receiver";
1161 }
14824301 1162--- orig/rsync.h 2005-12-15 23:00:49
3ca259e2 1163+++ rsync.h 2005-12-10 19:02:58
14824301 1164@@ -165,10 +165,8 @@ enum msgcode {
3ca259e2
WD
1165 MSG_DATA=0, /* raw data on the multiplexed stream */
1166 MSG_ERROR=FERROR, MSG_INFO=FINFO, /* remote logging */
1167 MSG_LOG=FLOG, MSG_FCLIENT=FCLIENT, /* sibling logging */
1168- MSG_REDO=9, /* reprocess indicated flist index */
1169 MSG_SUCCESS=100,/* successfully updated indicated flist index */
1170 MSG_DELETED=101,/* successfully deleted a file on receiving side */
1171- MSG_DONE=86 /* current phase is done */
1172 };
1173
1174 #include "errcode.h"
14824301 1175@@ -319,6 +317,7 @@ enum msgcode {
3ca259e2
WD
1176 #endif
1177
1178 #include <assert.h>
1179+#include <pthread.h>
1180
1181 #include "lib/pool_alloc.h"
1182
1183--- orig/util.c 2005-11-12 20:13:05
1184+++ util.c 2005-12-08 23:17:10
1185@@ -405,51 +405,6 @@ int robust_rename(char *from, char *to,
1186 return -1;
1187 }
1188
1189-
1190-static pid_t all_pids[10];
1191-static int num_pids;
1192-
1193-/** Fork and record the pid of the child. **/
1194-pid_t do_fork(void)
1195-{
1196- pid_t newpid = fork();
1197-
1198- if (newpid != 0 && newpid != -1) {
1199- all_pids[num_pids++] = newpid;
1200- }
1201- return newpid;
1202-}
1203-
1204-/**
1205- * Kill all children.
1206- *
1207- * @todo It would be kind of nice to make sure that they are actually
1208- * all our children before we kill them, because their pids may have
1209- * been recycled by some other process. Perhaps when we wait for a
1210- * child, we should remove it from this array. Alternatively we could
1211- * perhaps use process groups, but I think that would not work on
1212- * ancient Unix versions that don't support them.
1213- **/
1214-void kill_all(int sig)
1215-{
1216- int i;
1217-
1218- for (i = 0; i < num_pids; i++) {
1219- /* Let's just be a little careful where we
1220- * point that gun, hey? See kill(2) for the
1221- * magic caused by negative values. */
1222- pid_t p = all_pids[i];
1223-
1224- if (p == getpid())
1225- continue;
1226- if (p <= 0)
1227- continue;
1228-
1229- kill(p, sig);
1230- }
1231-}
1232-
1233-
1234 /** Turn a user name into a uid */
1235 int name_to_uid(char *name, uid_t *uid)
1236 {