X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/cad2bba7d809c9f385fd8b85959c09c5e687edb5..c95da96a0c51c66c8cb2eff97b768a717d9e0c79:/io.c diff --git a/io.c b/io.c index 85ff7424..d7140d26 100644 --- a/io.c +++ b/io.c @@ -24,9 +24,6 @@ */ #include "rsync.h" -static int64 total_written; -static int64 total_read; - static int io_multiplexing_out; static int io_multiplexing_in; static int multiplex_in_fd; @@ -35,17 +32,7 @@ static time_t last_io; static int eof_error=1; extern int verbose; extern int io_timeout; - - -int64 write_total(void) -{ - return total_written; -} - -int64 read_total(void) -{ - return total_read; -} +extern struct stats stats; static int buffer_f_in = -1; @@ -79,6 +66,7 @@ static char *read_buffer; static char *read_buffer_p; static int read_buffer_len; static int read_buffer_size; +static int no_flush; /* read from a socket with IO timeout. return the number of bytes read. If no bytes can be read then exit, never return @@ -107,6 +95,7 @@ static int read_timeout(int fd, char *buf, int len) n = read(fd, buf, len); if (n > 0) { + stats.total_read += n; buf += n; len -= n; ret += n; @@ -119,6 +108,15 @@ static int read_timeout(int fd, char *buf, int len) continue; } + if (n == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* this shouldn't happen, if it does then + sleep for a short time to prevent us + chewing too much CPU */ + u_sleep(100); + continue; + } + if (n == 0) { if (eof_error) { rprintf(FERROR,"EOF in read_timeout\n"); @@ -266,7 +264,6 @@ int32 read_int(int f) { char b[4]; readfd(f,b,4); - total_read += 4; return IVAL(b,0); } @@ -285,7 +282,6 @@ int64 read_longint(int f) #else if (remote_version >= 16) { readfd(f,b,8); - total_read += 8; ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32); } #endif @@ -296,7 +292,6 @@ int64 read_longint(int f) void read_buf(int f,char *buf,int len) { readfd(f,buf,len); - total_read += len; } void read_sbuf(int f,char *buf,int len) @@ -324,6 +319,9 @@ static void writefd_unbuffered(int fd,char *buf,int len) int fd_count, count; struct timeval tv; int reading; + int blocked=0; + + no_flush++; reading = (buffer_f_in != -1 && read_buffer_len < MAX_READ_BUFFER); @@ -353,18 +351,28 @@ static void writefd_unbuffered(int fd,char *buf,int len) } if (FD_ISSET(fd, &w_fds)) { - int ret = write(fd,buf+total,len-total); + int n = (len-total)>>blocked; + int ret = write(fd,buf+total,n?n:1); if (ret == -1 && errno == EINTR) { continue; } + if (ret == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + blocked++; + continue; + } + if (ret <= 0) { rprintf(FERROR,"erroring writing %d bytes - exiting\n", len); exit_cleanup(1); } + blocked = 0; total += ret; + stats.total_written += ret; + if (io_timeout) last_io = time(NULL); continue; @@ -374,6 +382,8 @@ static void writefd_unbuffered(int fd,char *buf,int len) read_check(buffer_f_in); } } + + no_flush--; } @@ -395,7 +405,7 @@ void io_start_buffering(int fd) void io_flush(void) { int fd = multiplex_out_fd; - if (!io_buffer_count) return; + if (!io_buffer_count || no_flush) return; if (io_multiplexing_out) { SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count); @@ -441,7 +451,6 @@ void write_int(int f,int32 x) char b[4]; SIVAL(b,0,x); writefd(f,b,4); - total_written += 4; } void write_longint(int f, int64 x) @@ -459,13 +468,11 @@ void write_longint(int f, int64 x) SIVAL(b,4,((x>>32)&0xFFFFFFFF)); writefd(f,b,8); - total_written += 8; } void write_buf(int f,char *buf,int len) { writefd(f,buf,len); - total_written += len; } /* write a string to the connection */