X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/72f2d1b38495477d63142ed5a490164bff94bc90..4a19c3b254b01c298fe25d72f450a760278e9386:/io.c diff --git a/io.c b/io.c index 6dafa2af..02511764 100644 --- a/io.c +++ b/io.c @@ -1,8 +1,10 @@ -/* -*- c-file-style: "linux" -*- +/* + * Socket and pipe I/O utilities used in rsync. * - * Copyright (C) 1996-2001 by Andrew Tridgell - * Copyright (C) Paul Mackerras 1996 - * Copyright (C) 2001, 2002 by Martin Pool + * Copyright (C) 1996-2001 Andrew Tridgell + * Copyright (C) 1996 Paul Mackerras + * Copyright (C) 2001, 2002 Martin Pool + * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -14,25 +16,17 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA. */ -/** - * @file io.c - * - * Socket and pipe I/O utilities used in rsync. - * - * rsync provides its own multiplexing system, which is used to send - * stderr and stdout over a single socket. We need this because - * stdout normally carries the binary data stream, and stderr all our - * error messages. +/* Rsync provides its own multiplexing system, which is used to send + * stderr and stdout over a single socket. * * For historical reasons this is off during the start of the * connection, but it's switched on quite early using - * io_start_multiplex_out() and io_start_multiplex_in(). - **/ + * io_start_multiplex_out() and io_start_multiplex_in(). */ #include "rsync.h" @@ -41,7 +35,6 @@ extern int bwlimit; extern size_t bwlimit_writemax; -extern int verbose; extern int io_timeout; extern int allowed_lull; extern int am_server; @@ -53,7 +46,7 @@ extern int read_batch; extern int csum_length; extern int checksum_seed; extern int protocol_version; -extern int remove_sent_files; +extern int remove_source_files; extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; @@ -64,22 +57,6 @@ int ignore_timeout = 0; int batch_fd = -1; int batch_gen_fd = -1; -/** - * The connection might be dropped at some point; perhaps because the - * remote instance crashed. Just giving the offset on the stream is - * not very helpful. So instead we try to make io_phase_name point to - * something useful. - * - * For buffered/multiplexed I/O these names will be somewhat - * approximate; perhaps for ease of support we would rather make the - * buffer always flush when a single application-level I/O finishes. - * - * @todo Perhaps we want some simple stack functionality, but there's - * no need to overdo it. - **/ -const char *io_write_phase = phase_unknown; -const char *io_read_phase = phase_unknown; - /* Ignore an EOF error if non-zero. See whine_about_eof(). */ int kluge_around_eof = 0; @@ -103,8 +80,10 @@ static char io_filesfrom_buf[2048]; static char *io_filesfrom_bp; static char io_filesfrom_lastchar; static int io_filesfrom_buflen; -static size_t contiguous_write_len = 0; +static int defer_forwarding_messages = 0; static int select_timeout = SELECT_TIMEOUT; +static int active_filecnt = 0; +static OFF_T active_bytecnt = 0; static void read_loop(int fd, char *buf, size_t len); @@ -121,15 +100,15 @@ static struct flist_ndx_list redo_list, hlink_list; struct msg_list_item { struct msg_list_item *next; - char *buf; int len; + char buf[1]; }; struct msg_list { struct msg_list_item *head, *tail; }; -static struct msg_list msg_list; +static struct msg_list msg2genr, msg2sndr; static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) { @@ -225,23 +204,22 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(int code, char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) { - struct msg_list_item *ml; + struct msg_list_item *m; + int sz = len + 4 + sizeof m[0] - 1; - if (!(ml = new(struct msg_list_item))) - out_of_memory("msg_list_add"); - ml->next = NULL; - if (!(ml->buf = new_array(char, len+4))) + if (!(m = (struct msg_list_item *)new_array(char, sz))) out_of_memory("msg_list_add"); - SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len); - memcpy(ml->buf+4, buf, len); - ml->len = len+4; - if (msg_list.tail) - msg_list.tail->next = ml; + m->next = NULL; + m->len = len + 4; + SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); + memcpy(m->buf + 4, buf, len); + if (lst->tail) + lst->tail->next = m; else - msg_list.head = ml; - msg_list.tail = ml; + lst->head = m; + lst->tail = m; } /* Read a message from the MSG_* fd and handle it. This is called either @@ -279,6 +257,8 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); + if (remove_source_files) + decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; case MSG_DELETED: @@ -287,7 +267,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - io_multiplex_write(MSG_DELETED, buf, len); + send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) { @@ -295,8 +275,10 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) - io_multiplex_write(MSG_SUCCESS, buf, len); + if (remove_source_files) { + decrement_active_files(IVAL(buf,0)); + send_msg(MSG_SUCCESS, buf, len); + } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); break; @@ -315,22 +297,45 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - rwrite((enum logcode)tag, buf, n); + rwrite(tag, buf, n); len -= n; } break; default: - rprintf(FERROR, "unknown message %d:%d\n", tag, len); + rprintf(FERROR, "unknown message %d:%d [%s]\n", + tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } msg_fd_in = fd; } +/* This is used by the generator to limit how many file transfers can + * be active at once when --remove-source-files is specified. Without + * this, sender-side deletions were mostly happening at the end. */ +void increment_active_files(int ndx, int itemizing, enum logcode code) +{ + /* TODO: tune these limits? */ + while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) { + if (hlink_list.head) + check_for_finished_hlinks(itemizing, code); + read_msg_fd(); + } + + active_filecnt++; + active_bytecnt += the_file_list->files[ndx]->length; +} + +void decrement_active_files(int ndx) +{ + active_filecnt--; + active_bytecnt -= the_file_list->files[ndx]->length; +} + /* Try to push messages off the list onto the wire. If we leave with more * to do, return 0. On error, return -1. If everything flushed, return 1. * This is only active in the receiver. */ -static int msg_list_flush(int flush_it_all) +static int msg2genr_flush(int flush_it_all) { static int written = 0; struct timeval tv; @@ -339,9 +344,9 @@ static int msg_list_flush(int flush_it_all) if (msg_fd_out < 0) return -1; - while (msg_list.head) { - struct msg_list_item *ml = msg_list.head; - int n = write(msg_fd_out, ml->buf + written, ml->len - written); + while (msg2genr.head) { + struct msg_list_item *m = msg2genr.head; + int n = write(msg_fd_out, m->buf + written, m->len - written); if (n < 0) { if (errno == EINTR) continue; @@ -355,26 +360,30 @@ static int msg_list_flush(int flush_it_all) tv.tv_usec = 0; if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv)) check_timeout(); - } else if ((written += n) == ml->len) { - free(ml->buf); - msg_list.head = ml->next; - if (!msg_list.head) - msg_list.tail = NULL; - free(ml); + } else if ((written += n) == m->len) { + msg2genr.head = m->next; + if (!msg2genr.head) + msg2genr.tail = NULL; + free(m); written = 0; } } return 1; } -void send_msg(enum msgcode code, char *buf, int len) +int send_msg(enum msgcode code, const char *buf, int len) { if (msg_fd_out < 0) { - io_multiplex_write(code, buf, len); - return; + if (!defer_forwarding_messages) + return io_multiplex_write(code, buf, len); + if (!io_multiplexing_out) + return 0; + msg_list_add(&msg2sndr, code, buf, len); + return 1; } - msg_list_add(code, buf, len); - msg_list_flush(NORMAL_FLUSH); + msg_list_add(&msg2genr, code, buf, len); + msg2genr_flush(NORMAL_FLUSH); + return 1; } int get_redo_num(int itemizing, enum logcode code) @@ -471,7 +480,7 @@ static int read_timeout(int fd, char *buf, size_t len) FD_ZERO(&r_fds); FD_ZERO(&w_fds); FD_SET(fd, &r_fds); - if (msg_list.head) { + if (msg2genr.head) { FD_SET(msg_fd_out, &w_fds); if (msg_fd_out > maxfd) maxfd = msg_fd_out; @@ -508,8 +517,8 @@ static int read_timeout(int fd, char *buf, size_t len) continue; } - if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds)) - msg_list_flush(NORMAL_FLUSH); + if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds)) + msg2genr_flush(NORMAL_FLUSH); if (io_filesfrom_f_out >= 0) { if (io_filesfrom_buflen) { @@ -624,13 +633,19 @@ int read_filesfrom_line(int fd, char *fname) if (cnt < 0 && (errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)) { struct timeval tv; - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); + fd_set r_fds, e_fds; + FD_ZERO(&r_fds); + FD_SET(fd, &r_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); tv.tv_sec = select_timeout; tv.tv_usec = 0; - if (!select(fd+1, &fds, NULL, NULL, &tv)) + if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } continue; } if (cnt != 1) @@ -774,12 +789,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) if (msg_bytes >= sizeof line) goto overflow; read_loop(fd, line, msg_bytes); - line[msg_bytes] = '\0'; /* A directory name was sent with the trailing null */ if (msg_bytes > 0 && !line[msg_bytes-1]) log_delete(line, S_IFDIR); - else + else { + line[msg_bytes] = '\0'; log_delete(line, S_IFREG); + } break; case MSG_SUCCESS: if (msg_bytes != 4) { @@ -878,12 +894,12 @@ int64 read_longint(int f) return num; } -void read_buf(int f,char *buf,size_t len) +void read_buf(int f, char *buf, size_t len) { readfd(f,buf,len); } -void read_sbuf(int f,char *buf,size_t len) +void read_sbuf(int f, char *buf, size_t len) { readfd(f, buf, len); buf[len] = '\0'; @@ -920,6 +936,11 @@ int read_vstring(int f, char *buf, int bufsize) void read_sum_head(int f, struct sum_struct *sum) { sum->count = read_int(f); + if (sum->count < 0) { + rprintf(FERROR, "Invalid checksum count %ld [%s]\n", + (long)sum->count, who_am_i()); + exit_cleanup(RERR_PROTOCOL); + } sum->blength = read_int(f); if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) { rprintf(FERROR, "Invalid block length %ld [%s]\n", @@ -976,16 +997,16 @@ void write_sum_head(int f, struct sum_struct *sum) static void sleep_for_bwlimit(int bytes_written) { static struct timeval prior_tv; - static long total_written = 0; + static long total_written = 0; struct timeval tv, start_tv; long elapsed_usec, sleep_usec; #define ONE_SEC 1000000L /* # of microseconds in a second */ - if (!bwlimit) + if (!bwlimit_writemax) return; - total_written += bytes_written; + total_written += bytes_written; gettimeofday(&start_tv, NULL); if (prior_tv.tv_sec) { @@ -1018,23 +1039,26 @@ static void sleep_for_bwlimit(int bytes_written) * * This function underlies the multiplexing system. The body of the * application never calls this function directly. */ -static void writefd_unbuffered(int fd,char *buf,size_t len) +static void writefd_unbuffered(int fd, const char *buf, size_t len) { size_t n, total = 0; - fd_set w_fds, r_fds; + fd_set w_fds, r_fds, e_fds; int maxfd, count, cnt, using_r_fds; + int defer_save = defer_forwarding_messages; struct timeval tv; no_flush++; while (total < len) { FD_ZERO(&w_fds); - FD_SET(fd,&w_fds); + FD_SET(fd, &w_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); maxfd = fd; - if (msg_fd_in >= 0 && len-total >= contiguous_write_len) { + if (msg_fd_in >= 0) { FD_ZERO(&r_fds); - FD_SET(msg_fd_in,&r_fds); + FD_SET(msg_fd_in, &r_fds); if (msg_fd_in > maxfd) maxfd = msg_fd_in; using_r_fds = 1; @@ -1046,7 +1070,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) errno = 0; count = select(maxfd + 1, using_r_fds ? &r_fds : NULL, - &w_fds, NULL, &tv); + &w_fds, &e_fds, &tv); if (count <= 0) { if (count < 0 && errno == EBADF) @@ -1055,6 +1079,11 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; } + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } + if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) read_msg_fd(); @@ -1062,7 +1091,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; n = len - total; - if (bwlimit && n > bwlimit_writemax) + if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; cnt = write(fd, buf + total, n); @@ -1080,8 +1109,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) if (fd == sock_f_out) close_multiplexing_out(); rsyserr(FERROR, errno, - "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]", - (long)len, io_write_phase, who_am_i()); + "writefd_unbuffered failed to write %ld bytes [%s]", + (long)len, who_am_i()); /* If the other side is sending us error messages, try * to grab any messages they sent before they died. */ while (fd == sock_f_out && io_multiplexing_in) { @@ -1094,6 +1123,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } total += cnt; + defer_forwarding_messages = 1; if (fd == sock_f_out) { if (io_timeout || am_generator) @@ -1102,46 +1132,60 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) } } + defer_forwarding_messages = defer_save; no_flush--; } +static void msg2sndr_flush(void) +{ + if (defer_forwarding_messages) + return; + + while (msg2sndr.head && io_multiplexing_out) { + struct msg_list_item *m = msg2sndr.head; + if (!(msg2sndr.head = m->next)) + msg2sndr.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages = 1; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages = 0; + free(m); + } +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ -static void mplex_write(enum msgcode code, char *buf, size_t len) +static void mplex_write(enum msgcode code, const char *buf, size_t len) { - char buffer[BIGPATHBUFLEN]; + char buffer[1024]; size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); - /* When the generator reads messages from the msg_fd_in pipe, it can - * cause output to occur down the socket. Setting contiguous_write_len - * prevents the reading of msg_fd_in once we actually start to write - * this sequence of data (though we might read it before the start). */ - if (am_generator && msg_fd_in >= 0) - contiguous_write_len = len + 4; - if (n > sizeof buffer - 4) - n = sizeof buffer - 4; + n = 0; + else + memcpy(buffer + 4, buf, n); - memcpy(&buffer[4], buf, n); writefd_unbuffered(sock_f_out, buffer, n+4); len -= n; buf += n; - if (len) + if (len) { + defer_forwarding_messages = 1; writefd_unbuffered(sock_f_out, buf, len); - - if (am_generator && msg_fd_in >= 0) - contiguous_write_len = 0; + defer_forwarding_messages = 0; + msg2sndr_flush(); + } } void io_flush(int flush_it_all) { - msg_list_flush(flush_it_all); + msg2genr_flush(flush_it_all); + msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) return; @@ -1153,7 +1197,7 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } -static void writefd(int fd,char *buf,size_t len) +static void writefd(int fd, const char *buf, size_t len) { if (fd == msg_fd_out) { rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); @@ -1202,13 +1246,6 @@ void write_int(int f,int32 x) writefd(f,b,4); } -void write_int_named(int f, int32 x, const char *phase) -{ - io_write_phase = phase; - write_int(f, x); - io_write_phase = phase_unknown; -} - /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform. @@ -1234,13 +1271,13 @@ void write_longint(int f, int64 x) #endif } -void write_buf(int f,char *buf,size_t len) +void write_buf(int f, const char *buf, size_t len) { writefd(f,buf,len); } /** Write a string to the connection */ -void write_sbuf(int f, char *buf) +void write_sbuf(int f, const char *buf) { writefd(f, buf, strlen(buf)); } @@ -1250,7 +1287,7 @@ void write_byte(int f, uchar c) writefd(f, (char *)&c, 1); } -void write_vstring(int f, char *str, int len) +void write_vstring(int f, const char *str, int len) { uchar lenbuf[3], *lb = lenbuf; @@ -1333,7 +1370,7 @@ void io_start_multiplex_in(void) } /** Write an message to the multiplexed data stream. */ -int io_multiplex_write(enum msgcode code, char *buf, size_t len) +int io_multiplex_write(enum msgcode code, const char *buf, size_t len) { if (!io_multiplexing_out) return 0;