- Updated the address for the FSF in the opening comment.
[rsync/rsync.git] / io.c
diff --git a/io.c b/io.c
index 4ade1ea..c91007e 100644 (file)
--- a/io.c
+++ b/io.c
@@ -1,8 +1,10 @@
-/* -*- c-file-style: "linux" -*-
+/*
+ * Socket and pipe I/O utilities used in rsync.
  *
- * Copyright (C) 1996-2001 by Andrew Tridgell
- * Copyright (C) Paul Mackerras 1996
- * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
+ * Copyright (C) 1996-2001 Andrew Tridgell
+ * Copyright (C) 1996 Paul Mackerras
+ * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
+ * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
  *
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  */
 
-/**
- * @file io.c
- *
- * Socket and pipe I/O utilities used in rsync.
- *
- * rsync provides its own multiplexing system, which is used to send
- * stderr and stdout over a single socket.  We need this because
- * stdout normally carries the binary data stream, and stderr all our
- * error messages.
+/* Rsync provides its own multiplexing system, which is used to send
+ * stderr and stdout over a single socket.
  *
  * For historical reasons this is off during the start of the
  * connection, but it's switched on quite early using
- * io_start_multiplex_out() and io_start_multiplex_in().
- **/
+ * io_start_multiplex_out() and io_start_multiplex_in(). */
 
 #include "rsync.h"
 
@@ -63,22 +57,6 @@ int ignore_timeout = 0;
 int batch_fd = -1;
 int batch_gen_fd = -1;
 
-/**
- * The connection might be dropped at some point; perhaps because the
- * remote instance crashed.  Just giving the offset on the stream is
- * not very helpful.  So instead we try to make io_phase_name point to
- * something useful.
- *
- * For buffered/multiplexed I/O these names will be somewhat
- * approximate; perhaps for ease of support we would rather make the
- * buffer always flush when a single application-level I/O finishes.
- *
- * @todo Perhaps we want some simple stack functionality, but there's
- * no need to overdo it.
- **/
-const char *io_write_phase = phase_unknown;
-const char *io_read_phase = phase_unknown;
-
 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
 int kluge_around_eof = 0;
 
@@ -333,7 +311,8 @@ static void read_msg_fd(void)
                }
                break;
        default:
-               rprintf(FERROR, "unknown message %d:%d\n", tag, len);
+               rprintf(FERROR, "unknown message %d:%d [%s]\n",
+                       tag, len, who_am_i());
                exit_cleanup(RERR_STREAMIO);
        }
 
@@ -1016,7 +995,7 @@ void write_sum_head(int f, struct sum_struct *sum)
 static void sleep_for_bwlimit(int bytes_written)
 {
        static struct timeval prior_tv;
-       static long total_written = 0; 
+       static long total_written = 0;
        struct timeval tv, start_tv;
        long elapsed_usec, sleep_usec;
 
@@ -1025,7 +1004,7 @@ static void sleep_for_bwlimit(int bytes_written)
        if (!bwlimit_writemax)
                return;
 
-       total_written += bytes_written; 
+       total_written += bytes_written;
 
        gettimeofday(&start_tv, NULL);
        if (prior_tv.tv_sec) {
@@ -1063,6 +1042,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
        size_t n, total = 0;
        fd_set w_fds, r_fds;
        int maxfd, count, cnt, using_r_fds;
+       int defer_save = defer_forwarding_messages;
        struct timeval tv;
 
        no_flush++;
@@ -1101,17 +1081,6 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                if (!FD_ISSET(fd, &w_fds))
                        continue;
 
-               if (msg2sndr.head && !defer_forwarding_messages) {
-                       struct msg_list_item *m = msg2sndr.head;
-                       if (!(msg2sndr.head = m->next))
-                               msg2sndr.tail = NULL;
-                       defer_forwarding_messages = 1;
-                       io_multiplex_write(IVAL(m->buf,0), m->buf+4, m->len-4);
-                       defer_forwarding_messages = 0;
-                       free(m);
-                       continue;
-               }
-
                n = len - total;
                if (bwlimit_writemax && n > bwlimit_writemax)
                        n = bwlimit_writemax;
@@ -1131,8 +1100,8 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                        if (fd == sock_f_out)
                                close_multiplexing_out();
                        rsyserr(FERROR, errno,
-                               "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
-                               (long)len, io_write_phase, who_am_i());
+                               "writefd_unbuffered failed to write %ld bytes [%s]",
+                               (long)len, who_am_i());
                        /* If the other side is sending us error messages, try
                         * to grab any messages they sent before they died. */
                        while (fd == sock_f_out && io_multiplexing_in) {
@@ -1153,11 +1122,28 @@ static void writefd_unbuffered(int fd,char *buf,size_t len)
                        sleep_for_bwlimit(cnt);
                }
        }
-       defer_forwarding_messages = 0;
 
+       defer_forwarding_messages = defer_save;
        no_flush--;
 }
 
+static void msg2sndr_flush(void)
+{
+       if (defer_forwarding_messages)
+               return;
+
+       while (msg2sndr.head && io_multiplexing_out) {
+               struct msg_list_item *m = msg2sndr.head;
+               if (!(msg2sndr.head = m->next))
+                       msg2sndr.tail = NULL;
+               stats.total_written += m->len;
+               defer_forwarding_messages = 1;
+               writefd_unbuffered(sock_f_out, m->buf, m->len);
+               defer_forwarding_messages = 0;
+               free(m);
+       }
+}
+
 /**
  * Write an message to a multiplexed stream. If this fails then rsync
  * exits.
@@ -1179,13 +1165,18 @@ static void mplex_write(enum msgcode code, char *buf, size_t len)
        len -= n;
        buf += n;
 
-       if (len)
+       if (len) {
+               defer_forwarding_messages = 1;
                writefd_unbuffered(sock_f_out, buf, len);
+               defer_forwarding_messages = 0;
+               msg2sndr_flush();
+       }
 }
 
 void io_flush(int flush_it_all)
 {
        msg2genr_flush(flush_it_all);
+       msg2sndr_flush();
 
        if (!iobuf_out_cnt || no_flush)
                return;
@@ -1246,13 +1237,6 @@ void write_int(int f,int32 x)
        writefd(f,b,4);
 }
 
-void write_int_named(int f, int32 x, const char *phase)
-{
-       io_write_phase = phase;
-       write_int(f, x);
-       io_write_phase = phase_unknown;
-}
-
 /*
  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
  * 64-bit types on this platform.