Revamped some of the io variables and calls to make the various I/O
authorWayne Davison <wayned@samba.org>
Sat, 17 Jul 2004 15:20:00 +0000 (15:20 +0000)
committerWayne Davison <wayned@samba.org>
Sat, 17 Jul 2004 15:20:00 +0000 (15:20 +0000)
functions seemlessly work on fds that aren't for the main socket. This
involved changing some fd-variable names (to make them clearer), adding
io_set_sock_fds(), and making input buffering have a better enabled
flag (via an allocated buffer, just like the output buffering).  I also
got rid of the fd arg to some functions where the fd arg could only
specify the input or output fd for the socket (which we already know).

io.c

diff --git a/io.c b/io.c
index 9f9c382..909bc65 100644 (file)
--- a/io.c
+++ b/io.c
 /** If no timeout is specified then use a 60 second select timeout */
 #define SELECT_TIMEOUT 60
 
-static int io_multiplexing_out;
-static int io_multiplexing_in;
-static int multiplex_in_fd = -1;
-static int multiplex_out_fd = -1;
-static time_t last_io;
-static int no_flush;
-
 extern int bwlimit;
 extern size_t bwlimit_writemax;
 extern int verbose;
@@ -86,6 +79,13 @@ int kludge_around_eof = False;
 int msg_fd_in = -1;
 int msg_fd_out = -1;
 
+static int io_multiplexing_out;
+static int io_multiplexing_in;
+static int sock_f_in = -1;
+static int sock_f_out = -1;
+static time_t last_io;
+static int no_flush;
+
 static int write_batch_monitor_in = -1;
 static int write_batch_monitor_out = -1;
 
@@ -144,7 +144,7 @@ static void check_timeout(void)
 
        t = time(NULL);
 
-       if (last_io && io_timeout && (t-last_io) >= io_timeout) {
+       if (t - last_io >= io_timeout) {
                if (!am_server && !am_daemon) {
                        rprintf(FERROR, "io timeout after %d seconds - exiting\n",
                                (int)(t-last_io));
@@ -153,6 +153,14 @@ static void check_timeout(void)
        }
 }
 
+/* Note the fds used for the main socket (which might really be a pipe
+ * for a local transfer, but we can ignore that). */
+void io_set_sock_fds(int f_in, int f_out)
+{
+       sock_f_in = f_in;
+       sock_f_out = f_out;
+}
+
 /** Setup the fd used to receive MSG_* messages.  Only needed when
  * we're the generator because the sender and receiver both use the
  * multiplexed I/O setup. */
@@ -338,9 +346,9 @@ void io_set_filesfrom_fds(int f_in, int f_out)
  * program where that is a problem (start_socket_client),
  * kludge_around_eof is True and we just exit.
  */
-static void whine_about_eof(void)
+static void whine_about_eof(int fd)
 {
-       if (kludge_around_eof)
+       if (kludge_around_eof && fd == sock_f_in)
                exit_cleanup(0);
 
        rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
@@ -351,16 +359,6 @@ static void whine_about_eof(void)
 }
 
 
-static void die_from_readerr(int err)
-{
-       /* this prevents us trying to write errors on a dead socket */
-       io_multiplexing_close();
-
-       rsyserr(FERROR, err, "read error");
-       exit_cleanup(RERR_STREAMIO);
-}
-
-
 /**
  * Read from a socket with I/O timeout. return the number of bytes
  * read. If no bytes can be read then exit, never return a number <= 0.
@@ -504,17 +502,23 @@ static int read_timeout(int fd, char *buf, size_t len)
 
                if (n <= 0) {
                        if (n == 0)
-                               whine_about_eof(); /* Doesn't return. */
+                               whine_about_eof(fd); /* Doesn't return. */
                        if (errno == EINTR || errno == EWOULDBLOCK
                            || errno == EAGAIN)
                                continue;
-                       die_from_readerr(errno); /* Doesn't return. */
+
+                       /* Don't write errors on a dead socket. */
+                       if (fd == sock_f_in)
+                               io_multiplexing_close();
+                       rsyserr(FERROR, errno, "read error");
+                       exit_cleanup(RERR_STREAMIO);
                }
 
                buf += n;
                len -= n;
                ret += n;
-               if (io_timeout)
+
+               if (io_timeout && fd == sock_f_in)
                        last_io = time(NULL);
        }
 
@@ -569,6 +573,42 @@ int read_filesfrom_line(int fd, char *fname)
 }
 
 
+static char *iobuf_out;
+static int iobuf_out_cnt;
+
+void io_start_buffering_out(void)
+{
+       if (iobuf_out)
+               return;
+       if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
+               out_of_memory("io_start_buffering_out");
+       iobuf_out_cnt = 0;
+}
+
+
+static char *iobuf_in;
+static size_t iobuf_in_siz;
+
+void io_start_buffering_in(void)
+{
+       if (iobuf_in)
+               return;
+       iobuf_in_siz = 2 * IO_BUFFER_SIZE;
+       if (!(iobuf_in = new_array(char, iobuf_in_siz)))
+               out_of_memory("io_start_buffering_in");
+}
+
+
+void io_end_buffering(void)
+{
+       io_flush(NORMAL_FLUSH);
+       if (!io_multiplexing_out) {
+               free(iobuf_out);
+               iobuf_out = NULL;
+       }
+}
+
+
 /**
  * Continue trying to read len bytes - don't return until len has been
  * read.
@@ -593,31 +633,23 @@ static void read_loop(int fd, char *buf, size_t len)
 static int readfd_unbuffered(int fd, char *buf, size_t len)
 {
        static size_t remaining;
+       static size_t iobuf_in_ndx;
        int tag, ret = 0;
        char line[1024];
-       static char *buffer;
-       static size_t bufferIdx = 0;
-       static size_t bufferSz;
 
-       if (fd != multiplex_in_fd)
+       if (!iobuf_in || fd != sock_f_in)
                return read_timeout(fd, buf, len);
 
        if (!io_multiplexing_in && remaining == 0) {
-               if (!buffer) {
-                       bufferSz = 2 * IO_BUFFER_SIZE;
-                       buffer   = new_array(char, bufferSz);
-                       if (!buffer)
-                               out_of_memory("readfd_unbuffered");
-               }
-               remaining = read_timeout(fd, buffer, bufferSz);
-               bufferIdx = 0;
+               remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
+               iobuf_in_ndx = 0;
        }
 
        while (ret == 0) {
                if (remaining) {
                        len = MIN(len, remaining);
-                       memcpy(buf, buffer + bufferIdx, len);
-                       bufferIdx += len;
+                       memcpy(buf, iobuf_in + iobuf_in_ndx, len);
+                       iobuf_in_ndx += len;
                        remaining -= len;
                        ret = len;
                        break;
@@ -631,14 +663,14 @@ static int readfd_unbuffered(int fd, char *buf, size_t len)
 
                switch (tag) {
                case MSG_DATA:
-                       if (!buffer || remaining > bufferSz) {
-                               buffer = realloc_array(buffer, char, remaining);
-                               if (!buffer)
+                       if (remaining > iobuf_in_siz) {
+                               if (!(iobuf_in = realloc_array(iobuf_in, char,
+                                                              remaining)))
                                        out_of_memory("readfd_unbuffered");
-                               bufferSz = remaining;
+                               iobuf_in_siz = remaining;
                        }
-                       read_loop(fd, buffer, remaining);
-                       bufferIdx = 0;
+                       read_loop(fd, iobuf_in, remaining);
+                       iobuf_in_ndx = 0;
                        break;
                case MSG_INFO:
                case MSG_ERROR:
@@ -684,8 +716,9 @@ static void readfd(int fd, char *buffer, size_t N)
                if ((size_t)write(batch_fd, buffer, total) != total)
                        exit_cleanup(RERR_FILEIO);
        }
-       
-       stats.total_read += total;
+
+       if (fd == sock_f_in)
+               stats.total_read += total;
 }
 
 
@@ -796,12 +829,12 @@ static void sleep_for_bwlimit(int bytes_written)
 }
 
 
-/**
- * Write len bytes to the file descriptor @p fd.
+/* Write len bytes to the file descriptor fd, looping as necessary to get
+ * the job done and also (in the generator) reading any data on msg_fd_in
+ * (to avoid deadlock).
  *
  * This function underlies the multiplexing system.  The body of the
- * application never calls this function directly.
- **/
+ * application never calls this function directly. */
 static void writefd_unbuffered(int fd,char *buf,size_t len)
 {
        size_t n, total = 0;
@@ -809,11 +842,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
        int maxfd, count, ret;
        struct timeval tv;
 
-       if (fd == msg_fd_out) {
-               rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
-               exit_cleanup(RERR_PROTOCOL);
-       }
-
        no_flush++;
 
        while (total < len) {
@@ -864,49 +892,32 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                        }
 
                        /* Don't try to write errors back across the stream. */
-                       io_multiplexing_close();
+                       if (fd == sock_f_out)
+                               io_multiplexing_close();
                        rsyserr(FERROR, errno,
                                "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
                                (long)len, io_write_phase);
                        exit_cleanup(RERR_STREAMIO);
                }
 
-               sleep_for_bwlimit(ret);
-
                total += ret;
 
-               if (io_timeout)
-                       last_io = time(NULL);
+               if (fd == sock_f_out) {
+                       if (io_timeout)
+                               last_io = time(NULL);
+                       sleep_for_bwlimit(ret);
+               }
        }
 
        no_flush--;
 }
 
 
-static char *io_buffer;
-static int io_buffer_count;
-
-void io_start_buffering_out(int fd)
-{
-       if (io_buffer)
-               return;
-       multiplex_out_fd = fd;
-       io_buffer = new_array(char, IO_BUFFER_SIZE);
-       if (!io_buffer)
-               out_of_memory("writefd");
-       io_buffer_count = 0;
-}
-
-void io_start_buffering_in(int fd)
-{
-       multiplex_in_fd = fd;
-}
-
 /**
  * Write an message to a multiplexed stream. If this fails then rsync
  * exits.
  **/
-static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
+static void mplex_write(enum msgcode code, char *buf, size_t len)
 {
        char buffer[4096];
        size_t n = len;
@@ -917,71 +928,61 @@ static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
                n = sizeof buffer - 4;
 
        memcpy(&buffer[4], buf, n);
-       writefd_unbuffered(fd, buffer, n+4);
+       writefd_unbuffered(sock_f_out, buffer, n+4);
 
        len -= n;
        buf += n;
 
        if (len)
-               writefd_unbuffered(fd, buf, len);
+               writefd_unbuffered(sock_f_out, buf, len);
 }
 
 
 void io_flush(int flush_it_all)
 {
-       int fd = multiplex_out_fd;
-
        msg_list_push(flush_it_all);
 
-       if (!io_buffer_count || no_flush)
+       if (!iobuf_out_cnt || no_flush)
                return;
 
        if (io_multiplexing_out)
-               mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
+               mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
        else
-               writefd_unbuffered(fd, io_buffer, io_buffer_count);
-       io_buffer_count = 0;
+               writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
+       iobuf_out_cnt = 0;
 }
 
 
-void io_end_buffering(void)
-{
-       io_flush(NORMAL_FLUSH);
-       if (!io_multiplexing_out) {
-               free(io_buffer);
-               io_buffer = NULL;
-       }
-}
-
 static void writefd(int fd,char *buf,size_t len)
 {
-       stats.total_written += len;
-
        if (fd == msg_fd_out) {
                rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
                exit_cleanup(RERR_PROTOCOL);
        }
 
+       if (fd == sock_f_out)
+               stats.total_written += len;
+
        if (fd == write_batch_monitor_out) {
                if ((size_t)write(batch_fd, buf, len) != len)
                        exit_cleanup(RERR_FILEIO);
        }
 
-       if (!io_buffer || fd != multiplex_out_fd) {
+       if (!iobuf_out || fd != sock_f_out) {
                writefd_unbuffered(fd, buf, len);
                return;
        }
 
        while (len) {
-               int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
+               int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
                if (n > 0) {
-                       memcpy(io_buffer+io_buffer_count, buf, n);
+                       memcpy(iobuf_out+iobuf_out_cnt, buf, n);
                        buf += n;
                        len -= n;
-                       io_buffer_count += n;
+                       iobuf_out_cnt += n;
                }
 
-               if (io_buffer_count == IO_BUFFER_SIZE)
+               if (iobuf_out_cnt == IO_BUFFER_SIZE)
                        io_flush(NORMAL_FLUSH);
        }
 }
@@ -1091,19 +1092,18 @@ void io_printf(int fd, const char *format, ...)
 
 
 /** Setup for multiplexing a MSG_* stream with the data stream. */
-void io_start_multiplex_out(int fd)
+void io_start_multiplex_out(void)
 {
-       multiplex_out_fd = fd;
        io_flush(NORMAL_FLUSH);
-       io_start_buffering_out(fd);
+       io_start_buffering_out();
        io_multiplexing_out = 1;
 }
 
 /** Setup for multiplexing a MSG_* stream with the data stream. */
-void io_start_multiplex_in(int fd)
+void io_start_multiplex_in(void)
 {
-       multiplex_in_fd = fd;
        io_flush(NORMAL_FLUSH);
+       io_start_buffering_in();
        io_multiplexing_in = 1;
 }
 
@@ -1115,7 +1115,7 @@ int io_multiplex_write(enum msgcode code, char *buf, size_t len)
 
        io_flush(NORMAL_FLUSH);
        stats.total_written += (len+4);
-       mplex_write(multiplex_out_fd, code, buf, len);
+       mplex_write(code, buf, len);
        return 1;
 }
 
@@ -1134,7 +1134,6 @@ void start_write_batch(int fd)
         * is involved. */
        write_int(batch_fd, protocol_version);
        write_int(batch_fd, checksum_seed);
-       stats.total_written -= sizeof (int) * 2;
 
        if (am_sender)
                write_batch_monitor_out = fd;