added some really ugly code to allow errors to propogate to
authorAndrew Tridgell <tridge@samba.org>
Sun, 23 Jan 2000 07:36:56 +0000 (07:36 +0000)
committerAndrew Tridgell <tridge@samba.org>
Sun, 23 Jan 2000 07:36:56 +0000 (07:36 +0000)
clients when writing to a rsync server

it works like this:

- we have an extra pipe from the receiver to the generator
- the server always runs with multiplexing on
- errors from the generator go down the multiplexed connection
- errors from the receiver go over the pipe, and from there to
  the multiplexed conn

it required some incredibly ugly code. damn.

clientserver.c
io.c
log.c
main.c
receiver.c
rsync.h

index bb367b0..8c55807 100644 (file)
@@ -101,7 +101,7 @@ int start_socket_client(char *host, char *path, int argc, char *argv[])
        }
        io_printf(fd,"\n");
 
-       if (remote_version > 17 && !am_sender)
+       if (remote_version >= 22 || (remote_version > 17 && !am_sender))
                io_start_multiplex_in(fd);
 
        return client_run(fd, fd, -1, argc, argv);
@@ -316,9 +316,17 @@ static int rsync_module(int fd, int i)
        argp = argv + optind;
        optind = 0;
 
-       if (remote_version > 17 && am_sender)
+       if (remote_version >= 22 || (remote_version > 17 && am_sender))
                io_start_multiplex_out(fd);
 
+       if (read_only) {
+               extern int am_sender;
+               if (!am_sender) {
+                       rprintf(FERROR,"ERROR: module is read only\n");
+                       return -1;
+               }
+       }
+
        if (!ret) {
                option_error();
        }
diff --git a/io.c b/io.c
index 4fd1a13..ae398f2 100644 (file)
--- a/io.c
+++ b/io.c
@@ -38,6 +38,8 @@ extern int io_timeout;
 extern struct stats stats;
 
 static int buffer_f_in = -1;
+static int io_error_fd = -1;
+static int in_read_check;
 
 void setup_readbuffer(int f_in)
 {
@@ -64,6 +66,28 @@ static void check_timeout(void)
        }
 }
 
+/* setup the fd used to propogate errors */
+void io_set_error_fd(int fd)
+{
+       io_error_fd = fd;
+}
+
+/* read some data from the error fd and write it to FERROR */
+static void read_error_fd(void)
+{
+       char buf[200];
+       int n;
+       int fd = io_error_fd;
+       io_error_fd = -1;
+
+       n = read(fd, buf, sizeof(buf)-1);
+       if (n > 0) {
+               rwrite(FERROR, buf, n);
+       }
+
+       io_error_fd = fd;
+}
+
 
 static char *read_buffer;
 static char *read_buffer_p;
@@ -86,17 +110,34 @@ static int read_timeout(int fd, char *buf, int len)
        while (ret == 0) {
                fd_set fds;
                struct timeval tv;
+               int fd_count = fd+1;
 
                FD_ZERO(&fds);
                FD_SET(fd, &fds);
+               if (io_error_fd != -1) {
+                       FD_SET(io_error_fd, &fds);
+                       if (io_error_fd > fd) fd_count = io_error_fd+1;
+               }
+
                tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
                tv.tv_usec = 0;
 
-               if (select(fd+1, &fds, NULL, NULL, &tv) != 1) {
+               errno = 0;
+
+               if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
+                       if (errno == EBADF) {
+                               exit_cleanup(RERR_SOCKETIO);
+                       }
                        check_timeout();
                        continue;
                }
 
+               if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
+                       read_error_fd();
+               }
+
+               if (!FD_ISSET(fd, &fds)) continue;
+
                n = read(fd, buf, len);
 
                if (n > 0) {
@@ -121,7 +162,7 @@ static int read_timeout(int fd, char *buf, int len)
                }
 
                /* this prevents us trying to write errors on a dead socket */
-               io_multiplexing_out = 0;
+               io_multiplexing_close();
 
                rprintf(FERROR,"read error: %s\n", strerror(errno));
                exit_cleanup(RERR_STREAMIO);
@@ -190,6 +231,8 @@ static int read_unbuffered(int fd, char *buf, int len)
 
                rprintf(tag,"%s", line);
                remaining = 0;
+
+               if (in_read_check) break;
        }
 
        return ret;
@@ -203,6 +246,8 @@ static int read_unbuffered(int fd, char *buf, int len)
 static void read_check(int f)
 {
        int n = 8192;
+       
+       in_read_check = 1;
 
        if (f == -1) return;
 
@@ -227,6 +272,8 @@ static void read_check(int f)
 
        n = read_unbuffered(f,read_buffer+read_buffer_len,n);
        read_buffer_len += n;
+
+       in_read_check = 0;
 }
 
 
@@ -335,7 +382,7 @@ static void writefd_unbuffered(int fd,char *buf,int len)
                FD_ZERO(&w_fds);
                FD_ZERO(&r_fds);
                FD_SET(fd,&w_fds);
-               fd_count = fd+1;
+               fd_count = fd;
 
                if (!no_flush_read) {
                        reading = (buffer_f_in != -1);
@@ -343,19 +390,30 @@ static void writefd_unbuffered(int fd,char *buf,int len)
 
                if (reading) {
                        FD_SET(buffer_f_in,&r_fds);
-                       if (buffer_f_in > fd) 
-                               fd_count = buffer_f_in+1;
+                       if (buffer_f_in > fd_count) 
+                               fd_count = buffer_f_in;
+               }
+
+               if (io_error_fd != -1) {
+                       FD_SET(io_error_fd,&r_fds);
+                       if (io_error_fd > fd_count) 
+                               fd_count = io_error_fd;
                }
 
                tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
                tv.tv_usec = 0;
 
-               count = select(fd_count,
-                              reading?&r_fds:NULL,
+               errno = 0;
+
+               count = select(fd_count+1,
+                              (reading || io_error_fd != -1)?&r_fds:NULL,
                               &w_fds,NULL,
                               &tv);
 
                if (count <= 0) {
+                       if (errno == EBADF) {
+                               exit_cleanup(RERR_SOCKETIO);
+                       }
                        check_timeout();
                        continue;
                }
@@ -364,6 +422,10 @@ static void writefd_unbuffered(int fd,char *buf,int len)
                        read_check(buffer_f_in);
                }
 
+               if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
+                       read_error_fd();
+               }
+
                if (FD_ISSET(fd, &w_fds)) {
                        int ret, n = len-total;
                        
@@ -433,7 +495,7 @@ static void writefd(int fd,char *buf,int len)
 {
        stats.total_written += len;
 
-       if (!io_buffer) {
+       if (!io_buffer || fd != multiplex_out_fd) {
                writefd_unbuffered(fd, buf, len);
                return;
        }
@@ -559,7 +621,7 @@ void io_start_multiplex_in(int fd)
        io_multiplexing_in = 1;
 }
 
-/* write an message to the error stream */
+/* write an message to the multiplexed error stream */
 int io_multiplex_write(int f, char *buf, int len)
 {
        if (!io_multiplexing_out) return 0;
@@ -575,6 +637,20 @@ int io_multiplex_write(int f, char *buf, int len)
        return 1;
 }
 
+/* write a message to the special error fd */
+int io_error_write(int f, char *buf, int len)
+{
+       if (f == -1) return 0;
+       writefd_unbuffered(f, buf, len);
+       return 1;
+}
+
+/* stop output multiplexing */
+void io_multiplexing_close(void)
+{
+       io_multiplexing_out = 0;
+}
+
 void io_close_input(int fd)
 {
        buffer_f_in = -1;
diff --git a/log.c b/log.c
index 6239409..788225f 100644 (file)
--- a/log.c
+++ b/log.c
@@ -24,7 +24,7 @@
 #include "rsync.h"
 
 static FILE *logfile;
-
+static int log_error_fd = -1;
 
 static void logit(int priority, char *buf)
 {
@@ -77,14 +77,18 @@ void log_open(void)
        logit(LOG_INFO,"rsyncd started\n");
 #endif
 }
-               
 
-/* this is the rsync debugging function. Call it with FINFO, FERROR or FLOG */
- void rprintf(int fd, const char *format, ...)
+/* setup the error file descriptor - used when we are a server
+   that is receiving files */
+void set_error_fd(int fd)
+{
+       log_error_fd = fd;
+}
+
+/* this is the underlying (unformatted) rsync debugging function. Call
+   it with FINFO, FERROR or FLOG */
+void rwrite(int fd, char *buf, int len)
 {
-       va_list ap;  
-       char buf[1024];
-       int len;
        FILE *f=NULL;
        extern int am_daemon;
        extern int quiet;
@@ -92,14 +96,8 @@ void log_open(void)
 
        if (quiet != 0 && fd == FINFO) return;
 
-       va_start(ap, format);
-       len = vslprintf(buf, sizeof(buf), format, ap);
-       va_end(ap);
-
        if (len < 0) exit_cleanup(RERR_MESSAGEIO);
 
-       if (len > sizeof(buf)-1) exit_cleanup(RERR_MESSAGEIO);
-
        buf[len] = 0;
 
        if (fd == FLOG) {
@@ -117,7 +115,9 @@ void log_open(void)
                depth++;
 
                log_open();
-               if (!io_multiplex_write(fd, buf, strlen(buf))) {
+
+               if (!io_error_write(log_error_fd, buf, strlen(buf)) &&
+                   !io_multiplex_write(fd, buf, strlen(buf))) {
                        logit(priority, buf);
                }
 
@@ -143,6 +143,23 @@ void log_open(void)
 
        if (buf[len-1] == '\r' || buf[len-1] == '\n') fflush(f);
 }
+               
+
+/* this is the rsync debugging function. Call it with FINFO, FERROR or FLOG */
+ void rprintf(int fd, const char *format, ...)
+{
+       va_list ap;  
+       char buf[1024];
+       int len;
+
+       va_start(ap, format);
+       len = vslprintf(buf, sizeof(buf), format, ap);
+       va_end(ap);
+
+       if (len > sizeof(buf)-1) exit_cleanup(RERR_MESSAGEIO);
+
+       rwrite(fd, buf, len);
+}
 
 void rflush(int fd)
 {
diff --git a/main.c b/main.c
index 404d49b..6ea9321 100644 (file)
--- a/main.c
+++ b/main.c
@@ -278,6 +278,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
        int pid;
        int status=0;
        int recv_pipe[2];
+       int error_pipe[2];
        extern int preserve_hard_links;
 
        if (preserve_hard_links)
@@ -287,13 +288,25 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
                rprintf(FERROR,"pipe failed in do_recv\n");
                exit_cleanup(RERR_SOCKETIO);
        }
+
+       if (pipe(error_pipe) < 0) {
+               rprintf(FERROR,"error pipe failed in do_recv\n");
+               exit_cleanup(RERR_SOCKETIO);
+       }
   
        io_flush();
 
        if ((pid=do_fork()) == 0) {
                close(recv_pipe[0]);
+               close(error_pipe[0]);
                if (f_in != f_out) close(f_out);
 
+               /* we can't let two processes write to the socket at one time */
+               io_multiplexing_close();
+
+               /* set place to send errors */
+               set_error_fd(error_pipe[1]);
+
                recv_files(f_in,flist,local_name,recv_pipe[1]);
                report(f_in);
 
@@ -302,11 +315,14 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
        }
 
        close(recv_pipe[1]);
+       close(error_pipe[1]);
        io_close_input(f_in);
        if (f_in != f_out) close(f_in);
 
        io_start_buffering(f_out);
 
+       io_set_error_fd(error_pipe[0]);
+
        generate_files(f_out,flist,local_name,recv_pipe[0]);
 
        io_flush();
index 890fadd..3cd7f9e 100644 (file)
@@ -468,7 +468,7 @@ int recv_files(int f_in,struct file_list *flist,char *local_name,int f_gen)
                finish_transfer(fname, fnametmp, file);
 
                cleanup_disable();
-                               
+
                if (!recv_ok) {
                        if (csum_length == SUM_LENGTH) {
                                rprintf(FERROR,"ERROR: file corruption in %s. File changed during transfer?\n",
diff --git a/rsync.h b/rsync.h
index 7bde8b2..a86baaa 100644 (file)
--- a/rsync.h
+++ b/rsync.h
@@ -47,7 +47,7 @@
 #define SAME_TIME (1<<7)
 
 /* update this if you make incompatible changes */
-#define PROTOCOL_VERSION 21
+#define PROTOCOL_VERSION 22
 #define MIN_PROTOCOL_VERSION 11
 #define MAX_PROTOCOL_VERSION 30