added --backup-dir option from Bob Edwards
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 25d7516..ae398f2 100644 (file)
--- a/io.c
+++ b/io.c
@@ -38,6 +38,8 @@ extern int io_timeout;
 extern struct stats stats;
 
 static int buffer_f_in = -1;
+static int io_error_fd = -1;
+static int in_read_check;
 
 void setup_readbuffer(int f_in)
 {
@@ -64,6 +66,28 @@ static void check_timeout(void)
        }
 }
 
+/* set the fd used to propagate errors; -1 (the initial value) disables it */
+void io_set_error_fd(int fd)
+{
+       io_error_fd = fd;
+}
+
+/* do a single read() on the error fd and pass any data read to rwrite(FERROR) */
+static void read_error_fd(void)
+{
+       char buf[200];
+       int n;
+       int fd = io_error_fd;
+       io_error_fd = -1; /* hide the fd while we work; restored at the end */
+
+       n = read(fd, buf, sizeof(buf)-1);
+       if (n > 0) {
+               rwrite(FERROR, buf, n);
+       }
+       /* NOTE(review): n <= 0 (EOF or error) is ignored here -- confirm intended */
+       io_error_fd = fd;
+}
+
 
 static char *read_buffer;
 static char *read_buffer_p;
@@ -86,17 +110,34 @@ static int read_timeout(int fd, char *buf, int len)
        while (ret == 0) {
                fd_set fds;
                struct timeval tv;
+               int fd_count = fd+1;
 
                FD_ZERO(&fds);
                FD_SET(fd, &fds);
+               if (io_error_fd != -1) {
+                       FD_SET(io_error_fd, &fds);
+                       if (io_error_fd > fd) fd_count = io_error_fd+1;
+               }
+
                tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
                tv.tv_usec = 0;
 
-               if (select(fd+1, &fds, NULL, NULL, &tv) != 1) {
+               errno = 0;
+
+               if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
+                       if (errno == EBADF) {
+                               exit_cleanup(RERR_SOCKETIO);
+                       }
                        check_timeout();
                        continue;
                }
 
+               if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
+                       read_error_fd();
+               }
+
+               if (!FD_ISSET(fd, &fds)) continue;
+
                n = read(fd, buf, len);
 
                if (n > 0) {
@@ -112,14 +153,6 @@ static int read_timeout(int fd, char *buf, int len)
                        continue;
                }
 
-               if (n == -1 && 
-                   (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                       /* this shouldn't happen, if it does then
-                          sleep for a short time to prevent us
-                          chewing too much CPU */
-                       u_sleep(100);
-                       continue;
-               }
 
                if (n == 0) {
                        if (eof_error) {
@@ -128,6 +161,9 @@ static int read_timeout(int fd, char *buf, int len)
                        exit_cleanup(RERR_STREAMIO);
                }
 
+               /* this prevents us trying to write errors on a dead socket */
+               io_multiplexing_close();
+
                rprintf(FERROR,"read error: %s\n", strerror(errno));
                exit_cleanup(RERR_STREAMIO);
        }
@@ -195,6 +231,8 @@ static int read_unbuffered(int fd, char *buf, int len)
 
                rprintf(tag,"%s", line);
                remaining = 0;
+
+               if (in_read_check) break;
        }
 
        return ret;
@@ -208,6 +246,8 @@ static int read_unbuffered(int fd, char *buf, int len)
 static void read_check(int f)
 {
        int n = 8192;
+       
+       in_read_check = 1;
 
        if (f == -1) return;
 
@@ -232,6 +272,8 @@ static void read_check(int f)
 
        n = read_unbuffered(f,read_buffer+read_buffer_len,n);
        read_buffer_len += n;
+
+       in_read_check = 0;
 }
 
 
@@ -333,7 +375,6 @@ static void writefd_unbuffered(int fd,char *buf,int len)
        int fd_count, count;
        struct timeval tv;
        int reading=0;
-       int blocked=0;
 
        no_flush++;
 
@@ -341,7 +382,7 @@ static void writefd_unbuffered(int fd,char *buf,int len)
                FD_ZERO(&w_fds);
                FD_ZERO(&r_fds);
                FD_SET(fd,&w_fds);
-               fd_count = fd+1;
+               fd_count = fd;
 
                if (!no_flush_read) {
                        reading = (buffer_f_in != -1);
@@ -349,19 +390,30 @@ static void writefd_unbuffered(int fd,char *buf,int len)
 
                if (reading) {
                        FD_SET(buffer_f_in,&r_fds);
-                       if (buffer_f_in > fd) 
-                               fd_count = buffer_f_in+1;
+                       if (buffer_f_in > fd_count) 
+                               fd_count = buffer_f_in;
+               }
+
+               if (io_error_fd != -1) {
+                       FD_SET(io_error_fd,&r_fds);
+                       if (io_error_fd > fd_count) 
+                               fd_count = io_error_fd;
                }
 
                tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
                tv.tv_usec = 0;
 
-               count = select(fd_count,
-                              reading?&r_fds:NULL,
+               errno = 0;
+
+               count = select(fd_count+1,
+                              (reading || io_error_fd != -1)?&r_fds:NULL,
                               &w_fds,NULL,
                               &tv);
 
                if (count <= 0) {
+                       if (errno == EBADF) {
+                               exit_cleanup(RERR_SOCKETIO);
+                       }
                        check_timeout();
                        continue;
                }
@@ -370,17 +422,18 @@ static void writefd_unbuffered(int fd,char *buf,int len)
                        read_check(buffer_f_in);
                }
 
+               if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
+                       read_error_fd();
+               }
+
                if (FD_ISSET(fd, &w_fds)) {
-                       int n = (len-total)>>blocked;
-                       int ret = write(fd,buf+total,n?n:1);
+                       int ret, n = len-total;
+                       
+                       if (n > PIPE_BUF) n = PIPE_BUF;
 
-                       if (ret == -1 && errno == EINTR) {
-                               continue;
-                       }
+                       ret = write(fd,buf+total,n?n:1);
 
-                       if (ret == -1 && 
-                           (errno == EAGAIN || errno == EWOULDBLOCK)) {
-                               blocked++;
+                       if (ret == -1 && errno == EINTR) {
                                continue;
                        }
 
@@ -389,7 +442,6 @@ static void writefd_unbuffered(int fd,char *buf,int len)
                                exit_cleanup(RERR_STREAMIO);
                        }
 
-                       blocked = 0;
                        total += ret;
 
                        if (io_timeout)
@@ -443,7 +495,7 @@ static void writefd(int fd,char *buf,int len)
 {
        stats.total_written += len;
 
-       if (!io_buffer) {
+       if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
                return;
        }
@@ -569,7 +621,7 @@ void io_start_multiplex_in(int fd)
        io_multiplexing_in = 1;
 }
 
-/* write an message to the error stream */
+/* write a message to the multiplexed error stream */
 int io_multiplex_write(int f, char *buf, int len)
 {
        if (!io_multiplexing_out) return 0;
@@ -585,6 +637,20 @@ int io_multiplex_write(int f, char *buf, int len)
        return 1;
 }
 
+/* write len bytes of buf to the error fd f; returns 0 if f is -1, else 1 */
+int io_error_write(int f, char *buf, int len)
+{
+       if (f == -1) return 0;
+       writefd_unbuffered(f, buf, len);
+       return 1;
+}
+
+/* stop output multiplexing by clearing the io_multiplexing_out flag */
+void io_multiplexing_close(void)
+{
+       io_multiplexing_out = 0;
+}
+
 void io_close_input(int fd)
 {
        buffer_f_in = -1;