X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/a800434a82af8dbd59da7c27b382c9897dd90150..27d3cdbc943a57d64f4f2a35a4f1e1b15d9ca41f:/io.c diff --git a/io.c b/io.c index 61bdc609..5122a2a2 100644 --- a/io.c +++ b/io.c @@ -24,6 +24,9 @@ */ #include "rsync.h" +/* if no timeout is specified then use a 60 second select timeout */ +#define SELECT_TIMEOUT 60 + static int io_multiplexing_out; static int io_multiplexing_in; static int multiplex_in_fd; @@ -54,8 +57,8 @@ static void check_timeout(void) t = time(NULL); - if (last_io && io_timeout && (t-last_io)>io_timeout) { - rprintf(FERROR,"read timeout after %d second - exiting\n", + if (last_io && io_timeout && (t-last_io) >= io_timeout) { + rprintf(FERROR,"io timeout after %d second - exiting\n", (int)(t-last_io)); exit_cleanup(1); } @@ -67,6 +70,7 @@ static char *read_buffer_p; static int read_buffer_len; static int read_buffer_size; static int no_flush; +static int no_flush_read; /* read from a socket with IO timeout. return the number of bytes read. If no bytes can be read then exit, never return @@ -75,7 +79,9 @@ static int read_timeout(int fd, char *buf, int len) { int n, ret=0; + no_flush_read++; io_flush(); + no_flush_read--; while (ret == 0) { fd_set fds; @@ -83,11 +89,10 @@ static int read_timeout(int fd, char *buf, int len) FD_ZERO(&fds); FD_SET(fd, &fds); - tv.tv_sec = io_timeout; + tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT; tv.tv_usec = 0; - if (select(fd+1, &fds, NULL, NULL, - io_timeout?&tv:NULL) != 1) { + if (select(fd+1, &fds, NULL, NULL, &tv) != 1) { check_timeout(); continue; } @@ -108,9 +113,18 @@ static int read_timeout(int fd, char *buf, int len) continue; } + if (n == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + /* this shouldn't happen, if it does then + sleep for a short time to prevent us + chewing too much CPU */ + u_sleep(100); + continue; + } + if (n == 0) { if (eof_error) { - rprintf(FERROR,"EOF in read_timeout\n"); + rprintf(FERROR,"unexpected EOF in read_timeout\n"); } exit_cleanup(1); } @@ -229,7 +243,7 @@ static void readfd(int fd,char *buffer,int N) int ret; int total=0; - if (read_buffer_len < N && N < 1024) { + if ((read_buffer_len < N) && (N < 1024)) { read_check(buffer_f_in); } @@ -243,7 +257,9 @@ static void readfd(int fd,char *buffer,int N) continue; } + no_flush_read++; io_flush(); + no_flush_read--; ret = read_unbuffered(fd,buffer + total,N-total); total += ret; @@ -309,59 +325,69 @@ static void writefd_unbuffered(int fd,char *buf,int len) fd_set w_fds, r_fds; int fd_count, count; struct timeval tv; - int reading; + int reading=0; + int blocked=0; no_flush++; - reading = (buffer_f_in != -1 && read_buffer_len < MAX_READ_BUFFER); - while (total < len) { FD_ZERO(&w_fds); FD_ZERO(&r_fds); FD_SET(fd,&w_fds); fd_count = fd+1; + if (!no_flush_read) { + reading = (buffer_f_in != -1); + } + if (reading) { FD_SET(buffer_f_in,&r_fds); if (buffer_f_in > fd) fd_count = buffer_f_in+1; } - tv.tv_sec = io_timeout; + tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT; tv.tv_usec = 0; count = select(fd_count, reading?&r_fds:NULL, &w_fds,NULL, - io_timeout?&tv:NULL); + &tv); if (count <= 0) { check_timeout(); continue; } + if (reading && FD_ISSET(buffer_f_in, &r_fds)) { + read_check(buffer_f_in); + } + if (FD_ISSET(fd, &w_fds)) { - int ret = write(fd,buf+total,len-total); + int n = (len-total)>>blocked; + int ret = write(fd,buf+total,n?n:1); if (ret == -1 && errno == EINTR) { continue; } + if (ret == -1 && + (errno == EAGAIN || errno == EWOULDBLOCK)) { + blocked++; + continue; + } + if (ret <= 0) { rprintf(FERROR,"erroring writing %d bytes - exiting\n", len); exit_cleanup(1); } + blocked = 0; total += ret; stats.total_written += ret; if (io_timeout) last_io = time(NULL); - continue; - } - - if (reading && FD_ISSET(buffer_f_in, &r_fds)) { - read_check(buffer_f_in); } } @@ -458,7 +484,7 @@ void write_buf(int f,char *buf,int len) } /* write a string to the connection */ -void write_sbuf(int f,char *buf) +static void write_sbuf(int f,char *buf) { write_buf(f, buf, strlen(buf)); }