Matt McCutchen's Web Site
/
rsync
/
rsync.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
handle systems that don't take a 2nd argument to gettimeofday()
[rsync/rsync.git]
/
io.c
diff --git
a/io.c
b/io.c
index
3ea09e1
..
4fd1a13
100644
(file)
--- a/
io.c
+++ b/
io.c
@@
-60,7
+60,7
@@
static void check_timeout(void)
if (last_io && io_timeout && (t-last_io) >= io_timeout) {
rprintf(FERROR,"io timeout after %d second - exiting\n",
(int)(t-last_io));
if (last_io && io_timeout && (t-last_io) >= io_timeout) {
rprintf(FERROR,"io timeout after %d second - exiting\n",
(int)(t-last_io));
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_TIMEOUT
);
}
}
}
}
@@
-70,6
+70,7
@@
static char *read_buffer_p;
static int read_buffer_len;
static int read_buffer_size;
static int no_flush;
static int read_buffer_len;
static int read_buffer_size;
static int no_flush;
+static int no_flush_read;
/* read from a socket with IO timeout. return the number of
bytes read. If no bytes can be read then exit, never return
/* read from a socket with IO timeout. return the number of
bytes read. If no bytes can be read then exit, never return
@@
-78,7
+79,9
@@
static int read_timeout(int fd, char *buf, int len)
{
int n, ret=0;
{
int n, ret=0;
+ no_flush_read++;
io_flush();
io_flush();
+ no_flush_read--;
while (ret == 0) {
fd_set fds;
while (ret == 0) {
fd_set fds;
@@
-97,7
+100,6
@@
static int read_timeout(int fd, char *buf, int len)
n = read(fd, buf, len);
if (n > 0) {
n = read(fd, buf, len);
if (n > 0) {
- stats.total_read += n;
buf += n;
len -= n;
ret += n;
buf += n;
len -= n;
ret += n;
@@
-110,24
+112,19
@@
static int read_timeout(int fd, char *buf, int len)
continue;
}
continue;
}
- if (n == -1 &&
- (errno == EAGAIN || errno == EWOULDBLOCK)) {
- /* this shouldn't happen, if it does then
- sleep for a short time to prevent us
- chewing too much CPU */
- u_sleep(100);
- continue;
- }
if (n == 0) {
if (eof_error) {
rprintf(FERROR,"unexpected EOF in read_timeout\n");
}
if (n == 0) {
if (eof_error) {
rprintf(FERROR,"unexpected EOF in read_timeout\n");
}
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
}
+ /* this prevents us trying to write errors on a dead socket */
+ io_multiplexing_out = 0;
+
rprintf(FERROR,"read error: %s\n", strerror(errno));
rprintf(FERROR,"read error: %s\n", strerror(errno));
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
return ret;
}
return ret;
@@
-179,13
+176,13
@@
static int read_unbuffered(int fd, char *buf, int len)
if (tag != FERROR && tag != FINFO) {
rprintf(FERROR,"unexpected tag %d\n", tag);
if (tag != FERROR && tag != FINFO) {
rprintf(FERROR,"unexpected tag %d\n", tag);
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
if (remaining > sizeof(line)-1) {
rprintf(FERROR,"multiplexing overflow %d\n\n",
remaining);
}
if (remaining > sizeof(line)-1) {
rprintf(FERROR,"multiplexing overflow %d\n\n",
remaining);
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
read_loop(fd, line, remaining);
}
read_loop(fd, line, remaining);
@@
-254,19
+251,27
@@
static void readfd(int fd,char *buffer,int N)
continue;
}
continue;
}
+ no_flush_read++;
io_flush();
io_flush();
+ no_flush_read--;
ret = read_unbuffered(fd,buffer + total,N-total);
total += ret;
}
ret = read_unbuffered(fd,buffer + total,N-total);
total += ret;
}
+
+ stats.total_read += total;
}
int32 read_int(int f)
{
char b[4];
}
int32 read_int(int f)
{
char b[4];
+ int32 ret;
+
readfd(f,b,4);
readfd(f,b,4);
- return IVAL(b,0);
+ ret = IVAL(b,0);
+ if (ret == (int32)0xffffffff) return -1;
+ return ret;
}
int64 read_longint(int f)
}
int64 read_longint(int f)
@@
-276,11
+281,13
@@
int64 read_longint(int f)
char b[8];
ret = read_int(f);
char b[8];
ret = read_int(f);
- if ((int32)ret != (int32)0xffffffff) return ret;
+ if ((int32)ret != (int32)0xffffffff) {
+ return ret;
+ }
#ifdef NO_INT64
rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
#ifdef NO_INT64
rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_UNSUPPORTED
);
#else
if (remote_version >= 16) {
readfd(f,b,8);
#else
if (remote_version >= 16) {
readfd(f,b,8);
@@
-320,8
+327,7
@@
static void writefd_unbuffered(int fd,char *buf,int len)
fd_set w_fds, r_fds;
int fd_count, count;
struct timeval tv;
fd_set w_fds, r_fds;
int fd_count, count;
struct timeval tv;
- int reading;
- int blocked=0;
+ int reading=0;
no_flush++;
no_flush++;
@@
-331,8
+337,9
@@
static void writefd_unbuffered(int fd,char *buf,int len)
FD_SET(fd,&w_fds);
fd_count = fd+1;
FD_SET(fd,&w_fds);
fd_count = fd+1;
- reading = (buffer_f_in != -1 &&
- read_buffer_len < MAX_READ_BUFFER);
+ if (!no_flush_read) {
+ reading = (buffer_f_in != -1);
+ }
if (reading) {
FD_SET(buffer_f_in,&r_fds);
if (reading) {
FD_SET(buffer_f_in,&r_fds);
@@
-358,27
+365,22
@@
static void writefd_unbuffered(int fd,char *buf,int len)
}
if (FD_ISSET(fd, &w_fds)) {
}
if (FD_ISSET(fd, &w_fds)) {
- int n = (len-total)>>blocked;
- int ret = write(fd,buf+total,n?n:1);
+ int ret, n = len-total;
+
+ if (n > PIPE_BUF) n = PIPE_BUF;
- if (ret == -1 && errno == EINTR) {
- continue;
- }
+ ret = write(fd,buf+total,n?n:1);
- if (ret == -1 &&
- (errno == EAGAIN || errno == EWOULDBLOCK)) {
- blocked++;
+ if (ret == -1 && errno == EINTR) {
continue;
}
if (ret <= 0) {
rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
continue;
}
if (ret <= 0) {
rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
}
- blocked = 0;
total += ret;
total += ret;
- stats.total_written += ret;
if (io_timeout)
last_io = time(NULL);
if (io_timeout)
last_io = time(NULL);
@@
-429,6
+431,8
@@
void io_end_buffering(int fd)
static void writefd(int fd,char *buf,int len)
{
static void writefd(int fd,char *buf,int len)
{
+ stats.total_written += len;
+
if (!io_buffer) {
writefd_unbuffered(fd, buf, len);
return;
if (!io_buffer) {
writefd_unbuffered(fd, buf, len);
return;
@@
-465,7
+469,7
@@
void write_longint(int f, int64 x)
return;
}
return;
}
- write_int(f,
-1
);
+ write_int(f,
(int32)0xFFFFFFFF
);
SIVAL(b,0,(x&0xFFFFFFFF));
SIVAL(b,4,((x>>32)&0xFFFFFFFF));
SIVAL(b,0,(x&0xFFFFFFFF));
SIVAL(b,4,((x>>32)&0xFFFFFFFF));
@@
-478,7
+482,7
@@
void write_buf(int f,char *buf,int len)
}
/* write a string to the connection */
}
/* write a string to the connection */
-void write_sbuf(int f,char *buf)
+
static
void write_sbuf(int f,char *buf)
{
write_buf(f, buf, strlen(buf));
}
{
write_buf(f, buf, strlen(buf));
}
@@
-524,10
+528,10
@@
void io_printf(int fd, const char *format, ...)
int len;
va_start(ap, format);
int len;
va_start(ap, format);
- len = vslprintf(buf, sizeof(buf)
-1
, format, ap);
+ len = vslprintf(buf, sizeof(buf), format, ap);
va_end(ap);
va_end(ap);
- if (len < 0) exit_cleanup(
1
);
+ if (len < 0) exit_cleanup(
RERR_STREAMIO
);
write_sbuf(fd, buf);
}
write_sbuf(fd, buf);
}
@@
-549,7
+553,7
@@
void io_start_multiplex_in(int fd)
io_flush();
if (read_buffer_len) {
fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
io_flush();
if (read_buffer_len) {
fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
- exit_cleanup(
1
);
+ exit_cleanup(
RERR_STREAMIO
);
}
io_multiplexing_in = 1;
}
io_multiplexing_in = 1;
@@
-565,6
+569,8
@@
int io_multiplex_write(int f, char *buf, int len)
SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
memcpy(io_buffer, buf, len);
SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
memcpy(io_buffer, buf, len);
+ stats.total_written += (len+4);
+
writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
return 1;
}
writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
return 1;
}