X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/81c99202d3a60703d651edb8e967e300648cd8f1..ad301e487c1b50120d7ca1a9c7cc5fe80f50b944:/io.c diff --git a/io.c b/io.c index 7aee3479..f651116e 100644 --- a/io.c +++ b/io.c @@ -1,48 +1,83 @@ /* -*- c-file-style: "linux" -*- - - Copyright (C) 1996-2001 by Andrew Tridgell - Copyright (C) Paul Mackerras 1996 - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. -*/ + * + * Copyright (C) 1996-2001 by Andrew Tridgell + * Copyright (C) Paul Mackerras 1996 + * Copyright (C) 2001, 2002 by Martin Pool + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ -/* - socket and pipe IO utilities used in rsync +/** + * @file io.c + * + * Socket and pipe IO utilities used in rsync. + * + * rsync provides its own multiplexing system, which is used to send + * stderr and stdout over a single socket. We need this because + * stdout normally carries the binary data stream, and stderr all our + * error messages. + * + * For historical reasons this is off during the start of the + * connection, but it's switched on quite early using + * io_start_multiplex_out() and io_start_multiplex_in(). + **/ - tridge, June 1996 - */ #include "rsync.h" -/* if no timeout is specified then use a 60 second select timeout */ +/** If no timeout is specified then use a 60 second select timeout */ #define SELECT_TIMEOUT 60 -extern int bwlimit; - static int io_multiplexing_out; static int io_multiplexing_in; static int multiplex_in_fd; static int multiplex_out_fd; static time_t last_io; -static int eof_error=1; +static int no_flush; + +extern int bwlimit; extern int verbose; extern int io_timeout; extern struct stats stats; + +const char phase_unknown[] = "unknown"; + +/** + * The connection might be dropped at some point; perhaps because the + * remote instance crashed. Just giving the offset on the stream is + * not very helpful. So instead we try to make io_phase_name point to + * something useful. + * + * For buffered/multiplexed IO these names will be somewhat + * approximate; perhaps for ease of support we would rather make the + * buffer always flush when a single application-level IO finishes. + * + * @todo Perhaps we want some simple stack functionality, but there's + * no need to overdo it. + **/ +const char *io_write_phase = phase_unknown; +const char *io_read_phase = phase_unknown; + +/** Ignore EOF errors while reading a module listing if the remote + version is 24 or less. */ +int kludge_around_eof = False; + + static int io_error_fd = -1; -static void read_loop(int fd, char *buf, int len); +static void read_loop(int fd, char *buf, size_t len); static void check_timeout(void) { @@ -69,17 +104,17 @@ static void check_timeout(void) } } -/* setup the fd used to propogate errors */ +/** Setup the fd used to propagate errors */ void io_set_error_fd(int fd) { io_error_fd = fd; } -/* read some data from the error fd and write it to the write log code */ +/** Read some data from the error fd and write it to the write log code */ static void read_error_fd(void) { char buf[200]; - int n; + size_t n; int fd = io_error_fd; int tag, len; @@ -96,7 +131,8 @@ static void read_error_fd(void) while (len) { n = len; - if (n > (sizeof(buf)-1)) n = sizeof(buf)-1; + if (n > (sizeof(buf)-1)) + n = sizeof(buf)-1; read_loop(fd, buf, n); rwrite((enum logcode)tag, buf, n); len -= n; @@ -106,9 +142,43 @@ static void read_error_fd(void) } -static int no_flush; +/** + * It's almost always an error to get an EOF when we're trying to read + * from the network, because the protocol is self-terminating. + * + * However, there is one unfortunate cases where it is not, which is + * rsync <2.4.6 sending a list of modules on a server, since the list + * is terminated by closing the socket. So, for the section of the + * program where that is a problem (start_socket_client), + * kludge_around_eof is True and we just exit. + */ +static void whine_about_eof (void) +{ + if (kludge_around_eof) + exit_cleanup (0); + else { + rprintf (FERROR, + "%s: connection unexpectedly closed " + "(%.0f bytes read so far)\n", + RSYNC_NAME, (double)stats.total_read); + + exit_cleanup (RERR_STREAMIO); + } +} -/* + +static void die_from_readerr (int err) +{ + /* this prevents us trying to write errors on a dead socket */ + io_multiplexing_close(); + + rprintf(FERROR, "%s: read error: %s\n", + RSYNC_NAME, strerror (err)); + exit_cleanup(RERR_STREAMIO); +} + + +/** * Read from a socket with IO timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. * @@ -119,16 +189,18 @@ static int no_flush; * give a better explanation. We can tell whether the connection has * started by looking e.g. at whether the remote version is known yet. */ -static int read_timeout(int fd, char *buf, int len) +static int read_timeout (int fd, char *buf, size_t len) { int n, ret=0; io_flush(); while (ret == 0) { + /* until we manage to read *something* */ fd_set fds; struct timeval tv; int fd_count = fd+1; + int count; FD_ZERO(&fds); FD_SET(fd, &fds); @@ -142,11 +214,16 @@ static int read_timeout(int fd, char *buf, int len) errno = 0; - if (select(fd_count, &fds, NULL, NULL, &tv) < 1) { + count = select(fd_count, &fds, NULL, NULL, &tv); + + if (count == 0) { + check_timeout(); + } + + if (count <= 0) { if (errno == EBADF) { exit_cleanup(RERR_SOCKETIO); } - check_timeout(); continue; } @@ -165,41 +242,29 @@ static int read_timeout(int fd, char *buf, int len) if (io_timeout) last_io = time(NULL); continue; + } else if (n == 0) { + whine_about_eof (); + return -1; /* doesn't return */ + } else if (n == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || + errno == EAGAIN) + continue; + else + die_from_readerr (errno); } - - if (n == -1 && errno == EINTR) { - continue; - } - - if (n == -1 && - (errno == EWOULDBLOCK || errno == EAGAIN)) { - continue; - } - - - if (n == 0) { - if (eof_error) { - rprintf(FERROR, - "%s: connection to server unexpectedly closed" - " (%.0f bytes read so far)\n", - RSYNC_NAME, (double)stats.total_read); - } - exit_cleanup(RERR_STREAMIO); - } - - /* this prevents us trying to write errors on a dead socket */ - io_multiplexing_close(); - - rprintf(FERROR,"read error: %s\n", strerror(errno)); - exit_cleanup(RERR_STREAMIO); } return ret; } -/* continue trying to read len bytes - don't return until len - has been read */ -static void read_loop(int fd, char *buf, int len) + + + +/** + * Continue trying to read len bytes - don't return until len has been + * read. + **/ +static void read_loop (int fd, char *buf, size_t len) { while (len) { int n = read_timeout(fd, buf, len); @@ -209,16 +274,20 @@ static void read_loop(int fd, char *buf, int len) } } -/* read from the file descriptor handling multiplexing - - return number of bytes read - never return <= 0 */ -static int read_unbuffered(int fd, char *buf, int len) + +/** + * Read from the file descriptor handling multiplexing - return number + * of bytes read. + * + * Never returns <= 0. + */ +static int read_unbuffered(int fd, char *buf, size_t len) { - static int remaining; - int tag, ret=0; + static size_t remaining; + int tag, ret = 0; char line[1024]; - if (!io_multiplexing_in || fd != multiplex_in_fd) + if (!io_multiplexing_in || fd != multiplex_in_fd) return read_timeout(fd, buf, len); while (ret == 0) { @@ -236,17 +305,18 @@ static int read_unbuffered(int fd, char *buf, int len) remaining = tag & 0xFFFFFF; tag = tag >> 24; - if (tag == MPLEX_BASE) continue; + if (tag == MPLEX_BASE) + continue; tag -= MPLEX_BASE; if (tag != FERROR && tag != FINFO) { - rprintf(FERROR,"unexpected tag %d\n", tag); + rprintf(FERROR, "unexpected tag %d\n", tag); exit_cleanup(RERR_STREAMIO); } - if (remaining > sizeof(line)-1) { - rprintf(FERROR,"multiplexing overflow %d\n\n", + if (remaining > sizeof(line) - 1) { + rprintf(FERROR, "multiplexing overflow %d\n\n", remaining); exit_cleanup(RERR_STREAMIO); } @@ -254,7 +324,7 @@ static int read_unbuffered(int fd, char *buf, int len) read_loop(fd, line, remaining); line[remaining] = 0; - rprintf((enum logcode)tag,"%s", line); + rprintf((enum logcode) tag, "%s", line); remaining = 0; } @@ -262,17 +332,21 @@ static int read_unbuffered(int fd, char *buf, int len) } -/* do a buffered read from fd. don't return until all N bytes - have been read. If all N can't be read then exit with an error */ -static void readfd(int fd,char *buffer,int N) + +/** + * Do a buffered read from @p fd. Don't return until all @p n bytes + * have been read. If all @p n can't be read then exit with an + * error. + **/ +static void readfd (int fd, char *buffer, size_t N) { int ret; - int total=0; + size_t total=0; while (total < N) { io_flush(); - ret = read_unbuffered(fd,buffer + total,N-total); + ret = read_unbuffered (fd, buffer + total, N-total); total += ret; } @@ -315,28 +389,61 @@ int64 read_longint(int f) return ret; } -void read_buf(int f,char *buf,int len) +void read_buf(int f,char *buf,size_t len) { readfd(f,buf,len); } -void read_sbuf(int f,char *buf,int len) +void read_sbuf(int f,char *buf,size_t len) { - read_buf(f,buf,len); + read_buf (f,buf,len); buf[len] = 0; } unsigned char read_byte(int f) { unsigned char c; - read_buf(f,(char *)&c,1); + read_buf (f, (char *)&c, 1); return c; } -/* write len bytes to fd */ -static void writefd_unbuffered(int fd,char *buf,int len) + +/** + * Sleep after writing to limit I/O bandwidth usage. + * + * @todo Rather than sleeping after each write, it might be better to + * use some kind of averaging. The current algorithm seems to always + * use a bit less bandwidth than specified, because it doesn't make up + * for slow periods. But arguably this is a feature. In addition, we + * ought to take the time used to write the data into account. + **/ +static void sleep_for_bwlimit(int bytes_written) { - int total = 0; + struct timeval tv; + + if (!bwlimit) + return; + + assert(bytes_written > 0); + assert(bwlimit > 0); + + tv.tv_usec = bytes_written * 1000 / bwlimit; + tv.tv_sec = tv.tv_usec / 1000000; + tv.tv_usec = tv.tv_usec % 1000000; + + select(0, NULL, NULL, NULL, &tv); +} + + +/** + * Write len bytes to the file descriptor @p fd. + * + * This function underlies the multiplexing system. The body of the + * application never calls this function directly. + **/ +static void writefd_unbuffered(int fd,char *buf,size_t len) +{ + size_t total = 0; fd_set w_fds, r_fds; int fd_count, count; struct timeval tv; @@ -367,11 +474,14 @@ static void writefd_unbuffered(int fd,char *buf,int len) &w_fds,NULL, &tv); + if (count == 0) { + check_timeout(); + } + if (count <= 0) { if (errno == EBADF) { exit_cleanup(RERR_SOCKETIO); } - check_timeout(); continue; } @@ -380,7 +490,8 @@ static void writefd_unbuffered(int fd,char *buf,int len) } if (FD_ISSET(fd, &w_fds)) { - int ret, n = len-total; + int ret; + size_t n = len-total; ret = write(fd,buf+total,n); if (ret == -1 && errno == EINTR) { @@ -394,25 +505,17 @@ static void writefd_unbuffered(int fd,char *buf,int len) } if (ret <= 0) { - rprintf(FERROR, - "error writing %d unbuffered bytes" - " - exiting: %s\n", len, + /* Don't try to write errors back + * across the stream */ + io_multiplexing_close(); + rprintf(FERROR, RSYNC_NAME + ": writefd_unbuffered failed to write %ld bytes: phase \"%s\": %s\n", + (long) len, io_write_phase, strerror(errno)); exit_cleanup(RERR_STREAMIO); } - /* Sleep after writing to limit I/O bandwidth */ - if (bwlimit) - { - tv.tv_sec = 0; - tv.tv_usec = ret * 1000 / bwlimit; - while (tv.tv_usec > 1000000) - { - tv.tv_sec++; - tv.tv_usec -= 1000000; - } - select(0, NULL, NULL, NULL, &tv); - } + sleep_for_bwlimit(ret); total += ret; @@ -437,12 +540,14 @@ void io_start_buffering(int fd) io_buffer_count = 0; } -/* write an message to a multiplexed stream. If this fails then rsync - exits */ -static void mplex_write(int fd, enum logcode code, char *buf, int len) +/** + * Write an message to a multiplexed stream. If this fails then rsync + * exits. + **/ +static void mplex_write(int fd, enum logcode code, char *buf, size_t len) { char buffer[4096]; - int n = len; + size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); @@ -479,8 +584,7 @@ void io_flush(void) } -/* XXX: fd is ignored, which seems a little strange. */ -void io_end_buffering(int fd) +void io_end_buffering(void) { io_flush(); if (!io_multiplexing_out) { @@ -489,7 +593,7 @@ void io_end_buffering(int fd) } } -static void writefd(int fd,char *buf,int len) +static void writefd(int fd,char *buf,size_t len) { stats.total_written += len; @@ -501,7 +605,7 @@ static void writefd(int fd,char *buf,int len) } while (len) { - int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count); + int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count); if (n > 0) { memcpy(io_buffer+io_buffer_count, buf, n); buf += n; @@ -522,6 +626,14 @@ void write_int(int f,int32 x) } +void write_int_named(int f, int32 x, const char *phase) +{ + io_write_phase = phase; + write_int(f, x); + io_write_phase = phase_unknown; +} + + /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform. @@ -543,12 +655,12 @@ void write_longint(int f, int64 x) writefd(f,b,8); } -void write_buf(int f,char *buf,int len) +void write_buf(int f,char *buf,size_t len) { writefd(f,buf,len); } -/* write a string to the connection */ +/** Write a string to the connection */ static void write_sbuf(int f,char *buf) { write_buf(f, buf, strlen(buf)); @@ -560,14 +672,21 @@ void write_byte(int f,unsigned char c) write_buf(f,(char *)&c,1); } -int read_line(int f, char *buf, int maxlen) -{ - eof_error = 0; + +/** + * Read a line of up to @p maxlen characters into @p buf. Does not + * contain a trailing newline or carriage return. + * + * @return 1 for success; 0 for io error or truncation. + **/ +int read_line(int f, char *buf, size_t maxlen) +{ while (maxlen) { buf[0] = 0; read_buf(f, buf, 1); - if (buf[0] == 0) return 0; + if (buf[0] == 0) + return 0; if (buf[0] == '\n') { buf[0] = 0; break; @@ -582,8 +701,6 @@ int read_line(int f, char *buf, int maxlen) return 0; } - eof_error = 1; - return 1; } @@ -595,7 +712,7 @@ void io_printf(int fd, const char *format, ...) int len; va_start(ap, format); - len = vslprintf(buf, sizeof(buf), format, ap); + len = vsnprintf(buf, sizeof(buf), format, ap); va_end(ap); if (len < 0) exit_cleanup(RERR_STREAMIO); @@ -604,7 +721,7 @@ void io_printf(int fd, const char *format, ...) } -/* setup for multiplexing an error stream with the data stream */ +/** Setup for multiplexing an error stream with the data stream */ void io_start_multiplex_out(int fd) { multiplex_out_fd = fd; @@ -613,7 +730,7 @@ void io_start_multiplex_out(int fd) io_multiplexing_out = 1; } -/* setup for multiplexing an error stream with the data stream */ +/** Setup for multiplexing an error stream with the data stream */ void io_start_multiplex_in(int fd) { multiplex_in_fd = fd; @@ -621,8 +738,8 @@ void io_start_multiplex_in(int fd) io_multiplexing_in = 1; } -/* write an message to the multiplexed error stream */ -int io_multiplex_write(enum logcode code, char *buf, int len) +/** Write an message to the multiplexed error stream */ +int io_multiplex_write(enum logcode code, char *buf, size_t len) { if (!io_multiplexing_out) return 0; @@ -632,7 +749,7 @@ int io_multiplex_write(enum logcode code, char *buf, int len) return 1; } -/* stop output multiplexing */ +/** Stop output multiplexing */ void io_multiplexing_close(void) { io_multiplexing_out = 0;