My modified version of Chris Shoemaker's improved batch-file handling.
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index f34a899..9f9c382 100644 (file)
--- a/io.c
+++ b/io.c
@@ -54,11 +54,14 @@ extern int am_server;
 extern int am_daemon;
 extern int am_sender;
 extern int eol_nulls;
+extern int checksum_seed;
+extern int protocol_version;
 extern char *remote_filesfrom_file;
 extern struct stats stats;
 
 const char phase_unknown[] = "unknown";
 int select_timeout = SELECT_TIMEOUT;
+int batch_fd = -1; /* fd that readfd()/writefd() copy monitored data to */
 
 /**
  * The connection might be dropped at some point; perhaps because the
@@ -83,6 +86,9 @@ int kludge_around_eof = False;
 int msg_fd_in = -1;
 int msg_fd_out = -1;
 
+static int write_batch_monitor_in = -1; /* readfd() on this fd is copied to batch_fd */
+static int write_batch_monitor_out = -1; /* writefd() on this fd is copied to batch_fd */
+
 static int io_filesfrom_f_in = -1;
 static int io_filesfrom_f_out = -1;
 static char io_filesfrom_buf[2048];
@@ -380,6 +386,7 @@ static int read_timeout(int fd, char *buf, size_t len)
                int count;
 
                FD_ZERO(&r_fds);
+               FD_ZERO(&w_fds);
                FD_SET(fd, &r_fds);
                if (msg_fd_in >= 0) {
                        FD_SET(msg_fd_in, &r_fds);
@@ -401,7 +408,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                                        new_fd = -1;
                                }
                        } else {
-                               FD_ZERO(&w_fds);
                                FD_SET(io_filesfrom_f_out, &w_fds);
                                new_fd = io_filesfrom_f_out;
                        }
@@ -414,14 +420,12 @@ static int read_timeout(int fd, char *buf, size_t len)
 
                errno = 0;
 
-               count = select(maxfd + 1, &r_fds,
-                              io_filesfrom_buflen? &w_fds : NULL,
-                              NULL, &tv);
+               count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
 
                if (count <= 0) {
-                       check_timeout();
                        if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
+                       check_timeout();
                        continue;
                }
 
@@ -586,7 +590,7 @@ static void read_loop(int fd, char *buf, size_t len)
  *
  * Never returns <= 0.
  */
-static int read_unbuffered(int fd, char *buf, size_t len)
+static int readfd_unbuffered(int fd, char *buf, size_t len)
 {
        static size_t remaining;
        int tag, ret = 0;
@@ -603,7 +607,7 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                        bufferSz = 2 * IO_BUFFER_SIZE;
                        buffer   = new_array(char, bufferSz);
                        if (!buffer)
-                               out_of_memory("read_unbuffered");
+                               out_of_memory("readfd_unbuffered");
                }
                remaining = read_timeout(fd, buffer, bufferSz);
                bufferIdx = 0;
@@ -630,7 +634,7 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                        if (!buffer || remaining > bufferSz) {
                                buffer = realloc_array(buffer, char, remaining);
                                if (!buffer)
-                                       out_of_memory("read_unbuffered");
+                                       out_of_memory("readfd_unbuffered");
                                bufferSz = remaining;
                        }
                        read_loop(fd, buffer, remaining);
@@ -672,10 +676,15 @@ static void readfd(int fd, char *buffer, size_t N)
        size_t total = 0;
 
        while (total < N) {
-               ret = read_unbuffered(fd, buffer + total, N-total);
+               ret = readfd_unbuffered(fd, buffer + total, N-total);
                total += ret;
        }
 
+       if (fd == write_batch_monitor_in) {
+               if ((size_t)write(batch_fd, buffer, total) != total)
+                       exit_cleanup(RERR_FILEIO);
+       }
+       
        stats.total_read += total;
 }
 
@@ -827,9 +836,9 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                               &w_fds, NULL, &tv);
 
                if (count <= 0) {
-                       check_timeout();
                        if (count < 0 && errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
+                       check_timeout();
                        continue;
                }
 
@@ -904,9 +913,8 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
 
        SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
 
-       if (n > (sizeof buffer - 4)) {
+       if (n > sizeof buffer - 4)
                n = sizeof buffer - 4;
-       }
 
        memcpy(&buffer[4], buf, n);
        writefd_unbuffered(fd, buffer, n+4);
@@ -954,6 +962,11 @@ static void writefd(int fd,char *buf,size_t len)
                exit_cleanup(RERR_PROTOCOL);
        }
 
+       if (fd == write_batch_monitor_out) {
+               if ((size_t)write(batch_fd, buf, len) != len)
+                       exit_cleanup(RERR_FILEIO);
+       }
+
        if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
                return;
@@ -1112,3 +1125,25 @@ void io_multiplexing_close(void)
        io_multiplexing_out = 0;
 }
 
+void start_write_batch(int fd)
+{
+       /* Write the batch header (protocol version, checksum seed) to
+        * batch_fd, then start mirroring traffic on fd into the batch.
+        * Recording starts only here, after the initial exchange, so
+        * the batch is a canonical record even though the traffic so
+        * far depends on whether a daemon is involved. */
+       write_int(batch_fd, protocol_version);
+       write_int(batch_fd, checksum_seed);
+       stats.total_written -= sizeof (int) * 2; /* NOTE(review): presumably un-counts the two header ints -- confirm */
+
+       if (am_sender)
+               write_batch_monitor_out = fd; /* sender: writefd() on fd is copied to batch_fd */
+       else
+               write_batch_monitor_in = fd; /* otherwise: readfd() on fd is copied to batch_fd */
+}
+
+void stop_write_batch(void)
+{
+       write_batch_monitor_out = -1; /* turn off the batch copy in writefd() */
+       write_batch_monitor_in = -1; /* turn off the batch copy in readfd() */
+}