X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/4c36ddbeecdde407c870109d70527640ca127ace..c95da96a0c51c66c8cb2eff97b768a717d9e0c79:/io.c diff --git a/io.c b/io.c index 4c704ebe..d7140d26 100644 --- a/io.c +++ b/io.c @@ -24,27 +24,15 @@ */ #include "rsync.h" -static int64 total_written; -static int64 total_read; - static int io_multiplexing_out; static int io_multiplexing_in; static int multiplex_in_fd; static int multiplex_out_fd; static time_t last_io; - +static int eof_error=1; extern int verbose; extern int io_timeout; - -int64 write_total(void) -{ - return total_written; -} - -int64 read_total(void) -{ - return total_read; -} +extern struct stats stats; static int buffer_f_in = -1; @@ -78,6 +66,7 @@ static char *read_buffer; static char *read_buffer_p; static int read_buffer_len; static int read_buffer_size; +static int no_flush; /* read from a socket with IO timeout. return the number of bytes read. If no bytes can be read then exit, never return @@ -86,6 +75,8 @@ static int read_timeout(int fd, char *buf, int len) { int n, ret=0; + io_flush(); + while (ret == 0) { fd_set fds; struct timeval tv; @@ -104,6 +95,7 @@ static int read_timeout(int fd, char *buf, int len) n = read(fd, buf, len); if (n > 0) { + stats.total_read += n; buf += n; len -= n; ret += n; @@ -116,8 +108,19 @@ static int read_timeout(int fd, char *buf, int len) continue; } + if (n == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* this shouldn't happen, if it does then + sleep for a short time to prevent us + chewing too much CPU */ + u_sleep(100); + continue; + } + if (n == 0) { - rprintf(FERROR,"EOF in read_timeout\n"); + if (eof_error) { + rprintf(FERROR,"EOF in read_timeout\n"); + } exit_cleanup(1); } @@ -140,7 +143,7 @@ static void read_loop(int fd, char *buf, int len) } } -/* read from the file descriptor handing multiplexing - +/* read from the file descriptor handling multiplexing - return number of bytes read never return <= 0 */ static int read_unbuffered(int fd, char *buf, int len) @@ -218,10 +221,7 @@ static void read_check(int f) if (n > (read_buffer_size - read_buffer_len)) { read_buffer_size += n; - if (!read_buffer) - read_buffer = (char *)malloc(read_buffer_size); - else - read_buffer = (char *)realloc(read_buffer,read_buffer_size); + read_buffer = (char *)Realloc(read_buffer,read_buffer_size); if (!read_buffer) out_of_memory("read check"); read_buffer_p = read_buffer; } @@ -264,7 +264,6 @@ int32 read_int(int f) { char b[4]; readfd(f,b,4); - total_read += 4; return IVAL(b,0); } @@ -283,7 +282,6 @@ int64 read_longint(int f) #else if (remote_version >= 16) { readfd(f,b,8); - total_read += 8; ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32); } #endif @@ -294,7 +292,6 @@ int64 read_longint(int f) void read_buf(int f,char *buf,int len) { readfd(f,buf,len); - total_read += len; } void read_sbuf(int f,char *buf,int len) @@ -322,6 +319,9 @@ static void writefd_unbuffered(int fd,char *buf,int len) int fd_count, count; struct timeval tv; int reading; + int blocked=0; + + no_flush++; reading = (buffer_f_in != -1 && read_buffer_len < MAX_READ_BUFFER); @@ -351,18 +351,28 @@ static void writefd_unbuffered(int fd,char *buf,int len) } if (FD_ISSET(fd, &w_fds)) { - int ret = write(fd,buf+total,len-total); + int n = (len-total)>>blocked; + int ret = write(fd,buf+total,n?n:1); if (ret == -1 && errno == EINTR) { continue; } + if (ret == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + blocked++; + continue; + } + if (ret <= 0) { rprintf(FERROR,"erroring writing %d bytes - exiting\n", len); exit_cleanup(1); } + blocked = 0; total += ret; + stats.total_written += ret; + if (io_timeout) last_io = time(NULL); continue; @@ -372,6 +382,8 @@ static void writefd_unbuffered(int fd,char *buf,int len) read_check(buffer_f_in); } } + + no_flush--; } @@ -393,7 +405,7 @@ void io_start_buffering(int fd) void io_flush(void) { int fd = multiplex_out_fd; - if (!io_buffer_count) return; + if (!io_buffer_count || no_flush) return; if (io_multiplexing_out) { SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count); @@ -439,7 +451,6 @@ void write_int(int f,int32 x) char b[4]; SIVAL(b,0,x); writefd(f,b,4); - total_written += 4; } void write_longint(int f, int64 x) @@ -457,13 +468,11 @@ void write_longint(int f, int64 x) SIVAL(b,4,((x>>32)&0xFFFFFFFF)); writefd(f,b,8); - total_written += 8; } void write_buf(int f,char *buf,int len) { writefd(f,buf,len); - total_written += len; } /* write a string to the connection */ @@ -480,8 +489,12 @@ void write_byte(int f,unsigned char c) int read_line(int f, char *buf, int maxlen) { + eof_error = 0; + while (maxlen) { + buf[0] = 0; read_buf(f, buf, 1); + if (buf[0] == 0) return 0; if (buf[0] == '\n') { buf[0] = 0; break; @@ -495,6 +508,9 @@ int read_line(int f, char *buf, int maxlen) *buf = 0; return 0; } + + eof_error = 1; + return 1; }