From 76c21947140b90edaa5591c63b10ca00f939ad9b Mon Sep 17 00:00:00 2001 From: Wayne Davison Date: Fri, 2 Jan 2004 08:29:49 +0000 Subject: [PATCH] Applying my updated version of Craig Barratt's buffered I/O patch. --- fileio.c | 32 +++++++++++++++++++++++++++++++- flist.c | 2 +- io.c | 48 ++++++++++++++++++++++++++++++++++++++---------- main.c | 10 +++++++++- receiver.c | 2 ++ 5 files changed, 81 insertions(+), 13 deletions(-) diff --git a/fileio.c b/fileio.c index 32480f74..bc244286 100644 --- a/fileio.c +++ b/fileio.c @@ -69,6 +69,21 @@ static int write_sparse(int f,char *buf,size_t len) return len; } + +static char *wf_writeBuf; +static size_t wf_writeBufSize; +static size_t wf_writeBufCnt; + +int flush_write_file(int f) +{ + int ret = write(f, wf_writeBuf, wf_writeBufCnt); + if (ret < 0) + return ret; + /* if (ret < wf_writeBufCnt) ??? */ + wf_writeBufCnt = 0; + return ret; +} + /* * write_file does not allow incomplete writes. It loops internally * until len bytes are written or errno is set. @@ -83,7 +98,22 @@ int write_file(int f,char *buf,size_t len) int len1 = MIN(len, SPARSE_WRITE_SIZE); r1 = write_sparse(f, buf, len1); } else { - r1 = write(f, buf, len); + if (!wf_writeBuf) { + wf_writeBufSize = MAX_MAP_SIZE; + wf_writeBufCnt = 0; + wf_writeBuf = new_array(char, MAX_MAP_SIZE); + if (!wf_writeBuf) out_of_memory("write_file"); + } + r1 = MIN(len, wf_writeBufSize - wf_writeBufCnt); + if (r1) { + memcpy(wf_writeBuf + wf_writeBufCnt, buf, r1); + wf_writeBufCnt += r1; + } + if (wf_writeBufCnt == wf_writeBufSize) { + if (flush_write_file(f) < 0) return -1; + if (!r1 && len) + continue; + } } if (r1 <= 0) { if (ret > 0) return ret; diff --git a/flist.c b/flist.c index 9da5c73f..f905fb87 100644 --- a/flist.c +++ b/flist.c @@ -924,7 +924,7 @@ struct file_list *send_file_list(int f, int argc, char *argv[]) flist = flist_new(); if (f != -1) { - io_start_buffering(f); + io_start_buffering_out(f); if (filesfrom_fd >= 0) { if (argv[0] && !push_dir(argv[0], 0)) { rprintf(FERROR, "push_dir %s failed: %s\n", diff --git a/io.c b/io.c index 84a85bb3..8777bf9d 100644 --- a/io.c +++ b/io.c @@ -41,8 +41,8 @@ static int io_multiplexing_out; static int io_multiplexing_in; -static int multiplex_in_fd; -static int multiplex_out_fd; +static int multiplex_in_fd = -1; +static int multiplex_out_fd = -1; static time_t last_io; static int no_flush; @@ -440,17 +440,31 @@ static int read_unbuffered(int fd, char *buf, size_t len) static size_t remaining; int tag, ret = 0; char line[1024]; + static char *buffer; + static size_t bufferIdx = 0; + static size_t bufferSz; - if (!io_multiplexing_in || fd != multiplex_in_fd) + if (fd != multiplex_in_fd) return read_timeout(fd, buf, len); + if (!io_multiplexing_in && remaining == 0) { + if (!buffer) { + bufferSz = 2 * IO_BUFFER_SIZE; + buffer = new_array(char, bufferSz); + if (!buffer) out_of_memory("read_unbuffered"); + } + remaining = read_timeout(fd, buffer, bufferSz); + bufferIdx = 0; + } + while (ret == 0) { if (remaining) { len = MIN(len, remaining); - read_loop(fd, buf, len); + memcpy(buf, buffer + bufferIdx, len); + bufferIdx += len; remaining -= len; ret = len; - continue; + break; } read_loop(fd, line, 4); @@ -459,8 +473,16 @@ static int read_unbuffered(int fd, char *buf, size_t len) remaining = tag & 0xFFFFFF; tag = tag >> 24; - if (tag == MPLEX_BASE) + if (tag == MPLEX_BASE) { + if (!buffer || remaining > bufferSz) { + buffer = realloc_array(buffer, char, remaining); + if (!buffer) out_of_memory("read_unbuffered"); + bufferSz = remaining; + } + read_loop(fd, buffer, remaining); + bufferIdx = 0; continue; + } tag -= MPLEX_BASE; @@ -482,6 +504,9 @@ static int read_unbuffered(int fd, char *buf, size_t len) remaining = 0; } + if (remaining == 0) + io_flush(); + return ret; } @@ -498,8 +523,6 @@ static void readfd(int fd, char *buffer, size_t N) size_t total=0; while (total < N) { - io_flush(); - ret = read_unbuffered(fd, buffer + total, N-total); total += ret; } @@ -682,7 +705,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) static char *io_buffer; static int io_buffer_count; -void io_start_buffering(int fd) +void io_start_buffering_out(int fd) { if (io_buffer) return; multiplex_out_fd = fd; @@ -691,6 +714,11 @@ void io_start_buffering(int fd) io_buffer_count = 0; } +void io_start_buffering_in(int fd) +{ + multiplex_in_fd = fd; +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. @@ -881,7 +909,7 @@ void io_start_multiplex_out(int fd) { multiplex_out_fd = fd; io_flush(); - io_start_buffering(fd); + io_start_buffering_out(fd); io_multiplexing_out = 1; } diff --git a/main.c b/main.c index 1b0cd8b1..1b07eee0 100644 --- a/main.c +++ b/main.c @@ -380,6 +380,8 @@ static void do_server_sender(int f_in, int f_out, int argc,char *argv[]) exit_cleanup(0); } + io_start_buffering_in(f_in); + io_start_buffering_out(f_out); send_files(flist,f_out,f_in); io_flush(); report(f_out); @@ -454,7 +456,7 @@ static int do_recv(int f_in,int f_out,struct file_list *flist,char *local_name) close(error_pipe[1]); if (f_in != f_out) close(f_in); - io_start_buffering(f_out); + io_start_buffering_out(f_out); io_set_error_fd(error_pipe[0]); @@ -508,6 +510,7 @@ static void do_server_recv(int f_in, int f_out, int argc,char *argv[]) } } + io_start_buffering_in(f_in); if (delete_mode && !delete_excluded) recv_exclude_list(f_in); @@ -606,6 +609,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) extern int cvs_exclude; extern int delete_mode; extern int delete_excluded; + io_start_buffering_out(f_out); if (cvs_exclude) add_cvs_excludes(); if (delete_mode && !delete_excluded) @@ -617,7 +621,10 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) if (verbose > 3) rprintf(FINFO,"file list sent\n"); + io_flush(); + io_start_buffering_out(f_out); send_files(flist,f_out,f_in); + io_flush(); if (protocol_version >= 24) { /* final goodbye message */ read_int(f_in); @@ -629,6 +636,7 @@ int client_run(int f_in, int f_out, pid_t pid, int argc, char *argv[]) wait_process(pid, &status); } report(-1); + io_flush(); exit_cleanup(status); } diff --git a/receiver.c b/receiver.c index 699ed0e8..ed7bcc58 100644 --- a/receiver.c +++ b/receiver.c @@ -303,6 +303,8 @@ static int receive_data(int f_in,struct map_struct *mapbuf,int fd,char *fname, offset += len; } + flush_write_file(fd); + if (do_progress) end_progress(total_size); -- 2.34.1