- Handle the new incremental-recursion mode.
authorWayne Davison <wayned@samba.org>
Thu, 28 Dec 2006 07:54:27 +0000 (07:54 +0000)
committerWayne Davison <wayned@samba.org>
Thu, 28 Dec 2006 07:54:27 +0000 (07:54 +0000)
- Changed some function names to make them more consistent.

io.c

diff --git a/io.c b/io.c
index d5d117f..3ad9147 100644 (file)
--- a/io.c
+++ b/io.c
@@ -41,7 +41,10 @@ extern int am_server;
 extern int am_daemon;
 extern int am_sender;
 extern int am_generator;
+extern int incremental;
+extern int io_error;
 extern int eol_nulls;
+extern int flist_eof;
 extern int read_batch;
 extern int csum_length;
 extern int checksum_seed;
@@ -50,12 +53,12 @@ extern int remove_source_files;
 extern int preserve_hard_links;
 extern char *filesfrom_host;
 extern struct stats stats;
-extern struct file_list *the_file_list;
+extern struct file_list *cur_flist, *first_flist;
 
 const char phase_unknown[] = "unknown";
 int ignore_timeout = 0;
 int batch_fd = -1;
-int batch_gen_fd = -1;
+int done_cnt = 0;
 
 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
 int kluge_around_eof = 0;
@@ -65,6 +68,18 @@ int msg_fd_out = -1;
 int sock_f_in = -1;
 int sock_f_out = -1;
 
+static int iobuf_f_in = -1;
+static char *iobuf_in;
+static size_t iobuf_in_siz;
+static size_t iobuf_in_ndx;
+static size_t iobuf_in_remaining;
+
+static int iobuf_f_out = -1;
+static char *iobuf_out;
+static int iobuf_out_cnt;
+
+int flist_forward_from = -1;
+
 static int io_multiplexing_out;
 static int io_multiplexing_in;
 static time_t last_io_in;
@@ -92,7 +107,11 @@ static char int_byte_cnt[64] = {
        5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
 };
 
-static void read_loop(int fd, char *buf, size_t len);
+static int readfd_unbuffered(int fd, char *buf, size_t len);
+static void writefd(int fd, const char *buf, size_t len);
+static void writefd_unbuffered(int fd, const char *buf, size_t len);
+static void decrement_active_files(int ndx);
+static void decrement_flist_in_progress(int ndx, int redo);
 
 struct flist_ndx_item {
        struct flist_ndx_item *next;
@@ -237,6 +256,7 @@ static void read_msg_fd(void)
 {
        char buf[2048];
        size_t n;
+       struct file_list *flist;
        int fd = msg_fd_in;
        int tag, len;
 
@@ -244,7 +264,7 @@ static void read_msg_fd(void)
         * to this routine from writefd_unbuffered(). */
        msg_fd_in = -1;
 
-       read_loop(fd, buf, 4);
+       readfd_unbuffered(fd, buf, 4);
        tag = IVAL(buf, 0);
 
        len = tag & 0xFFFFFF;
@@ -253,50 +273,71 @@ static void read_msg_fd(void)
        switch (tag) {
        case MSG_DONE:
                if (len != 0 || !am_generator) {
-                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
+                 invalid_msg:
+                       rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
+                               tag, len, who_am_i(),
+                               incremental ? "/incremental" : "");
                        exit_cleanup(RERR_STREAMIO);
                }
-               flist_ndx_push(&redo_list, -1);
+               done_cnt++;
                break;
        case MSG_REDO:
-               if (len != 4 || !am_generator) {
-                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
-                       exit_cleanup(RERR_STREAMIO);
-               }
-               read_loop(fd, buf, 4);
+               if (len != 4 || !am_generator)
+                       goto invalid_msg;
+               readfd_unbuffered(fd, buf, 4);
                if (remove_source_files)
                        decrement_active_files(IVAL(buf,0));
                flist_ndx_push(&redo_list, IVAL(buf,0));
+               if (incremental)
+                       decrement_flist_in_progress(IVAL(buf,0), 1);
+               break;
+       case MSG_FLIST:
+               if (len != 4 || !am_generator || !incremental)
+                       goto invalid_msg;
+               readfd_unbuffered(fd, buf, 4);
+               /* Read extra file list from receiver. */
+               assert(iobuf_in != NULL);
+               assert(iobuf_f_in == fd);
+               flist = recv_file_list(fd);
+               flist->parent_ndx = IVAL(buf,0);
+               break;
+       case MSG_FLIST_EOF:
+               if (len != 0 || !am_generator || !incremental)
+                       goto invalid_msg;
+               flist_eof = 1;
                break;
        case MSG_DELETED:
-               if (len >= (int)sizeof buf || !am_generator) {
-                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
-                       exit_cleanup(RERR_STREAMIO);
-               }
-               read_loop(fd, buf, len);
+               if (len >= (int)sizeof buf || !am_generator)
+                       goto invalid_msg;
+               readfd_unbuffered(fd, buf, len);
                send_msg(MSG_DELETED, buf, len);
                break;
        case MSG_SUCCESS:
-               if (len != 4 || !am_generator) {
-                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
-                       exit_cleanup(RERR_STREAMIO);
-               }
-               read_loop(fd, buf, len);
+               if (len != 4 || !am_generator)
+                       goto invalid_msg;
+               readfd_unbuffered(fd, buf, len);
                if (remove_source_files) {
                        decrement_active_files(IVAL(buf,0));
                        send_msg(MSG_SUCCESS, buf, len);
                }
                if (preserve_hard_links)
                        flist_ndx_push(&hlink_list, IVAL(buf,0));
+               if (incremental)
+                       decrement_flist_in_progress(IVAL(buf,0), 0);
+               break;
+       case MSG_NO_SEND:
+               if (len != 4 || !am_generator)
+                       goto invalid_msg;
+               readfd_unbuffered(fd, buf, len);
+               if (incremental)
+                       decrement_flist_in_progress(IVAL(buf,0), 0);
                break;
        case MSG_SOCKERR:
        case MSG_CLIENT:
-               if (!am_generator) {
-                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
-                       exit_cleanup(RERR_STREAMIO);
-               }
+               if (!am_generator)
+                       goto invalid_msg;
                if (tag == MSG_SOCKERR)
-                       close_multiplexing_out();
+                       io_end_multiplex_out();
                /* FALL THROUGH */
        case MSG_INFO:
        case MSG_ERROR:
@@ -305,7 +346,7 @@ static void read_msg_fd(void)
                        n = len;
                        if (n >= sizeof buf)
                                n = sizeof buf - 1;
-                       read_loop(fd, buf, n);
+                       readfd_unbuffered(fd, buf, n);
                        rwrite((enum logcode)tag, buf, n);
                        len -= n;
                }
@@ -334,51 +375,65 @@ void increment_active_files(int ndx, int itemizing, enum logcode code)
        }
 
        active_filecnt++;
-       active_bytecnt += F_LENGTH(the_file_list->files[ndx]);
+       active_bytecnt += F_LENGTH(cur_flist->files[ndx]);
 }
 
-void decrement_active_files(int ndx)
+static void decrement_active_files(int ndx)
 {
+       struct file_list *flist = flist_for_ndx(ndx);
+       assert(flist != NULL);
        active_filecnt--;
-       active_bytecnt -= F_LENGTH(the_file_list->files[ndx]);
+       active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
+}
+
+static void decrement_flist_in_progress(int ndx, int redo)
+{
+       struct file_list *flist = cur_flist ? cur_flist : first_flist;
+
+       while (ndx < flist->ndx_start) {
+               if (flist == first_flist) {
+                 invalid_ndx:
+                       rprintf(FERROR,
+                               "Invalid file index: %d (%d - %d) [%s]\n",
+                               ndx, first_flist->ndx_start,
+                               first_flist->prev->ndx_start + first_flist->prev->count - 1,
+                               who_am_i());
+                       exit_cleanup(RERR_PROTOCOL);
+               }
+               flist = flist->prev;
+       }
+       while (ndx >= flist->ndx_start + flist->count) {
+               if (!(flist = flist->next))
+                       goto invalid_ndx;
+       }
+
+       flist->in_progress--;
+       if (redo)
+               flist->to_redo++;
 }
 
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
  * This is only active in the receiver. */
-static int msg2genr_flush(int flush_it_all)
+static int msg2genr_flush(void)
 {
-       static int written = 0;
-       struct timeval tv;
-       fd_set fds;
-
-       if (msg_fd_out < 0)
+       if (msg_fd_out < 0 || no_flush)
                return -1;
 
+       no_flush++;
        while (msg2genr.head) {
                struct msg_list_item *m = msg2genr.head;
-               int n = write(msg_fd_out, m->buf + written, m->len - written);
-               if (n < 0) {
-                       if (errno == EINTR)
-                               continue;
-                       if (errno != EWOULDBLOCK && errno != EAGAIN)
-                               return -1;
-                       if (!flush_it_all)
-                               return 0;
-                       FD_ZERO(&fds);
-                       FD_SET(msg_fd_out, &fds);
-                       tv.tv_sec = select_timeout;
-                       tv.tv_usec = 0;
-                       if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
-                               check_timeout();
-               } else if ((written += n) == m->len) {
-                       msg2genr.head = m->next;
-                       if (!msg2genr.head)
-                               msg2genr.tail = NULL;
-                       free(m);
-                       written = 0;
-               }
+               writefd(msg_fd_out, m->buf, m->len);
+               msg2genr.head = m->next;
+               if (!msg2genr.head)
+                       msg2genr.tail = NULL;
+               free(m);
        }
+       if (iobuf_out_cnt) {
+               writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
+               iobuf_out_cnt = 0;
+       }
+       no_flush--;
        return 1;
 }
 
@@ -393,7 +448,7 @@ int send_msg(enum msgcode code, const char *buf, int len)
                return 1;
        }
        msg_list_add(&msg2genr, code, buf, len);
-       msg2genr_flush(NORMAL_FLUSH);
+       msg2genr_flush();
        return 1;
 }
 
@@ -404,18 +459,13 @@ void send_msg_int(enum msgcode code, int num)
        send_msg(code, numbuf, 4);
 }
 
-int get_redo_num(int itemizing, enum logcode code)
+void wait_for_receiver(void)
 {
-       while (1) {
-#ifdef SUPPORT_HARD_LINKS
-               if (hlink_list.head)
-                       check_for_finished_hlinks(itemizing, code);
-#endif
-               if (redo_list.head)
-                       break;
-               read_msg_fd();
-       }
+       read_msg_fd();
+}
 
+int get_redo_num(void)
+{
        return flist_ndx_pop(&redo_list);
 }
 
@@ -538,7 +588,7 @@ static int read_timeout(int fd, char *buf, size_t len)
                }
 
                if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
-                       msg2genr_flush(NORMAL_FLUSH);
+                       msg2genr_flush();
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -617,7 +667,7 @@ static int read_timeout(int fd, char *buf, size_t len)
 
                        /* Don't write errors on a dead socket. */
                        if (fd == sock_f_in) {
-                               close_multiplexing_out();
+                               io_end_multiplex_out();
                                rsyserr(FSOCKERR, errno, "read error");
                        } else
                                rsyserr(FERROR, errno, "read error");
@@ -688,37 +738,51 @@ int read_filesfrom_line(int fd, char *fname)
        return s - fname;
 }
 
-static char *iobuf_out;
-static int iobuf_out_cnt;
-
-void io_start_buffering_out(void)
+int io_start_buffering_out(int f_out)
 {
-       if (iobuf_out)
-               return;
+       if (iobuf_out) {
+               assert(f_out == iobuf_f_out);
+               return 0;
+       }
        if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
                out_of_memory("io_start_buffering_out");
        iobuf_out_cnt = 0;
+       iobuf_f_out = f_out;
+       return 1;
 }
 
-static char *iobuf_in;
-static size_t iobuf_in_siz;
-
-void io_start_buffering_in(void)
+int io_start_buffering_in(int f_in)
 {
-       if (iobuf_in)
-               return;
+       if (iobuf_in) {
+               assert(f_in == iobuf_f_in);
+               return 0;
+       }
        iobuf_in_siz = 2 * IO_BUFFER_SIZE;
        if (!(iobuf_in = new_array(char, iobuf_in_siz)))
                out_of_memory("io_start_buffering_in");
+       iobuf_f_in = f_in;
+       return 1;
 }
 
-void io_end_buffering(void)
+void io_end_buffering_in(void)
 {
-       io_flush(NORMAL_FLUSH);
-       if (!io_multiplexing_out) {
-               free(iobuf_out);
-               iobuf_out = NULL;
-       }
+       if (!iobuf_in)
+               return;
+       free(iobuf_in);
+       iobuf_in = NULL;
+       iobuf_in_ndx = 0;
+       iobuf_in_remaining = 0;
+       iobuf_f_in = -1;
+}
+
+void io_end_buffering_out(void)
+{
+       if (!iobuf_out)
+               return;
+       io_flush(FULL_FLUSH);
+       free(iobuf_out);
+       iobuf_out = NULL;
+       iobuf_f_out = -1;
 }
 
 void maybe_flush_socket(void)
@@ -733,14 +797,31 @@ void maybe_send_keepalive(void)
                if (!iobuf_out || !iobuf_out_cnt) {
                        if (protocol_version < 29)
                                return; /* there's nothing we can do */
-                       write_int(sock_f_out, the_file_list->count);
-                       write_shortint(sock_f_out, ITEM_IS_NEW);
+                       if (protocol_version >= 30)
+                               send_msg(MSG_NOOP, "", 0);
+                       else {
+                               write_int(sock_f_out, cur_flist->count);
+                               write_shortint(sock_f_out, ITEM_IS_NEW);
+                       }
                }
                if (iobuf_out)
                        io_flush(NORMAL_FLUSH);
        }
 }
 
+void start_flist_forward(int f_in)
+{
+       assert(iobuf_out != NULL);
+       assert(iobuf_f_out == msg_fd_out);
+       flist_forward_from = f_in;
+}
+
+void stop_flist_forward()
+{
+       io_flush(NORMAL_FLUSH);
+       flist_forward_from = -1;
+}
+
 /**
  * Continue trying to read len bytes - don't return until len has been
  * read.
@@ -763,26 +844,24 @@ static void read_loop(int fd, char *buf, size_t len)
  */
 static int readfd_unbuffered(int fd, char *buf, size_t len)
 {
-       static size_t remaining;
-       static size_t iobuf_in_ndx;
        size_t msg_bytes;
        int tag, cnt = 0;
        char line[BIGPATHBUFLEN];
 
-       if (!iobuf_in || fd != sock_f_in)
+       if (!iobuf_in || fd != iobuf_f_in)
                return read_timeout(fd, buf, len);
 
-       if (!io_multiplexing_in && remaining == 0) {
-               remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+       if (!io_multiplexing_in && iobuf_in_remaining == 0) {
+               iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
                iobuf_in_ndx = 0;
        }
 
        while (cnt == 0) {
-               if (remaining) {
-                       len = MIN(len, remaining);
+               if (iobuf_in_remaining) {
+                       len = MIN(len, iobuf_in_remaining);
                        memcpy(buf, iobuf_in + iobuf_in_ndx, len);
                        iobuf_in_ndx += len;
-                       remaining -= len;
+                       iobuf_in_remaining -= len;
                        cnt = len;
                        break;
                }
@@ -802,9 +881,19 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
                                iobuf_in_siz = msg_bytes;
                        }
                        read_loop(fd, iobuf_in, msg_bytes);
-                       remaining = msg_bytes;
+                       iobuf_in_remaining = msg_bytes;
                        iobuf_in_ndx = 0;
                        break;
+               case MSG_NOOP:
+                       if (am_sender)
+                               maybe_send_keepalive();
+                       break;
+               case MSG_IO_ERROR:
+                       if (msg_bytes != 4)
+                               goto invalid_msg;
+                       read_loop(fd, line, msg_bytes);
+                       io_error |= IVAL(line, 0);
+                       break;
                case MSG_DELETED:
                        if (msg_bytes >= sizeof line)
                                goto overflow;
@@ -819,6 +908,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
                        break;
                case MSG_SUCCESS:
                        if (msg_bytes != 4) {
+                         invalid_msg:
                                rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
                                        tag, (long)msg_bytes, who_am_i());
                                exit_cleanup(RERR_STREAMIO);
@@ -826,6 +916,12 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
                        read_loop(fd, line, msg_bytes);
                        successful_send(IVAL(line, 0));
                        break;
+               case MSG_NO_SEND:
+                       if (msg_bytes != 4)
+                               goto invalid_msg;
+                       read_loop(fd, line, msg_bytes);
+                       send_msg_int(MSG_NO_SEND, IVAL(line, 0));
+                       break;
                case MSG_INFO:
                case MSG_ERROR:
                        if (msg_bytes >= sizeof line) {
@@ -845,7 +941,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
                }
        }
 
-       if (remaining == 0)
+       if (iobuf_in_remaining == 0)
                io_flush(NORMAL_FLUSH);
 
        return cnt;
@@ -871,6 +967,9 @@ static void readfd(int fd, char *buffer, size_t N)
                        exit_cleanup(RERR_FILEIO);
        }
 
+       if (fd == flist_forward_from)
+               writefd(iobuf_f_out, buffer, total);
+
        if (fd == sock_f_in)
                stats.total_read += total;
 }
@@ -1171,7 +1270,7 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len)
 
                        /* Don't try to write errors back across the stream. */
                        if (fd == sock_f_out)
-                               close_multiplexing_out();
+                               io_end_multiplex_out();
                        rsyserr(FERROR, errno,
                                "writefd_unbuffered failed to write %ld bytes [%s]",
                                (long)len, who_am_i());
@@ -1246,9 +1345,9 @@ static void mplex_write(enum msgcode code, const char *buf, size_t len)
        }
 }
 
-void io_flush(int flush_it_all)
+void io_flush(UNUSED(int flush_it_all))
 {
-       msg2genr_flush(flush_it_all);
+       msg2genr_flush();
        msg2sndr_flush();
 
        if (!iobuf_out_cnt || no_flush)
@@ -1257,17 +1356,12 @@ void io_flush(int flush_it_all)
        if (io_multiplexing_out)
                mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
        else
-               writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
+               writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
        iobuf_out_cnt = 0;
 }
 
 static void writefd(int fd, const char *buf, size_t len)
 {
-       if (fd == msg_fd_out) {
-               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
-               exit_cleanup(RERR_PROTOCOL);
-       }
-
        if (fd == sock_f_out)
                stats.total_written += len;
 
@@ -1276,7 +1370,7 @@ static void writefd(int fd, const char *buf, size_t len)
                        exit_cleanup(RERR_FILEIO);
        }
 
-       if (!iobuf_out || fd != sock_f_out) {
+       if (!iobuf_out || fd != iobuf_f_out) {
                writefd_unbuffered(fd, buf, len);
                return;
        }
@@ -1499,7 +1593,7 @@ void io_printf(int fd, const char *format, ...)
 void io_start_multiplex_out(void)
 {
        io_flush(NORMAL_FLUSH);
-       io_start_buffering_out();
+       io_start_buffering_out(sock_f_out);
        io_multiplexing_out = 1;
 }
 
@@ -1507,7 +1601,7 @@ void io_start_multiplex_out(void)
 void io_start_multiplex_in(void)
 {
        io_flush(NORMAL_FLUSH);
-       io_start_buffering_in();
+       io_start_buffering_in(sock_f_in);
        io_multiplexing_in = 1;
 }
 
@@ -1516,22 +1610,23 @@ int io_multiplex_write(enum msgcode code, const char *buf, size_t len)
 {
        if (!io_multiplexing_out)
                return 0;
-
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);
        mplex_write(code, buf, len);
        return 1;
 }
 
-void close_multiplexing_in(void)
+void io_end_multiplex_in(void)
 {
        io_multiplexing_in = 0;
+       io_end_buffering_in();
 }
 
 /** Stop output multiplexing. */
-void close_multiplexing_out(void)
+void io_end_multiplex_out(void)
 {
        io_multiplexing_out = 0;
+       io_end_buffering_out();
 }
 
 void start_write_batch(int fd)