Applying my updated version of Craig Barratt's buffered I/O patch.
authorWayne Davison <wayned@samba.org>
Fri, 2 Jan 2004 08:29:49 +0000 (08:29 +0000)
committerWayne Davison <wayned@samba.org>
Fri, 2 Jan 2004 08:29:49 +0000 (08:29 +0000)
fileio.c
flist.c
io.c
main.c
receiver.c

index 32480f7..bc24428 100644 (file)
--- a/fileio.c
+++ b/fileio.c
@@ -69,6 +69,21 @@ static int write_sparse(int f,char *buf,size_t len)
        return len;
 }
 
+
+static char *wf_writeBuf;
+static size_t wf_writeBufSize;
+static size_t wf_writeBufCnt;
+
+int flush_write_file(int f)
+{
+       int ret = write(f, wf_writeBuf, wf_writeBufCnt);
+       if (ret < 0)
+               return ret;
+       /* if (ret < wf_writeBufCnt) ??? */
+       wf_writeBufCnt = 0;
+       return ret;
+}
+
 /*
  * write_file does not allow incomplete writes.  It loops internally
  * until len bytes are written or errno is set.
@@ -83,7 +98,22 @@ int write_file(int f,char *buf,size_t len)
                        int len1 = MIN(len, SPARSE_WRITE_SIZE);
                        r1 = write_sparse(f, buf, len1);
                } else {
-                       r1 = write(f, buf, len);
+                       if (!wf_writeBuf) {
+                               wf_writeBufSize = MAX_MAP_SIZE;
+                               wf_writeBufCnt  = 0;
+                               wf_writeBuf = new_array(char, MAX_MAP_SIZE);
+                               if (!wf_writeBuf) out_of_memory("write_file");
+                       }
+                       r1 = MIN(len, wf_writeBufSize - wf_writeBufCnt);
+                       if (r1) {
+                               memcpy(wf_writeBuf + wf_writeBufCnt, buf, r1);
+                               wf_writeBufCnt += r1;
+                       }
+                       if (wf_writeBufCnt == wf_writeBufSize) {
+                               if (flush_write_file(f) < 0) return -1;
+                               if (!r1 && len)
+                                       continue;
+                       }
                }
                if (r1 <= 0) {
                        if (ret > 0) return ret;
diff --git a/flist.c b/flist.c
index 9da5c73..f905fb8 100644 (file)
--- a/flist.c
+++ b/flist.c
@@ -924,7 +924,7 @@ struct file_list *send_file_list(int f, int argc, char *argv[])
        flist = flist_new();
 
        if (f != -1) {
-               io_start_buffering(f);
+               io_start_buffering_out(f);
                if (filesfrom_fd >= 0) {
                        if (argv[0] && !push_dir(argv[0], 0)) {
                                rprintf(FERROR, "push_dir %s failed: %s\n",
diff --git a/io.c b/io.c
index 84a85bb..8777bf9 100644 (file)
--- a/io.c
+++ b/io.c
@@ -41,8 +41,8 @@
 
 static int io_multiplexing_out;
 static int io_multiplexing_in;
-static int multiplex_in_fd;
-static int multiplex_out_fd;
+static int multiplex_in_fd = -1;
+static int multiplex_out_fd = -1;
 static time_t last_io;
 static int no_flush;
 
@@ -440,17 +440,31 @@ static int read_unbuffered(int fd, char *buf, size_t len)
        static size_t remaining;
        int tag, ret = 0;
        char line[1024];
+       static char *buffer;
+       static size_t bufferIdx = 0;
+       static size_t bufferSz;
 
-       if (!io_multiplexing_in || fd != multiplex_in_fd)
+       if (fd != multiplex_in_fd)
                return read_timeout(fd, buf, len);
 
+       if (!io_multiplexing_in && remaining == 0) {
+               if (!buffer) {
+                       bufferSz = 2 * IO_BUFFER_SIZE;
+                       buffer   = new_array(char, bufferSz);
+                       if (!buffer) out_of_memory("read_unbuffered");
+               }
+               remaining = read_timeout(fd, buffer, bufferSz);
+               bufferIdx = 0;
+       }
+
        while (ret == 0) {
                if (remaining) {
                        len = MIN(len, remaining);
-                       read_loop(fd, buf, len);
+                       memcpy(buf, buffer + bufferIdx, len);
+                       bufferIdx += len;
                        remaining -= len;
                        ret = len;
-                       continue;
+                       break;
                }
 
                read_loop(fd, line, 4);
@@ -459,8 +473,16 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                remaining = tag & 0xFFFFFF;
                tag = tag >> 24;
 
-               if (tag == MPLEX_BASE)
+               if (tag == MPLEX_BASE) {
+                       if (!buffer || remaining > bufferSz) {
+                               buffer = realloc_array(buffer, char, remaining);
+                               if (!buffer) out_of_memory("read_unbuffered");
+                               bufferSz = remaining;
+                       }
+                       read_loop(fd, buffer, remaining);
+                       bufferIdx = 0;
                        continue;
+               }
 
                tag -= MPLEX_BASE;
 
@@ -482,6 +504,9 @@ static int read_unbuffered(int fd, char *buf, size_t len)
                remaining = 0;
        }
 
+       if (remaining == 0)
+               io_flush();
+
        return ret;
 }
 
@@ -498,8 +523,6 @@ static void readfd(int fd, char *buffer, size_t N)
        size_t total=0;  
 
        while (total < N) {
-               io_flush();
-
                ret = read_unbuffered(fd, buffer + total, N-total);
                total += ret;
        }
@@ -682,7 +705,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
 static char *io_buffer;
 static int io_buffer_count;
 
-void io_start_buffering(int fd)
+void io_start_buffering_out(int fd)
 {
        if (io_buffer) return;
        multiplex_out_fd = fd;
@@ -691,6 +714,11 @@ void io_start_buffering(int fd)
        io_buffer_count = 0;
 }
 
+void io_start_buffering_in(int fd)
+{
+       multiplex_in_fd = fd;
+}
+
 /**
  * Write an message to a multiplexed stream. If this fails then rsync
  * exits.
@@ -881,7 +909,7 @@ void io_start_multiplex_out(int fd)
 {
        multiplex_out_fd = fd;
        io_flush();
-       io_start_buffering(fd);
+       io_start_buffering_out(fd);
        io_multiplexing_out = 1;
 }
 
diff --git a/main.c b/main.c
index 1b0cd8b..1b07eee 100644 (file)
--- a/main.c
+++ b/main.c
@@ -380,6 +380,8 @@ static void do_server_sender(int f_in, int f_out, int argc,char *argv[])
                exit_cleanup(0);
        }
 
+       io_start_buffering_in(f_in);
+       io_start_buffering_out(f_out);
        send_files(flist,f_out,f_in);
        io_flush();
        report(f_out);
@@ -454,7 +456,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name)
        close(error_pipe[1]);
        if (f_in != f_out) close(f_in);
 
-       io_start_buffering(f_out);
+       io_start_buffering_out(f_out);
 
        io_set_error_fd(error_pipe[0]);
 
@@ -508,6 +510,7 @@ static void do_server_recv(int f_in, int f_out, int argc,char *argv[])
                }
        }
 
+       io_start_buffering_in(f_in);
        if (delete_mode && !delete_excluded)
                recv_exclude_list(f_in);
 
@@ -606,6 +609,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
                extern int cvs_exclude;
                extern int delete_mode;
                extern int delete_excluded;
+               io_start_buffering_out(f_out);
                if (cvs_exclude)
                        add_cvs_excludes();
                if (delete_mode && !delete_excluded)
@@ -617,7 +621,10 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
                if (verbose > 3)
                        rprintf(FINFO,"file list sent\n");
 
+               io_flush();
+               io_start_buffering_out(f_out);
                send_files(flist,f_out,f_in);
+               io_flush();
                if (protocol_version >= 24) {
                        /* final goodbye message */
                        read_int(f_in);
@@ -629,6 +636,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[])
                        wait_process(pid, &status);
                }
                report(-1);
+               io_flush();
                exit_cleanup(status);
        }
 
index 699ed0e..ed7bcc5 100644 (file)
@@ -303,6 +303,8 @@ static int receive_data(int f_in,struct map_struct *mapbuf,int fd,char *fname,
                offset += len;
        }
 
+       flush_write_file(fd);
+
        if (do_progress)
                end_progress(total_size);