X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/408e69396c7efd881185face829c2057044c61e5..4a19c3b254b01c298fe25d72f450a760278e9386:/io.c diff --git a/io.c b/io.c index 0fa5af9e..02511764 100644 --- a/io.c +++ b/io.c @@ -1,8 +1,10 @@ -/* -*- c-file-style: "linux" -*- +/* + * Socket and pipe I/O utilities used in rsync. * - * Copyright (C) 1996-2001 by Andrew Tridgell - * Copyright (C) Paul Mackerras 1996 - * Copyright (C) 2001, 2002 by Martin Pool + * Copyright (C) 1996-2001 Andrew Tridgell + * Copyright (C) 1996 Paul Mackerras + * Copyright (C) 2001, 2002 Martin Pool + * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -14,25 +16,17 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA. */ -/** - * @file io.c - * - * Socket and pipe I/O utilities used in rsync. - * - * rsync provides its own multiplexing system, which is used to send - * stderr and stdout over a single socket. We need this because - * stdout normally carries the binary data stream, and stderr all our - * error messages. +/* Rsync provides its own multiplexing system, which is used to send + * stderr and stdout over a single socket. * * For historical reasons this is off during the start of the * connection, but it's switched on quite early using - * io_start_multiplex_out() and io_start_multiplex_in(). - **/ + * io_start_multiplex_out() and io_start_multiplex_in(). */ #include "rsync.h" @@ -52,7 +46,7 @@ extern int read_batch; extern int csum_length; extern int checksum_seed; extern int protocol_version; -extern int remove_sent_files; +extern int remove_source_files; extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; @@ -63,22 +57,6 @@ int ignore_timeout = 0; int batch_fd = -1; int batch_gen_fd = -1; -/** - * The connection might be dropped at some point; perhaps because the - * remote instance crashed. Just giving the offset on the stream is - * not very helpful. So instead we try to make io_phase_name point to - * something useful. - * - * For buffered/multiplexed I/O these names will be somewhat - * approximate; perhaps for ease of support we would rather make the - * buffer always flush when a single application-level I/O finishes. - * - * @todo Perhaps we want some simple stack functionality, but there's - * no need to overdo it. - **/ -const char *io_write_phase = phase_unknown; -const char *io_read_phase = phase_unknown; - /* Ignore an EOF error if non-zero. See whine_about_eof(). */ int kluge_around_eof = 0; @@ -226,7 +204,7 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) { struct msg_list_item *m; int sz = len + 4 + sizeof m[0] - 1; @@ -279,7 +257,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); - if (remove_sent_files) + if (remove_source_files) decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; @@ -289,10 +267,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_DELETED, buf, len); - else - io_multiplex_write(MSG_DELETED, buf, len); + send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) { @@ -300,12 +275,9 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) { + if (remove_source_files) { decrement_active_files(IVAL(buf,0)); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len); - else - io_multiplex_write(MSG_SUCCESS, buf, len); + send_msg(MSG_SUCCESS, buf, len); } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); @@ -325,15 +297,13 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - if (am_generator && am_server && defer_forwarding_messages) - msg_list_add(&msg2sndr, tag, buf, n); - else - rwrite((enum logcode)tag, buf, n); + rwrite(tag, buf, n); len -= n; } break; default: - rprintf(FERROR, "unknown message %d:%d\n", tag, len); + rprintf(FERROR, "unknown message %d:%d [%s]\n", + tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } @@ -341,7 +311,7 @@ static void read_msg_fd(void) } /* This is used by the generator to limit how many file transfers can - * be active at once when --remove-sent-files is specified. Without + * be active at once when --remove-source-files is specified. Without * this, sender-side deletions were mostly happening at the end. */ void increment_active_files(int ndx, int itemizing, enum logcode code) { @@ -401,14 +371,19 @@ static int msg2genr_flush(int flush_it_all) return 1; } -void send_msg(enum msgcode code, char *buf, int len) +int send_msg(enum msgcode code, const char *buf, int len) { if (msg_fd_out < 0) { - io_multiplex_write(code, buf, len); - return; + if (!defer_forwarding_messages) + return io_multiplex_write(code, buf, len); + if (!io_multiplexing_out) + return 0; + msg_list_add(&msg2sndr, code, buf, len); + return 1; } msg_list_add(&msg2genr, code, buf, len); msg2genr_flush(NORMAL_FLUSH); + return 1; } int get_redo_num(int itemizing, enum logcode code) @@ -658,13 +633,19 @@ int read_filesfrom_line(int fd, char *fname) if (cnt < 0 && (errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)) { struct timeval tv; - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); + fd_set r_fds, e_fds; + FD_ZERO(&r_fds); + FD_SET(fd, &r_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); tv.tv_sec = select_timeout; tv.tv_usec = 0; - if (!select(fd+1, &fds, NULL, NULL, &tv)) + if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } continue; } if (cnt != 1) @@ -913,12 +894,12 @@ int64 read_longint(int f) return num; } -void read_buf(int f,char *buf,size_t len) +void read_buf(int f, char *buf, size_t len) { readfd(f,buf,len); } -void read_sbuf(int f,char *buf,size_t len) +void read_sbuf(int f, char *buf, size_t len) { readfd(f, buf, len); buf[len] = '\0'; @@ -1016,7 +997,7 @@ void write_sum_head(int f, struct sum_struct *sum) static void sleep_for_bwlimit(int bytes_written) { static struct timeval prior_tv; - static long total_written = 0; + static long total_written = 0; struct timeval tv, start_tv; long elapsed_usec, sleep_usec; @@ -1025,7 +1006,7 @@ static void sleep_for_bwlimit(int bytes_written) if (!bwlimit_writemax) return; - total_written += bytes_written; + total_written += bytes_written; gettimeofday(&start_tv, NULL); if (prior_tv.tv_sec) { @@ -1058,23 +1039,26 @@ static void sleep_for_bwlimit(int bytes_written) * * This function underlies the multiplexing system. The body of the * application never calls this function directly. */ -static void writefd_unbuffered(int fd,char *buf,size_t len) +static void writefd_unbuffered(int fd, const char *buf, size_t len) { size_t n, total = 0; - fd_set w_fds, r_fds; + fd_set w_fds, r_fds, e_fds; int maxfd, count, cnt, using_r_fds; + int defer_save = defer_forwarding_messages; struct timeval tv; no_flush++; while (total < len) { FD_ZERO(&w_fds); - FD_SET(fd,&w_fds); + FD_SET(fd, &w_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); maxfd = fd; if (msg_fd_in >= 0) { FD_ZERO(&r_fds); - FD_SET(msg_fd_in,&r_fds); + FD_SET(msg_fd_in, &r_fds); if (msg_fd_in > maxfd) maxfd = msg_fd_in; using_r_fds = 1; @@ -1086,7 +1070,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) errno = 0; count = select(maxfd + 1, using_r_fds ? &r_fds : NULL, - &w_fds, NULL, &tv); + &w_fds, &e_fds, &tv); if (count <= 0) { if (count < 0 && errno == EBADF) @@ -1095,24 +1079,17 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; } + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } + if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) read_msg_fd(); if (!FD_ISSET(fd, &w_fds)) continue; - if (msg2sndr.head && !defer_forwarding_messages) { - struct msg_list_item *m = msg2sndr.head; - int code = (IVAL(m->buf,0) >> 24) - MPLEX_BASE; - if (!(msg2sndr.head = m->next)) - msg2sndr.tail = NULL; - defer_forwarding_messages = 1; - io_multiplex_write(code, m->buf+4, m->len-4); - defer_forwarding_messages = 0; - free(m); - continue; - } - n = len - total; if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; @@ -1132,8 +1109,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) if (fd == sock_f_out) close_multiplexing_out(); rsyserr(FERROR, errno, - "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]", - (long)len, io_write_phase, who_am_i()); + "writefd_unbuffered failed to write %ld bytes [%s]", + (long)len, who_am_i()); /* If the other side is sending us error messages, try * to grab any messages they sent before they died. */ while (fd == sock_f_out && io_multiplexing_in) { @@ -1154,16 +1131,33 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) sleep_for_bwlimit(cnt); } } - defer_forwarding_messages = 0; + defer_forwarding_messages = defer_save; no_flush--; } +static void msg2sndr_flush(void) +{ + if (defer_forwarding_messages) + return; + + while (msg2sndr.head && io_multiplexing_out) { + struct msg_list_item *m = msg2sndr.head; + if (!(msg2sndr.head = m->next)) + msg2sndr.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages = 1; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages = 0; + free(m); + } +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ -static void mplex_write(enum msgcode code, char *buf, size_t len) +static void mplex_write(enum msgcode code, const char *buf, size_t len) { char buffer[1024]; size_t n = len; @@ -1180,13 +1174,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) len -= n; buf += n; - if (len) + if (len) { + defer_forwarding_messages = 1; writefd_unbuffered(sock_f_out, buf, len); + defer_forwarding_messages = 0; + msg2sndr_flush(); + } } void io_flush(int flush_it_all) { msg2genr_flush(flush_it_all); + msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) return; @@ -1198,7 +1197,7 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } -static void writefd(int fd,char *buf,size_t len) +static void writefd(int fd, const char *buf, size_t len) { if (fd == msg_fd_out) { rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); @@ -1247,13 +1246,6 @@ void write_int(int f,int32 x) writefd(f,b,4); } -void write_int_named(int f, int32 x, const char *phase) -{ - io_write_phase = phase; - write_int(f, x); - io_write_phase = phase_unknown; -} - /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform. @@ -1279,13 +1271,13 @@ void write_longint(int f, int64 x) #endif } -void write_buf(int f,char *buf,size_t len) +void write_buf(int f, const char *buf, size_t len) { writefd(f,buf,len); } /** Write a string to the connection */ -void write_sbuf(int f, char *buf) +void write_sbuf(int f, const char *buf) { writefd(f, buf, strlen(buf)); } @@ -1295,7 +1287,7 @@ void write_byte(int f, uchar c) writefd(f, (char *)&c, 1); } -void write_vstring(int f, char *str, int len) +void write_vstring(int f, const char *str, int len) { uchar lenbuf[3], *lb = lenbuf; @@ -1378,7 +1370,7 @@ void io_start_multiplex_in(void) } /** Write an message to the multiplexed data stream. */ -int io_multiplex_write(enum msgcode code, char *buf, size_t len) +int io_multiplex_write(enum msgcode code, const char *buf, size_t len) { if (!io_multiplexing_out) return 0;