From 0bd6d59f293454bc3ddc4ab70b5d67c70e95d7b0 Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Sat, 17 Jul 2004 21:31:34 +0000 Subject: [PATCH] Updated to handle --read-batch mode. This actually fixes a potential race condition where the receiver can get ahead of the generator and try to create files before the directories have been created (for instance). --- g2r-basis-filename.diff | 337 +++++++++++++++++++++++++++++----------- 1 file changed, 245 insertions(+), 92 deletions(-) diff --git a/g2r-basis-filename.diff b/g2r-basis-filename.diff index 43e949d..1e6d779 100644 --- a/g2r-basis-filename.diff +++ b/g2r-basis-filename.diff @@ -5,32 +5,54 @@ the receiver and makes future options that do more basis-file searching more efficient (such as the --fuzzy option and the support for multiple --compare-dest options). +Also fixes a potential synchronization problem between the generator +and the receiver in read-batch mode. Should consider making the +sending of the index value the default for this pipe (it's currently +only sent in batch mode due to the sender not listening to the +generator to determine what files get updated). + You must run "make proto" before compiling. ---- orig/generator.c 2004-07-17 15:20:05 -+++ generator.c 2004-07-17 10:23:13 -@@ -251,7 +251,7 @@ static void generate_and_send_sums(struc +--- orig/generator.c 2004-07-17 16:30:20 ++++ generator.c 2004-07-17 15:50:09 +@@ -251,11 +251,11 @@ static void generate_and_send_sums(struc * out. It might be wrong. */ static void recv_generator(char *fname, struct file_struct *file, int i, - int f_out) + int f_out, int f_nameout) { - int fd; +- int fd; ++ int fd = -1; STRUCT_STAT st; -@@ -418,8 +418,10 @@ static void recv_generator(char *fname, +- struct map_struct *mapbuf; ++ struct map_struct *mapbuf = NULL; + int statret; + char *fnamecmp; + char fnamecmpbuf[MAXPATHLEN]; +@@ -398,9 +398,6 @@ static void recv_generator(char *fname, + } + #endif + +- if (read_batch) +- return; +- + if (preserve_hard_links && hard_link_check(file, HL_CHECK_MASTER)) + return; + +@@ -418,8 +415,10 @@ static void recv_generator(char *fname, statret = link_stat(fnamecmpbuf, &st, 0); if (!S_ISREG(st.st_mode)) statret = -1; - if (statret == -1) -+ if (statret < 0) { ++ if (statret == -1) { errno = saveerrno; + *fnamecmpbuf = '\0'; + } #if HAVE_LINK else if (link_dest && !dry_run) { if (do_link(fnamecmpbuf, fname) != 0) { -@@ -427,18 +429,22 @@ static void recv_generator(char *fname, +@@ -427,22 +426,22 @@ static void recv_generator(char *fname, rsyserr(FINFO, errno, "link %s => %s", fnamecmpbuf, fname); } @@ -50,22 +72,35 @@ You must run "make proto" before compiling. if (statret == -1) { if (preserve_hard_links && hard_link_check(file, HL_SKIP)) return; - if (errno == ENOENT) { -+ if (f_nameout >= 0) -+ write_byte(f_nameout, 0); - write_int(f_out,i); - if (!dry_run) - write_sum_head(f_out, NULL); -@@ -458,19 +464,21 @@ static void recv_generator(char *fname, +- if (errno == ENOENT) { +- write_int(f_out,i); +- if (!dry_run) +- write_sum_head(f_out, NULL); +- } else if (verbose > 1) { ++ if (errno == ENOENT) ++ goto notify_sender; ++ if (verbose > 1) { + rsyserr(FERROR, errno, + "recv_generator: failed to open %s", + full_fname(fname)); +@@ -451,26 +450,23 @@ static void recv_generator(char *fname, + } + + if (!S_ISREG(st.st_mode)) { +- if (delete_file(fname) != 0) { ++ if (delete_file(fname) != 0) + return; +- } + /* now pretend the file didn't exist */ if (preserve_hard_links && hard_link_check(file, HL_SKIP)) return; -+ if (f_nameout >= 0) -+ write_byte(f_nameout, 0); - write_int(f_out,i); - if (!dry_run) - write_sum_head(f_out, NULL); - return; +- write_int(f_out,i); +- if (!dry_run) +- write_sum_head(f_out, NULL); +- return; ++ statret = -1; ++ goto notify_sender; } - if (opt_ignore_existing && fnamecmp == fname) { @@ -80,7 +115,7 @@ You must run "make proto" before compiling. && cmp_modtime(st.st_mtime, file->modtime) > 0) { if (verbose > 1) rprintf(FINFO,"%s is newer\n",fname); -@@ -478,17 +486,21 @@ static void recv_generator(char *fname, +@@ -478,21 +474,17 @@ static void recv_generator(char *fname, } if (skip_file(fname, file, &st)) { @@ -90,52 +125,83 @@ You must run "make proto" before compiling. return; } - if (dry_run) { -+ if (f_nameout >= 0) -+ write_byte(f_nameout, 0); - write_int(f_out,i); - return; +- if (dry_run) { +- write_int(f_out,i); +- return; +- } +- +- if (whole_file > 0) { +- write_int(f_out,i); +- write_sum_head(f_out, NULL); +- return; ++ if (dry_run || whole_file) { ++ statret = -1; ++ goto notify_sender; } ++ if (read_batch) ++ goto notify_sender; - if (whole_file > 0) { -+ if (f_nameout >= 0) -+ write_byte(f_nameout, 0); - write_int(f_out,i); - write_sum_head(f_out, NULL); - return; -@@ -503,6 +515,8 @@ static void recv_generator(char *fname, + /* open the file */ + fd = do_open(fnamecmp, O_RDONLY, 0); +@@ -503,15 +495,12 @@ static void recv_generator(char *fname, /* pretend the file didn't exist */ if (preserve_hard_links && hard_link_check(file, HL_SKIP)) return; -+ if (f_nameout >= 0) -+ write_byte(f_nameout, 0); - write_int(f_out,i); - write_sum_head(f_out, NULL); - return; -@@ -521,6 +535,22 @@ static void recv_generator(char *fname, +- write_int(f_out,i); +- write_sum_head(f_out, NULL); +- return; ++ statret = -1; ++ goto notify_sender; + } + + if (st.st_size > 0) + mapbuf = map_file(fd,st.st_size); +- else +- mapbuf = NULL; + + if (verbose > 3) { + rprintf(FINFO,"gen mapped %s of size %.0f\n", fnamecmp, +@@ -521,16 +510,43 @@ static void recv_generator(char *fname, if (verbose > 2) rprintf(FINFO, "generating and sending sums for %d\n", i); +- write_int(f_out,i); +- generate_and_send_sums(mapbuf, st.st_size, f_out); ++notify_sender: + if (f_nameout >= 0) { + uchar lenbuf[3], *lb = lenbuf; -+ int len = strlen(fnamecmpbuf); -+ if (len > 127) { -+#if MAXPATHLEN > 32767 -+ *lb++ = len / 0x10000 + 0x80; -+ *lb++ = len / 0x100; ++ int len = statret == -1 ? 0 : strlen(fnamecmpbuf); ++ if (read_batch) ++ write_int(f_nameout, i); ++ if (len > 0x7F) { ++#if MAXPATHLEN > 0x7FFF ++ *lb++ = len / 0x10000 + 0x80; ++ *lb++ = len / 0x100; +#else -+ *lb++ = len / 0x100 + 0x80; ++ *lb++ = len / 0x100 + 0x80; +#endif + } + *lb = len; + write_buf(f_nameout, lenbuf, lb - lenbuf + 1); -+ write_buf(f_nameout, fnamecmpbuf, len); ++ if (len) ++ write_buf(f_nameout, fnamecmpbuf, len); + } -+ - write_int(f_out,i); - generate_and_send_sums(mapbuf, st.st_size, f_out); -@@ -530,7 +560,8 @@ static void recv_generator(char *fname, +- close(fd); +- if (mapbuf) +- unmap_file(mapbuf); ++ if (read_batch) ++ return; ++ ++ write_int(f_out, i); ++ if (statret == 0) { ++ generate_and_send_sums(mapbuf, st.st_size, f_out); ++ ++ close(fd); ++ if (mapbuf) ++ unmap_file(mapbuf); ++ } else if (!dry_run) ++ write_sum_head(f_out, NULL); } @@ -145,7 +211,7 @@ You must run "make proto" before compiling. { int i; int phase = 0; -@@ -571,7 +602,7 @@ void generate_files(int f_out, struct fi +@@ -571,7 +587,7 @@ void generate_files(int f_out, struct fi } recv_generator(local_name ? local_name : f_name_to(file, fbuf), @@ -154,7 +220,15 @@ You must run "make proto" before compiling. } phase++; -@@ -588,7 +619,7 @@ void generate_files(int f_out, struct fi +@@ -582,13 +598,15 @@ void generate_files(int f_out, struct fi + rprintf(FINFO,"generate_files phase=%d\n",phase); + + write_int(f_out, -1); ++ if (read_batch) ++ write_int(f_nameout, flist->count); + + /* files can cycle through the system more than once + * to catch initial checksum errors */ while ((i = get_redo_num()) != -1) { struct file_struct *file = flist->files[i]; recv_generator(local_name ? local_name : f_name_to(file, fbuf), @@ -163,7 +237,16 @@ You must run "make proto" before compiling. } phase++; -@@ -607,7 +638,7 @@ void generate_files(int f_out, struct fi +@@ -596,6 +614,8 @@ void generate_files(int f_out, struct fi + rprintf(FINFO,"generate_files phase=%d\n",phase); + + write_int(f_out, -1); ++ if (read_batch) ++ write_int(f_nameout, flist->count); + + if (preserve_hard_links) + do_hard_links(); +@@ -607,7 +627,7 @@ void generate_files(int f_out, struct fi if (!file->basename || !S_ISDIR(file->mode)) continue; recv_generator(local_name ? local_name : f_name(file), @@ -173,39 +256,50 @@ You must run "make proto" before compiling. if (verbose > 2) --- orig/main.c 2004-07-17 15:20:05 -+++ main.c 2004-07-17 15:22:08 -@@ -444,7 +444,7 @@ static int do_recv(int f_in,int f_out,st ++++ main.c 2004-07-17 15:58:11 +@@ -57,6 +57,7 @@ extern int filesfrom_fd; + extern pid_t cleanup_child_pid; + extern char *files_from; + extern char *remote_filesfrom_file; ++extern char *compare_dest; + extern char *rsync_path; + extern char *shell_cmd; + extern char *batch_name; +@@ -444,7 +445,8 @@ static int do_recv(int f_in,int f_out,st { int pid; int status = 0; - int error_pipe[2]; + int error_pipe[2], name_pipe[2]; ++ int need_name_pipe = compare_dest || read_batch; if (preserve_hard_links) init_hard_links(flist); -@@ -456,8 +456,8 @@ static int do_recv(int f_in,int f_out,st +@@ -456,8 +458,9 @@ static int do_recv(int f_in,int f_out,st } } - if (fd_pair(error_pipe) < 0) { - rprintf(FERROR,"error pipe failed in do_recv\n"); -+ if (fd_pair(error_pipe) < 0 || fd_pair(name_pipe) < 0) { ++ if (fd_pair(error_pipe) < 0 ++ || (need_name_pipe && fd_pair(name_pipe) < 0)) { + rprintf(FERROR, "fd_pair() failed in do_recv\n"); exit_cleanup(RERR_SOCKETIO); } -@@ -465,8 +465,10 @@ static int do_recv(int f_in,int f_out,st +@@ -465,6 +468,11 @@ static int do_recv(int f_in,int f_out,st if ((pid = do_fork()) == 0) { close(error_pipe[0]); -+ close(name_pipe[1]); ++ if (need_name_pipe) { ++ close(name_pipe[1]); ++ set_blocking(name_pipe[0]); ++ } else ++ name_pipe[0] = -1; if (f_in != f_out) close(f_out); -+ set_blocking(name_pipe[0]); - /* we can't let two processes write to the socket at one time */ - io_multiplexing_close(); -@@ -474,7 +476,7 @@ static int do_recv(int f_in,int f_out,st +@@ -474,7 +482,7 @@ static int do_recv(int f_in,int f_out,st /* set place to send errors */ set_msg_fd_out(error_pipe[1]); @@ -214,16 +308,19 @@ You must run "make proto" before compiling. io_flush(FULL_FLUSH); report(f_in); -@@ -492,14 +494,16 @@ static int do_recv(int f_in,int f_out,st +@@ -492,6 +500,11 @@ static int do_recv(int f_in,int f_out,st stop_write_batch(); close(error_pipe[1]); -+ close(name_pipe[0]); ++ if (need_name_pipe) { ++ close(name_pipe[0]); ++ set_nonblocking(name_pipe[1]); ++ } else ++ name_pipe[1] = -1; if (f_in != f_out) close(f_in); -+ set_nonblocking(name_pipe[1]); - io_start_buffering_out(); +@@ -499,7 +512,7 @@ static int do_recv(int f_in,int f_out,st set_msg_fd_in(error_pipe[0]); @@ -233,46 +330,102 @@ You must run "make proto" before compiling. get_redo_num(); /* Read final MSG_DONE and any prior messages. */ report(-1); --- orig/receiver.c 2004-07-16 20:07:22 -+++ receiver.c 2004-07-17 11:05:36 -@@ -304,7 +304,8 @@ static int receive_data(int f_in,struct ++++ receiver.c 2004-07-17 21:27:55 +@@ -28,6 +28,7 @@ extern int max_delete; + extern int csum_length; + extern struct stats stats; + extern int dry_run; ++extern int read_batch; + extern int am_server; + extern int relative_paths; + extern int keep_dirlinks; +@@ -299,13 +300,38 @@ static int receive_data(int f_in,struct + return 1; + } + ++static char *read_gen_name(int fd, char *buf, char *realname) ++{ ++ int len = read_byte(fd); ++ if (len & 0x80) { ++#if MAXPATHLEN > 32767 ++ uchar lenbuf[2]; ++ read_buf(fd, (char *)lenbuf, 2); ++ len = (len & ~0x80) * 0x10000 + lenbuf[0] * 0x100 + lenbuf[1]; ++#else ++ len = (len & ~0x80) * 0x100 + read_byte(fd); ++#endif ++ } ++ if (len) { ++ if (len >= MAXPATHLEN) { ++ rprintf(FERROR, "bogus data on generator name pipe\n"); ++ exit_cleanup(RERR_PROTOCOL); ++ } ++ read_sbuf(fd, buf, len); ++ return buf; ++ } ++ return realname; ++} ++ + + /** * main routine for receiver process. * * Receiver process runs on the same host as the generator process. */ -int recv_files(int f_in, struct file_list *flist, char *local_name) +int recv_files(int f_in, struct file_list *flist, char *local_name, -+ int f_name) ++ int f_name_in) { ++ int next_gen_i = -1; int fd1,fd2; STRUCT_STAT st; -@@ -317,7 +318,7 @@ int recv_files(int f_in, struct file_lis - struct file_struct *file; - struct stats initial_stats; - int save_make_backups = make_backups; -- int i, recv_ok, phase = 0; -+ int i, len, recv_ok, phase = 0; + char *fname, fbuf[MAXPATHLEN]; +@@ -332,8 +358,20 @@ int recv_files(int f_in, struct file_lis - if (verbose > 2) - rprintf(FINFO,"recv_files(%d) starting\n",flist->count); -@@ -373,19 +374,25 @@ int recv_files(int f_in, struct file_lis + i = read_int(f_in); + if (i == -1) { ++ if (read_batch) { ++ if (next_gen_i < 0) ++ next_gen_i = read_int(f_name_in); ++ while (next_gen_i < flist->count) { ++ read_gen_name(f_name_in, fnamecmpbuf, ++ NULL); ++ next_gen_i = read_int(f_name_in); ++ } ++ next_gen_i = -1; ++ } ++ + if (phase) + break; ++ + phase = 1; + csum_length = SUM_LENGTH; + if (verbose > 2) +@@ -373,19 +411,31 @@ int recv_files(int f_in, struct file_lis if (verbose > 2) rprintf(FINFO,"recv_files(%s)\n",fname); - fnamecmp = fname; -+ len = read_byte(f_name); -+ if (len & 0x80) { -+#if MAXPATHLEN > 32767 -+ read_buf(f_name, fnamecmpbuf, 2); -+ len = (len & ~0x80) * 0x10000 -+ + fnamecmpbuf[0] * 0x100 + fnamecmpbuf[1]; -+#else -+ len = (len & ~0x80) * 0x100 + read_byte(f_name); -+#endif ++ if (read_batch) { ++ if (next_gen_i < 0) ++ next_gen_i = read_int(f_name_in); ++ while (i > next_gen_i) { ++ read_gen_name(f_name_in, fnamecmpbuf, NULL); ++ next_gen_i = read_int(f_name_in); ++ } ++ if (i < next_gen_i) { ++ rprintf(FINFO, "skipping update for %s\n", ++ fname); ++ receive_data(f_in,NULL,-1,NULL,file->length); ++ continue; ++ } ++ next_gen_i = -1; + } -+ if (len) { -+ read_sbuf(f_name, fnamecmpbuf, len); -+ fnamecmp = fnamecmpbuf; -+ } else ++ ++ if (f_name_in >= 0) ++ fnamecmp = read_gen_name(f_name_in, fnamecmpbuf, fname); ++ else + fnamecmp = fname; ++ /* open the file */ fd1 = do_open(fnamecmp, O_RDONLY, 0); -- 2.34.1