X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/ea2111d10a10a164334eff3b621ed7e630404345..e1b3d5c4be46ee0b17a98aa48cb7cb152c4a008b:/io.c diff --git a/io.c b/io.c index 03da79ce..7909e0b1 100644 --- a/io.c +++ b/io.c @@ -24,27 +24,15 @@ */ #include "rsync.h" -static int64 total_written; -static int64 total_read; - static int io_multiplexing_out; static int io_multiplexing_in; static int multiplex_in_fd; static int multiplex_out_fd; static time_t last_io; - +static int eof_error=1; extern int verbose; extern int io_timeout; - -int64 write_total(void) -{ - return total_written; -} - -int64 read_total(void) -{ - return total_read; -} +extern struct stats stats; static int buffer_f_in = -1; @@ -78,6 +66,7 @@ static char *read_buffer; static char *read_buffer_p; static int read_buffer_len; static int read_buffer_size; +static int no_flush; /* read from a socket with IO timeout. return the number of bytes read. If no bytes can be read then exit, never return @@ -106,6 +95,7 @@ static int read_timeout(int fd, char *buf, int len) n = read(fd, buf, len); if (n > 0) { + stats.total_read += n; buf += n; len -= n; ret += n; @@ -118,8 +108,19 @@ static int read_timeout(int fd, char *buf, int len) continue; } + if (n == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* this shouldn't happen, if it does then + sleep for a short time to prevent us + chewing too much CPU */ + u_sleep(100); + continue; + } + if (n == 0) { - rprintf(FERROR,"EOF in read_timeout\n"); + if (eof_error) { + rprintf(FERROR,"EOF in read_timeout\n"); + } exit_cleanup(1); } @@ -142,7 +143,7 @@ static void read_loop(int fd, char *buf, int len) } } -/* read from the file descriptor handing multiplexing - +/* read from the file descriptor handling multiplexing - return number of bytes read never return <= 0 */ static int read_unbuffered(int fd, char *buf, int len) @@ -220,10 +221,7 @@ static void read_check(int f) if (n > (read_buffer_size - read_buffer_len)) { read_buffer_size += n; - if (!read_buffer) - read_buffer = (char *)malloc(read_buffer_size); - else - read_buffer = (char *)realloc(read_buffer,read_buffer_size); + read_buffer = (char *)Realloc(read_buffer,read_buffer_size); if (!read_buffer) out_of_memory("read check"); read_buffer_p = read_buffer; } @@ -266,7 +264,6 @@ int32 read_int(int f) { char b[4]; readfd(f,b,4); - total_read += 4; return IVAL(b,0); } @@ -285,7 +282,6 @@ int64 read_longint(int f) #else if (remote_version >= 16) { readfd(f,b,8); - total_read += 8; ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32); } #endif @@ -296,7 +292,6 @@ int64 read_longint(int f) void read_buf(int f,char *buf,int len) { readfd(f,buf,len); - total_read += len; } void read_sbuf(int f,char *buf,int len) @@ -325,6 +320,8 @@ static void writefd_unbuffered(int fd,char *buf,int len) struct timeval tv; int reading; + no_flush++; + reading = (buffer_f_in != -1 && read_buffer_len < MAX_READ_BUFFER); while (total < len) { @@ -359,12 +356,23 @@ static void writefd_unbuffered(int fd,char *buf,int len) continue; } + if (ret == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* this shouldn't happen, if it does then + sleep for a short time to prevent us + chewing too much CPU */ + u_sleep(100); + continue; + } + if (ret <= 0) { rprintf(FERROR,"erroring writing %d bytes - exiting\n", len); exit_cleanup(1); } total += ret; + stats.total_written += ret; + if (io_timeout) last_io = time(NULL); continue; @@ -374,6 +382,8 @@ static void writefd_unbuffered(int fd,char *buf,int len) read_check(buffer_f_in); } } + + no_flush--; } @@ -395,7 +405,7 @@ void io_start_buffering(int fd) void io_flush(void) { int fd = multiplex_out_fd; - if (!io_buffer_count) return; + if (!io_buffer_count || no_flush) return; if (io_multiplexing_out) { SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count); @@ -441,7 +451,6 @@ void write_int(int f,int32 x) char b[4]; SIVAL(b,0,x); writefd(f,b,4); - total_written += 4; } void write_longint(int f, int64 x) @@ -459,13 +468,11 @@ void write_longint(int f, int64 x) SIVAL(b,4,((x>>32)&0xFFFFFFFF)); writefd(f,b,8); - total_written += 8; } void write_buf(int f,char *buf,int len) { writefd(f,buf,len); - total_written += len; } /* write a string to the connection */ @@ -482,8 +489,12 @@ void write_byte(int f,unsigned char c) int read_line(int f, char *buf, int maxlen) { + eof_error = 0; + while (maxlen) { + buf[0] = 0; read_buf(f, buf, 1); + if (buf[0] == 0) return 0; if (buf[0] == '\n') { buf[0] = 0; break; @@ -497,6 +508,9 @@ int read_line(int f, char *buf, int maxlen) *buf = 0; return 0; } + + eof_error = 1; + return 1; }