X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/954bbed84aef742d65327a4de0ea214ed90cb6b3..ecc7623e7faf75f6ba3dd7b5a416c52e2346ac7d:/io.c diff --git a/io.c b/io.c index 4ade1ea1..5b87be6f 100644 --- a/io.c +++ b/io.c @@ -1,8 +1,10 @@ -/* -*- c-file-style: "linux" -*- +/* + * Socket and pipe I/O utilities used in rsync. * - * Copyright (C) 1996-2001 by Andrew Tridgell - * Copyright (C) Paul Mackerras 1996 - * Copyright (C) 2001, 2002 by Martin Pool + * Copyright (C) 1996-2001 Andrew Tridgell + * Copyright (C) 1996 Paul Mackerras + * Copyright (C) 2001, 2002 Martin Pool + * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -14,25 +16,17 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA. */ -/** - * @file io.c - * - * Socket and pipe I/O utilities used in rsync. - * - * rsync provides its own multiplexing system, which is used to send - * stderr and stdout over a single socket. We need this because - * stdout normally carries the binary data stream, and stderr all our - * error messages. +/* Rsync provides its own multiplexing system, which is used to send + * stderr and stdout over a single socket. * * For historical reasons this is off during the start of the * connection, but it's switched on quite early using - * io_start_multiplex_out() and io_start_multiplex_in(). - **/ + * io_start_multiplex_out() and io_start_multiplex_in(). */ #include "rsync.h" @@ -63,22 +57,6 @@ int ignore_timeout = 0; int batch_fd = -1; int batch_gen_fd = -1; -/** - * The connection might be dropped at some point; perhaps because the - * remote instance crashed. Just giving the offset on the stream is - * not very helpful. So instead we try to make io_phase_name point to - * something useful. - * - * For buffered/multiplexed I/O these names will be somewhat - * approximate; perhaps for ease of support we would rather make the - * buffer always flush when a single application-level I/O finishes. - * - * @todo Perhaps we want some simple stack functionality, but there's - * no need to overdo it. - **/ -const char *io_write_phase = phase_unknown; -const char *io_read_phase = phase_unknown; - /* Ignore an EOF error if non-zero. See whine_about_eof(). */ int kluge_around_eof = 0; @@ -289,10 +267,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_DELETED, buf, len); - else - io_multiplex_write(MSG_DELETED, buf, len); + send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) { @@ -302,10 +277,7 @@ static void read_msg_fd(void) read_loop(fd, buf, len); if (remove_sent_files) { decrement_active_files(IVAL(buf,0)); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len); - else - io_multiplex_write(MSG_SUCCESS, buf, len); + send_msg(MSG_SUCCESS, buf, len); } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); @@ -325,15 +297,13 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - if (am_generator && am_server && defer_forwarding_messages) - msg_list_add(&msg2sndr, tag, buf, n); - else - rwrite((enum logcode)tag, buf, n); + rwrite(tag, buf, n); len -= n; } break; default: - rprintf(FERROR, "unknown message %d:%d\n", tag, len); + rprintf(FERROR, "unknown message %d:%d [%s]\n", + tag, len, who_am_i()); exit_cleanup(RERR_STREAMIO); } @@ -401,14 +371,19 @@ static int msg2genr_flush(int flush_it_all) return 1; } -void send_msg(enum msgcode code, char *buf, int len) +int send_msg(enum msgcode code, char *buf, int len) { if (msg_fd_out < 0) { - io_multiplex_write(code, buf, len); - return; + if (!defer_forwarding_messages) + return io_multiplex_write(code, buf, len); + if (!io_multiplexing_out) + return 0; + msg_list_add(&msg2sndr, code, buf, len); + return 1; } msg_list_add(&msg2genr, code, buf, len); msg2genr_flush(NORMAL_FLUSH); + return 1; } int get_redo_num(int itemizing, enum logcode code) @@ -1016,7 +991,7 @@ void write_sum_head(int f, struct sum_struct *sum) static void sleep_for_bwlimit(int bytes_written) { static struct timeval prior_tv; - static long total_written = 0; + static long total_written = 0; struct timeval tv, start_tv; long elapsed_usec, sleep_usec; @@ -1025,7 +1000,7 @@ static void sleep_for_bwlimit(int bytes_written) if (!bwlimit_writemax) return; - total_written += bytes_written; + total_written += bytes_written; gettimeofday(&start_tv, NULL); if (prior_tv.tv_sec) { @@ -1063,6 +1038,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) size_t n, total = 0; fd_set w_fds, r_fds; int maxfd, count, cnt, using_r_fds; + int defer_save = defer_forwarding_messages; struct timeval tv; no_flush++; @@ -1101,17 +1077,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) if (!FD_ISSET(fd, &w_fds)) continue; - if (msg2sndr.head && !defer_forwarding_messages) { - struct msg_list_item *m = msg2sndr.head; - if (!(msg2sndr.head = m->next)) - msg2sndr.tail = NULL; - defer_forwarding_messages = 1; - io_multiplex_write(IVAL(m->buf,0), m->buf+4, m->len-4); - defer_forwarding_messages = 0; - free(m); - continue; - } - n = len - total; if (bwlimit_writemax && n > bwlimit_writemax) n = bwlimit_writemax; @@ -1131,8 +1096,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) if (fd == sock_f_out) close_multiplexing_out(); rsyserr(FERROR, errno, - "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]", - (long)len, io_write_phase, who_am_i()); + "writefd_unbuffered failed to write %ld bytes [%s]", + (long)len, who_am_i()); /* If the other side is sending us error messages, try * to grab any messages they sent before they died. */ while (fd == sock_f_out && io_multiplexing_in) { @@ -1153,11 +1118,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) sleep_for_bwlimit(cnt); } } - defer_forwarding_messages = 0; + defer_forwarding_messages = defer_save; no_flush--; } +static void msg2sndr_flush(void) +{ + if (defer_forwarding_messages) + return; + + while (msg2sndr.head && io_multiplexing_out) { + struct msg_list_item *m = msg2sndr.head; + if (!(msg2sndr.head = m->next)) + msg2sndr.tail = NULL; + stats.total_written += m->len; + defer_forwarding_messages = 1; + writefd_unbuffered(sock_f_out, m->buf, m->len); + defer_forwarding_messages = 0; + free(m); + } +} + /** * Write an message to a multiplexed stream. If this fails then rsync * exits. @@ -1179,13 +1161,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) len -= n; buf += n; - if (len) + if (len) { + defer_forwarding_messages = 1; writefd_unbuffered(sock_f_out, buf, len); + defer_forwarding_messages = 0; + msg2sndr_flush(); + } } void io_flush(int flush_it_all) { msg2genr_flush(flush_it_all); + msg2sndr_flush(); if (!iobuf_out_cnt || no_flush) return; @@ -1246,13 +1233,6 @@ void write_int(int f,int32 x) writefd(f,b,4); } -void write_int_named(int f, int32 x, const char *phase) -{ - io_write_phase = phase; - write_int(f, x); - io_write_phase = phase_unknown; -} - /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform.