My modified version of Chris Shoemaker's improved batch-file handling.
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 7bb3b08..9f9c382 100644 (file)
--- a/io.c
+++ b/io.c
@@ -1,19 +1,19 @@
 /* -*- c-file-style: "linux" -*-
- * 
- * Copyright (C) 1996-2001 by Andrew Tridgell 
+ *
+ * Copyright (C) 1996-2001 by Andrew Tridgell
  * Copyright (C) Paul Mackerras 1996
  * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
- * 
+ *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  * the Free Software Foundation; either version 2 of the License, or
  * (at your option) any later version.
- * 
+ *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
- * 
+ *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
@@ -22,7 +22,7 @@
 /**
  * @file io.c
  *
- * Socket and pipe IO utilities used in rsync.
+ * Socket and pipe I/O utilities used in rsync.
  *
  * rsync provides its own multiplexing system, which is used to send
  * stderr and stdout over a single socket.  We need this because
@@ -47,15 +47,21 @@ static time_t last_io;
 static int no_flush;
 
 extern int bwlimit;
+extern size_t bwlimit_writemax;
 extern int verbose;
 extern int io_timeout;
 extern int am_server;
 extern int am_daemon;
 extern int am_sender;
+extern int eol_nulls;
+extern int checksum_seed;
+extern int protocol_version;
+extern char *remote_filesfrom_file;
 extern struct stats stats;
 
-
 const char phase_unknown[] = "unknown";
+int select_timeout = SELECT_TIMEOUT;
+int batch_fd = -1;
 
 /**
  * The connection might be dropped at some point; perhaps because the
@@ -63,9 +69,9 @@ const char phase_unknown[] = "unknown";
  * not very helpful.  So instead we try to make io_phase_name point to
  * something useful.
  *
- * For buffered/multiplexed IO these names will be somewhat
+ * For buffered/multiplexed I/O these names will be somewhat
  * approximate; perhaps for ease of support we would rather make the
- * buffer always flush when a single application-level IO finishes.
+ * buffer always flush when a single application-level I/O finishes.
  *
  * @todo Perhaps we want some simple stack functionality, but there's
  * no need to overdo it.
@@ -80,6 +86,9 @@ int kludge_around_eof = False;
 int msg_fd_in = -1;
 int msg_fd_out = -1;
 
+static int write_batch_monitor_in = -1;
+static int write_batch_monitor_out = -1;
+
 static int io_filesfrom_f_in = -1;
 static int io_filesfrom_f_out = -1;
 static char io_filesfrom_buf[2048];
@@ -137,7 +146,7 @@ static void check_timeout(void)
 
        if (last_io && io_timeout && (t-last_io) >= io_timeout) {
                if (!am_server && !am_daemon) {
-                       rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
+                       rprintf(FERROR, "io timeout after %d seconds - exiting\n",
                                (int)(t-last_io));
                }
                exit_cleanup(RERR_TIMEOUT);
@@ -146,7 +155,7 @@ static void check_timeout(void)
 
 /** Setup the fd used to receive MSG_* messages.  Only needed when
  * we're the generator because the sender and receiver both use the
- * multiplexed IO setup. */
+ * multiplexed I/O setup. */
 void set_msg_fd_in(int fd)
 {
        msg_fd_in = fd;
@@ -154,7 +163,7 @@ void set_msg_fd_in(int fd)
 
 /** Setup the fd used to send our MSG_* messages.  Only needed when
  * we're the receiver because the generator and the sender both use
- * the multiplexed IO setup. */
+ * the multiplexed I/O setup. */
 void set_msg_fd_out(int fd)
 {
        msg_fd_out = fd;
@@ -191,13 +200,13 @@ void send_msg(enum msgcode code, char *buf, int len)
  * called by the generator. */
 static void read_msg_fd(void)
 {
-       char buf[200];
+       char buf[2048];
        size_t n;
        int fd = msg_fd_in;
        int tag, len;
 
-       /* Temporarily disable msg_fd_in.  This is needed because we
-        * may call a write routine that could try to call us back. */
+       /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
+        * to this routine from read_timeout() and writefd_unbuffered(). */
        msg_fd_in = -1;
 
        read_loop(fd, buf, 4);
@@ -208,13 +217,17 @@ static void read_msg_fd(void)
 
        switch (tag) {
        case MSG_DONE:
-               if (len != 0)
+               if (len != 0) {
+                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
                        exit_cleanup(RERR_STREAMIO);
+               }
                redo_list_add(-1);
                break;
        case MSG_REDO:
-               if (len != 4)
+               if (len != 4) {
+                       rprintf(FERROR, "invalid message %d:%d\n", tag, len);
                        exit_cleanup(RERR_STREAMIO);
+               }
                read_loop(fd, buf, 4);
                redo_list_add(IVAL(buf,0));
                break;
@@ -231,6 +244,7 @@ static void read_msg_fd(void)
                }
                break;
        default:
+               rprintf(FERROR, "unknown message %d:%d\n", tag, len);
                exit_cleanup(RERR_STREAMIO);
        }
 
@@ -239,7 +253,7 @@ static void read_msg_fd(void)
 
 /* Try to push messages off the list onto the wire.  If we leave with more
  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
- * This is only called by the receiver. */
+ * This is only active in the receiver. */
 int msg_list_push(int flush_it_all)
 {
        static int written = 0;
@@ -261,7 +275,7 @@ int msg_list_push(int flush_it_all)
                                return 0;
                        FD_ZERO(&fds);
                        FD_SET(msg_fd_out, &fds);
-                       tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
                                check_timeout();
@@ -328,14 +342,12 @@ static void whine_about_eof(void)
 {
        if (kludge_around_eof)
                exit_cleanup(0);
-       else {
-               rprintf(FERROR,
-                       "%s: connection unexpectedly closed "
-                       "(%.0f bytes read so far)\n",
-                       RSYNC_NAME, (double)stats.total_read);
 
-               exit_cleanup(RERR_STREAMIO);
-       }
+       rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
+               "(%.0f bytes read so far)\n",
+               (double)stats.total_read);
+
+       exit_cleanup(RERR_STREAMIO);
 }
 
 
@@ -344,14 +356,13 @@ static void die_from_readerr(int err)
        /* this prevents us trying to write errors on a dead socket */
        io_multiplexing_close();
 
-       rprintf(FERROR, "%s: read error: %s\n",
-               RSYNC_NAME, strerror(err));
+       rsyserr(FERROR, err, "read error");
        exit_cleanup(RERR_STREAMIO);
 }
 
 
 /**
- * Read from a socket with IO timeout. return the number of bytes
+ * Read from a socket with I/O timeout. return the number of bytes
  * read. If no bytes can be read then exit, never return a number <= 0.
  *
  * TODO: If the remote shell connection fails, then current versions
@@ -363,7 +374,7 @@ static void die_from_readerr(int err)
  */
 static int read_timeout(int fd, char *buf, size_t len)
 {
-       int n, ret=0;
+       int n, ret = 0;
 
        io_flush(NORMAL_FLUSH);
 
@@ -371,15 +382,20 @@ static int read_timeout(int fd, char *buf, size_t len)
                /* until we manage to read *something* */
                fd_set r_fds, w_fds;
                struct timeval tv;
-               int fd_count = fd+1;
+               int maxfd = fd;
                int count;
 
                FD_ZERO(&r_fds);
+               FD_ZERO(&w_fds);
                FD_SET(fd, &r_fds);
                if (msg_fd_in >= 0) {
                        FD_SET(msg_fd_in, &r_fds);
-                       if (msg_fd_in >= fd_count)
-                               fd_count = msg_fd_in+1;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
+               } else if (msg_list_head) {
+                       FD_SET(msg_fd_out, &w_fds);
+                       if (msg_fd_out > maxfd)
+                               maxfd = msg_fd_out;
                }
                if (io_filesfrom_f_out >= 0) {
                        int new_fd;
@@ -392,37 +408,31 @@ static int read_timeout(int fd, char *buf, size_t len)
                                        new_fd = -1;
                                }
                        } else {
-                               FD_ZERO(&w_fds);
                                FD_SET(io_filesfrom_f_out, &w_fds);
                                new_fd = io_filesfrom_f_out;
                        }
-                       if (new_fd >= fd_count)
-                               fd_count = new_fd+1;
+                       if (new_fd > maxfd)
+                               maxfd = new_fd;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
 
-               count = select(fd_count, &r_fds,
-                              io_filesfrom_buflen? &w_fds : NULL,
-                              NULL, &tv);
-
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
+               count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
 
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       if (errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
+                       check_timeout();
                        continue;
                }
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
+               else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
+                       msg_list_push(NORMAL_FLUSH);
 
                if (io_filesfrom_f_out >= 0) {
                        if (io_filesfrom_buflen) {
@@ -452,7 +462,6 @@ static int read_timeout(int fd, char *buf, size_t len)
                                                io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
                                                io_filesfrom_f_in = -1;
                                        } else {
-                                               extern int eol_nulls;
                                                if (!eol_nulls) {
                                                        char *s = io_filesfrom_buf + l;
                                                        /* Transform CR and/or LF into '\0' */
@@ -488,26 +497,25 @@ static int read_timeout(int fd, char *buf, size_t len)
                        }
                }
 
-               if (!FD_ISSET(fd, &r_fds)) continue;
+               if (!FD_ISSET(fd, &r_fds))
+                       continue;
 
                n = read(fd, buf, len);
 
-               if (n > 0) {
-                       buf += n;
-                       len -= n;
-                       ret += n;
-                       if (io_timeout)
-                               last_io = time(NULL);
-                       continue;
-               } else if (n == 0) {
-                       whine_about_eof();
-                       return -1; /* doesn't return */
-               } else if (n < 0) {
-                       if (errno == EINTR || errno == EWOULDBLOCK ||
-                           errno == EAGAIN) 
+               if (n <= 0) {
+                       if (n == 0)
+                               whine_about_eof(); /* Doesn't return. */
+                       if (errno == EINTR || errno == EWOULDBLOCK
+                           || errno == EAGAIN)
                                continue;
-                       die_from_readerr(errno);
+                       die_from_readerr(errno); /* Doesn't return. */
                }
+
+               buf += n;
+               len -= n;
+               ret += n;
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        return ret;
@@ -521,9 +529,6 @@ int read_filesfrom_line(int fd, char *fname)
 {
        char ch, *s, *eob = fname + MAXPATHLEN - 1;
        int cnt;
-       extern int io_timeout;
-       extern int eol_nulls;
-       extern char *remote_filesfrom_file;
        int reading_remotely = remote_filesfrom_file != NULL;
        int nulls = eol_nulls || reading_remotely;
 
@@ -537,7 +542,7 @@ int read_filesfrom_line(int fd, char *fname)
                        fd_set fds;
                        FD_ZERO(&fds);
                        FD_SET(fd, &fds);
-                       tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
+                       tv.tv_sec = select_timeout;
                        tv.tv_usec = 0;
                        if (!select(fd+1, &fds, NULL, NULL, &tv))
                                check_timeout();
@@ -582,10 +587,10 @@ static void read_loop(int fd, char *buf, size_t len)
 /**
  * Read from the file descriptor handling multiplexing - return number
  * of bytes read.
- * 
- * Never returns <= 0. 
+ *
+ * Never returns <= 0.
  */
-static int read_unbuffered(int fd, char *buf, size_t len)
+static int readfd_unbuffered(int fd, char *buf, size_t len)
 {
        static size_t remaining;
        int tag, ret = 0;
@@ -601,7 +606,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                if (!buffer) {
                        bufferSz = 2 * IO_BUFFER_SIZE;
                        buffer   = new_array(char, bufferSz);
-                       if (!buffer) out_of_memory("read_unbuffered");
+                       if (!buffer)
+                               out_of_memory("readfd_unbuffered");
                }
                remaining = read_timeout(fd, buffer, bufferSz);
                bufferIdx = 0;
@@ -627,7 +633,8 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                case MSG_DATA:
                        if (!buffer || remaining > bufferSz) {
                                buffer = realloc_array(buffer, char, remaining);
-                               if (!buffer) out_of_memory("read_unbuffered");
+                               if (!buffer)
+                                       out_of_memory("readfd_unbuffered");
                                bufferSz = remaining;
                        }
                        read_loop(fd, buffer, remaining);
@@ -666,13 +673,18 @@ static int read_unbuffered(int fd, char *buf, size_t len)
 static void readfd(int fd, char *buffer, size_t N)
 {
        int  ret;
-       size_t total=0;  
+       size_t total = 0;
 
        while (total < N) {
-               ret = read_unbuffered(fd, buffer + total, N-total);
+               ret = readfd_unbuffered(fd, buffer + total, N-total);
                total += ret;
        }
 
+       if (fd == write_batch_monitor_in) {
+               if ((size_t)write(batch_fd, buffer, total) != total)
+                       exit_cleanup(RERR_FILEIO);
+       }
+       
        stats.total_read += total;
 }
 
@@ -684,7 +696,8 @@ int32 read_int(int f)
 
        readfd(f,b,4);
        ret = IVAL(b,0);
-       if (ret == (int32)0xffffffff) return -1;
+       if (ret == (int32)0xffffffff)
+               return -1;
        return ret;
 }
 
@@ -694,9 +707,8 @@ int64 read_longint(int f)
        char b[8];
        ret = read_int(f);
 
-       if ((int32)ret != (int32)0xffffffff) {
+       if ((int32)ret != (int32)0xffffffff)
                return ret;
-       }
 
 #ifdef NO_INT64
        rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
@@ -736,22 +748,51 @@ unsigned char read_byte(int f)
  * use a bit less bandwidth than specified, because it doesn't make up
  * for slow periods.  But arguably this is a feature.  In addition, we
  * ought to take the time used to write the data into account.
+ *
+ * During some phases of big transfers (file FOO is uptodate) this is
+ * called with a small bytes_written every time.  As the kernel has to
+ * round small waits up to guarantee that we actually wait at least the
+ * requested number of microseconds, this can become grossly inaccurate.
+ * We therefore keep track of the bytes we've written over time and only
+ * sleep when the accumulated delay is at least 1 tenth of a second.
  **/
 static void sleep_for_bwlimit(int bytes_written)
 {
-       struct timeval tv;
+       static struct timeval prior_tv;
+       static long total_written = 0; 
+       struct timeval tv, start_tv;
+       long elapsed_usec, sleep_usec;
+
+#define ONE_SEC        1000000L /* # of microseconds in a second */
 
        if (!bwlimit)
                return;
 
-       assert(bytes_written > 0);
-       assert(bwlimit > 0);
+       total_written += bytes_written; 
+
+       gettimeofday(&start_tv, NULL);
+       if (prior_tv.tv_sec) {
+               elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
+                            + (start_tv.tv_usec - prior_tv.tv_usec);
+               total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
+               if (total_written < 0)
+                       total_written = 0;
+       }
 
-       tv.tv_usec = bytes_written * 1000 / bwlimit;
-       tv.tv_sec  = tv.tv_usec / 1000000;
-       tv.tv_usec = tv.tv_usec % 1000000;
+       sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
+       if (sleep_usec < ONE_SEC / 10) {
+               prior_tv = start_tv;
+               return;
+       }
 
+       tv.tv_sec  = sleep_usec / ONE_SEC;
+       tv.tv_usec = sleep_usec % ONE_SEC;
        select(0, NULL, NULL, NULL, &tv);
+
+       gettimeofday(&prior_tv, NULL);
+       elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
+                    + (prior_tv.tv_usec - start_tv.tv_usec);
+       total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
 }
 
 
@@ -763,54 +804,56 @@ static void sleep_for_bwlimit(int bytes_written)
  **/
 static void writefd_unbuffered(int fd,char *buf,size_t len)
 {
-       size_t total = 0;
+       size_t n, total = 0;
        fd_set w_fds, r_fds;
-       int fd_count, count;
+       int maxfd, count, ret;
        struct timeval tv;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
 
        no_flush++;
 
        while (total < len) {
                FD_ZERO(&w_fds);
                FD_SET(fd,&w_fds);
-               fd_count = fd;
+               maxfd = fd;
 
                if (msg_fd_in >= 0) {
                        FD_ZERO(&r_fds);
                        FD_SET(msg_fd_in,&r_fds);
-                       if (msg_fd_in > fd_count) 
-                               fd_count = msg_fd_in;
+                       if (msg_fd_in > maxfd)
+                               maxfd = msg_fd_in;
                }
 
-               tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
+               tv.tv_sec = select_timeout;
                tv.tv_usec = 0;
 
                errno = 0;
-               count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
+               count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
                               &w_fds, NULL, &tv);
 
-               if (count == 0) {
-                       msg_list_push(NORMAL_FLUSH);
-                       check_timeout();
-               }
-
                if (count <= 0) {
-                       if (errno == EBADF) {
+                       if (count < 0 && errno == EBADF)
                                exit_cleanup(RERR_SOCKETIO);
-                       }
+                       check_timeout();
                        continue;
                }
 
                if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
                        read_msg_fd();
 
-               if (FD_ISSET(fd, &w_fds)) {
-                       int ret;
-                       size_t n = len-total;
-                       ret = write(fd,buf+total,n);
+               if (!FD_ISSET(fd, &w_fds))
+                       continue;
 
+               n = len - total;
+               if (bwlimit && n > bwlimit_writemax)
+                       n = bwlimit_writemax;
+               ret = write(fd, buf + total, n);
+
+               if (ret <= 0) {
                        if (ret < 0) {
                                if (errno == EINTR)
                                        continue;
@@ -820,24 +863,20 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                                }
                        }
 
-                       if (ret <= 0) {
-                               /* Don't try to write errors back
-                                * across the stream */
-                               io_multiplexing_close();
-                               rprintf(FERROR, RSYNC_NAME
-                                       ": writefd_unbuffered failed to write %ld bytes: phase \"%s\": %s\n",
-                                       (long) len, io_write_phase, 
-                                       strerror(errno));
-                               exit_cleanup(RERR_STREAMIO);
-                       }
+                       /* Don't try to write errors back across the stream. */
+                       io_multiplexing_close();
+                       rsyserr(FERROR, errno,
+                               "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
+                               (long)len, io_write_phase);
+                       exit_cleanup(RERR_STREAMIO);
+               }
 
-                       sleep_for_bwlimit(ret);
-                       total += ret;
+               sleep_for_bwlimit(ret);
 
-                       if (io_timeout)
-                               last_io = time(NULL);
-               }
+               total += ret;
+
+               if (io_timeout)
+                       last_io = time(NULL);
        }
 
        no_flush--;
@@ -849,10 +888,12 @@ static int io_buffer_count;
 
 void io_start_buffering_out(int fd)
 {
-       if (io_buffer) return;
+       if (io_buffer)
+               return;
        multiplex_out_fd = fd;
        io_buffer = new_array(char, IO_BUFFER_SIZE);
-       if (!io_buffer) out_of_memory("writefd");
+       if (!io_buffer)
+               out_of_memory("writefd");
        io_buffer_count = 0;
 }
 
@@ -872,9 +913,8 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
 
        SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
 
-       if (n > (sizeof buffer - 4)) {
+       if (n > sizeof buffer - 4)
                n = sizeof buffer - 4;
-       }
 
        memcpy(&buffer[4], buf, n);
        writefd_unbuffered(fd, buffer, n+4);
@@ -882,16 +922,15 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
        len -= n;
        buf += n;
 
-       if (len) {
+       if (len)
                writefd_unbuffered(fd, buf, len);
-       }
 }
 
 
 void io_flush(int flush_it_all)
 {
        int fd = multiplex_out_fd;
-       
+
        msg_list_push(flush_it_all);
 
        if (!io_buffer_count || no_flush)
@@ -918,7 +957,15 @@ static void writefd(int fd,char *buf,size_t len)
 {
        stats.total_written += len;
 
-       msg_list_push(NORMAL_FLUSH);
+       if (fd == msg_fd_out) {
+               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
+               exit_cleanup(RERR_PROTOCOL);
+       }
+
+       if (fd == write_batch_monitor_out) {
+               if ((size_t)write(batch_fd, buf, len) != len)
+                       exit_cleanup(RERR_FILEIO);
+       }
 
        if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
@@ -926,7 +973,7 @@ static void writefd(int fd,char *buf,size_t len)
        }
 
        while (len) {
-               int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
+               int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
                if (n > 0) {
                        memcpy(io_buffer+io_buffer_count, buf, n);
                        buf += n;
@@ -1001,10 +1048,11 @@ void write_byte(int f,unsigned char c)
 
 
 /**
- * Read a line of up to @p maxlen characters into @p buf.  Does not
- * contain a trailing newline or carriage return.
+ * Read a line of up to @p maxlen characters into @p buf (not counting
+ * the trailing null).  Strips the (required) trailing newline and all
+ * carriage returns.
  *
- * @return 1 for success; 0 for io error or truncation.
+ * @return 1 for success; 0 for I/O error or truncation.
  **/
 int read_line(int f, char *buf, size_t maxlen)
 {
@@ -1013,27 +1061,21 @@ int read_line(int f, char *buf, size_t maxlen)
                read_buf(f, buf, 1);
                if (buf[0] == 0)
                        return 0;
-               if (buf[0] == '\n') {
-                       buf[0] = 0;
+               if (buf[0] == '\n')
                        break;
-               }
                if (buf[0] != '\r') {
                        buf++;
                        maxlen--;
                }
        }
-       if (maxlen == 0) {
-               *buf = 0;
-               return 0;
-       }
-
-       return 1;
+       *buf = '\0';
+       return maxlen > 0;
 }
 
 
 void io_printf(int fd, const char *format, ...)
 {
-       va_list ap;  
+       va_list ap;
        char buf[1024];
        int len;
 
@@ -1041,7 +1083,8 @@ void io_printf(int fd, const char *format, ...)
        len = vsnprintf(buf, sizeof buf, format, ap);
        va_end(ap);
 
-       if (len < 0) exit_cleanup(RERR_STREAMIO);
+       if (len < 0)
+               exit_cleanup(RERR_STREAMIO);
 
        write_sbuf(fd, buf);
 }
@@ -1067,7 +1110,8 @@ void io_start_multiplex_in(int fd)
 /** Write an message to the multiplexed data stream. */
 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
 {
-       if (!io_multiplexing_out) return 0;
+       if (!io_multiplexing_out)
+               return 0;
 
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);
@@ -1081,3 +1125,25 @@ void io_multiplexing_close(void)
        io_multiplexing_out = 0;
 }
 
+void start_write_batch(int fd)
+{
+       /* Some communication has already taken place, but we don't
+        * enable batch writing until here so that we can write a
+        * canonical record of the communication even though the
+        * actual communication so far depends on whether a daemon
+        * is involved. */
+       write_int(batch_fd, protocol_version);
+       write_int(batch_fd, checksum_seed);
+       stats.total_written -= sizeof (int) * 2;
+
+       if (am_sender)
+               write_batch_monitor_out = fd;
+       else
+               write_batch_monitor_in = fd;
+}
+
+void stop_write_batch(void)
+{
+       write_batch_monitor_out = -1;
+       write_batch_monitor_in = -1;
+}