X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/909ce14fc4ee350f4067730db563cbf0961891a3..eca2adb4b3a4ca095116f54e9e0a6df19f0bd149:/io.c diff --git a/io.c b/io.c index 6085745d..a83276d5 100644 --- a/io.c +++ b/io.c @@ -1,32 +1,42 @@ /* -*- c-file-style: "linux" -*- - - Copyright (C) 1996-2001 by Andrew Tridgell - Copyright (C) Paul Mackerras 1996 - Copyright (C) 2001 by Martin Pool - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. -*/ + * + * Copyright (C) 1996-2001 by Andrew Tridgell + * Copyright (C) Paul Mackerras 1996 + * Copyright (C) 2001, 2002 by Martin Pool + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ -/* - socket and pipe IO utilities used in rsync +/** + * @file io.c + * + * Socket and pipe IO utilities used in rsync. + * + * rsync provides its own multiplexing system, which is used to send + * stderr and stdout over a single socket. We need this because + * stdout normally carries the binary data stream, and stderr all our + * error messages. + * + * For historical reasons this is off during the start of the + * connection, but it's switched on quite early using + * io_start_multiplex_out() and io_start_multiplex_in(). + **/ - tridge, June 1996 - */ #include "rsync.h" -/* if no timeout is specified then use a 60 second select timeout */ +/** If no timeout is specified then use a 60 second select timeout */ #define SELECT_TIMEOUT 60 static int io_multiplexing_out; @@ -42,6 +52,19 @@ extern int io_timeout; extern struct stats stats; +/** + * The connection might be dropped at some point; perhaps because the + * remote instance crashed. Just giving the offset on the stream is + * not very helpful. So instead we try to make io_phase_name point to + * something useful. + * + * @todo Perhaps we want some simple stack functionality, but there's + * no need to overdo it. + **/ +const char *io_write_phase = "unknown"; +const char *io_read_phase = "unknown"; + + /** Ignore EOF errors while reading a module listing if the remote version is 24 or less. */ int kludge_around_eof = False; @@ -76,17 +99,17 @@ static void check_timeout(void) } } -/* setup the fd used to propogate errors */ +/** Setup the fd used to propogate errors */ void io_set_error_fd(int fd) { io_error_fd = fd; } -/* read some data from the error fd and write it to the write log code */ +/** Read some data from the error fd and write it to the write log code */ static void read_error_fd(void) { char buf[200]; - int n; + size_t n; int fd = io_error_fd; int tag, len; @@ -103,7 +126,8 @@ static void read_error_fd(void) while (len) { n = len; - if (n > (sizeof(buf)-1)) n = sizeof(buf)-1; + if (n > (sizeof(buf)-1)) + n = sizeof(buf)-1; read_loop(fd, buf, n); rwrite((enum logcode)tag, buf, n); len -= n; @@ -113,21 +137,18 @@ static void read_error_fd(void) } +/** + * It's almost always an error to get an EOF when we're trying to read + * from the network, because the protocol is self-terminating. + * + * However, there is one unfortunate cases where it is not, which is + * rsync <2.4.6 sending a list of modules on a server, since the list + * is terminated by closing the socket. So, for the section of the + * program where that is a problem (start_socket_client), + * kludge_around_eof is True and we just exit. + */ static void whine_about_eof (void) { - /** - It's almost always an error to get an EOF when we're trying - to read from the network, because the protocol is - self-terminating. - - However, there is one unfortunate cases where it is not, - which is rsync <2.4.6 sending a list of modules on a - server, since the list is terminated by closing the socket. - So, for the section of the program where that is a problem - (start_socket_client), kludge_around_eof is True and we - just exit. - */ - if (kludge_around_eof) exit_cleanup (0); else { @@ -152,7 +173,7 @@ static void die_from_readerr (int err) } -/*! +/** * Read from a socket with IO timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. * @@ -234,8 +255,10 @@ static int read_timeout (int fd, char *buf, size_t len) -/*! Continue trying to read len bytes - don't return until len has - been read. */ +/** + * Continue trying to read len bytes - don't return until len has been + * read. + **/ static void read_loop (int fd, char *buf, size_t len) { while (len) { @@ -255,7 +278,7 @@ static void read_loop (int fd, char *buf, size_t len) */ static int read_unbuffered(int fd, char *buf, size_t len) { - static int remaining; + static size_t remaining; int tag, ret = 0; char line[1024]; @@ -305,12 +328,15 @@ static int read_unbuffered(int fd, char *buf, size_t len) -/* do a buffered read from fd. don't return until all N bytes - have been read. If all N can't be read then exit with an error */ +/** + * Do a buffered read from @p fd. Don't return until all @p n bytes + * have been read. If all @p n can't be read then exit with an + * error. + **/ static void readfd (int fd, char *buffer, size_t N) { int ret; - int total=0; + size_t total=0; while (total < N) { io_flush(); @@ -376,10 +402,43 @@ unsigned char read_byte(int f) return c; } -/* write len bytes to fd */ + +/** + * Sleep after writing to limit I/O bandwidth usage. + * + * @todo Rather than sleeping after each write, it might be better to + * use some kind of averaging. The current algorithm seems to always + * use a bit less bandwidth than specified, because it doesn't make up + * for slow periods. But arguably this is a feature. In addition, we + * ought to take the time used to write the data into account. + **/ +static void sleep_for_bwlimit(int bytes_written) +{ + struct timeval tv; + + if (!bwlimit) + return; + + assert(bytes_written > 0); + assert(bwlimit > 0); + + tv.tv_usec = bytes_written * 1000 / bwlimit; + tv.tv_sec = tv.tv_usec / 1000000; + tv.tv_usec = tv.tv_usec % 1000000; + + select(0, NULL, NULL, NULL, &tv); +} + + +/** + * Write len bytes to the file descriptor @p fd. + * + * This function underlies the multiplexing system. The body of the + * application never calls this function directly. + **/ static void writefd_unbuffered(int fd,char *buf,size_t len) { - int total = 0; + size_t total = 0; fd_set w_fds, r_fds; int fd_count, count; struct timeval tv; @@ -426,7 +485,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } if (FD_ISSET(fd, &w_fds)) { - int ret, n = len-total; + int ret; + size_t n = len-total; ret = write(fd,buf+total,n); if (ret == -1 && errno == EINTR) { @@ -440,25 +500,17 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } if (ret <= 0) { - rprintf(FERROR, - "error writing %d unbuffered bytes" - " - exiting: %s\n", len, + /* Don't try to write errors back + * across the stream */ + io_multiplexing_close(); + rprintf(FERROR, RSYNC_NAME + ": writefd_unbuffered failed to write %ld bytes: phase \"%s\": %s\n", + (long) len, io_write_phase, strerror(errno)); exit_cleanup(RERR_STREAMIO); } - /* Sleep after writing to limit I/O bandwidth */ - if (bwlimit) - { - tv.tv_sec = 0; - tv.tv_usec = ret * 1000 / bwlimit; - while (tv.tv_usec > 1000000) - { - tv.tv_sec++; - tv.tv_usec -= 1000000; - } - select(0, NULL, NULL, NULL, &tv); - } + sleep_for_bwlimit(ret); total += ret; @@ -483,12 +535,14 @@ void io_start_buffering(int fd) io_buffer_count = 0; } -/* write an message to a multiplexed stream. If this fails then rsync - exits */ +/** + * Write an message to a multiplexed stream. If this fails then rsync + * exits. + **/ static void mplex_write(int fd, enum logcode code, char *buf, size_t len) { char buffer[4096]; - int n = len; + size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); @@ -525,8 +579,7 @@ void io_flush(void) } -/* XXX: fd is ignored, which seems a little strange. */ -void io_end_buffering(int fd) +void io_end_buffering(void) { io_flush(); if (!io_multiplexing_out) { @@ -547,7 +600,7 @@ static void writefd(int fd,char *buf,size_t len) } while (len) { - int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count); + int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count); if (n > 0) { memcpy(io_buffer+io_buffer_count, buf, n); buf += n; @@ -594,7 +647,7 @@ void write_buf(int f,char *buf,size_t len) writefd(f,buf,len); } -/* write a string to the connection */ +/** Write a string to the connection */ static void write_sbuf(int f,char *buf) { write_buf(f, buf, strlen(buf)); @@ -608,12 +661,19 @@ void write_byte(int f,unsigned char c) +/** + * Read a line of up to @p maxlen characters into @p buf. Does not + * contain a trailing newline or carriage return. + * + * @return 1 for success; 0 for io error or truncation. + **/ int read_line(int f, char *buf, size_t maxlen) { while (maxlen) { buf[0] = 0; read_buf(f, buf, 1); - if (buf[0] == 0) return 0; + if (buf[0] == 0) + return 0; if (buf[0] == '\n') { buf[0] = 0; break; @@ -648,7 +708,7 @@ void io_printf(int fd, const char *format, ...) } -/* setup for multiplexing an error stream with the data stream */ +/** Setup for multiplexing an error stream with the data stream */ void io_start_multiplex_out(int fd) { multiplex_out_fd = fd; @@ -657,7 +717,7 @@ void io_start_multiplex_out(int fd) io_multiplexing_out = 1; } -/* setup for multiplexing an error stream with the data stream */ +/** Setup for multiplexing an error stream with the data stream */ void io_start_multiplex_in(int fd) { multiplex_in_fd = fd; @@ -665,7 +725,7 @@ void io_start_multiplex_in(int fd) io_multiplexing_in = 1; } -/* write an message to the multiplexed error stream */ +/** Write an message to the multiplexed error stream */ int io_multiplex_write(enum logcode code, char *buf, size_t len) { if (!io_multiplexing_out) return 0; @@ -676,7 +736,7 @@ int io_multiplex_write(enum logcode code, char *buf, size_t len) return 1; } -/* stop output multiplexing */ +/** Stop output multiplexing */ void io_multiplexing_close(void) { io_multiplexing_out = 0;