Don't use single-line "if (condition) statement;" idiom.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2  *
3  * Copyright (C) 1996-2001 by Andrew Tridgell
4  * Copyright (C) Paul Mackerras 1996
5  * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /**
23  * @file io.c
24  *
25  * Socket and pipe I/O utilities used in rsync.
26  *
27  * rsync provides its own multiplexing system, which is used to send
28  * stderr and stdout over a single socket.  We need this because
29  * stdout normally carries the binary data stream, and stderr all our
30  * error messages.
31  *
32  * For historical reasons this is off during the start of the
33  * connection, but it's switched on quite early using
34  * io_start_multiplex_out() and io_start_multiplex_in().
35  **/
36
37 #include "rsync.h"
38
39 /** If no timeout is specified then use a 60 second select timeout */
40 #define SELECT_TIMEOUT 60
41
42 static int io_multiplexing_out;
43 static int io_multiplexing_in;
44 static int multiplex_in_fd = -1;
45 static int multiplex_out_fd = -1;
46 static time_t last_io;
47 static int no_flush;
48
49 extern int bwlimit;
50 extern size_t bwlimit_writemax;
51 extern int verbose;
52 extern int io_timeout;
53 extern int am_server;
54 extern int am_daemon;
55 extern int am_sender;
56 extern struct stats stats;
57
58
59 const char phase_unknown[] = "unknown";
60
61 /**
62  * The connection might be dropped at some point; perhaps because the
63  * remote instance crashed.  Just giving the offset on the stream is
64  * not very helpful.  So instead we try to make io_phase_name point to
65  * something useful.
66  *
67  * For buffered/multiplexed I/O these names will be somewhat
68  * approximate; perhaps for ease of support we would rather make the
69  * buffer always flush when a single application-level I/O finishes.
70  *
71  * @todo Perhaps we want some simple stack functionality, but there's
72  * no need to overdo it.
73  **/
74 const char *io_write_phase = phase_unknown;
75 const char *io_read_phase = phase_unknown;
76
77 /** Ignore EOF errors while reading a module listing if the remote
78     version is 24 or less. */
79 int kludge_around_eof = False;
80
81 int msg_fd_in = -1;
82 int msg_fd_out = -1;
83
84 static int io_filesfrom_f_in = -1;
85 static int io_filesfrom_f_out = -1;
86 static char io_filesfrom_buf[2048];
87 static char *io_filesfrom_bp;
88 static char io_filesfrom_lastchar;
89 static int io_filesfrom_buflen;
90
91 static void read_loop(int fd, char *buf, size_t len);
92
93 struct redo_list {
94         struct redo_list *next;
95         int num;
96 };
97
98 static struct redo_list *redo_list_head;
99 static struct redo_list *redo_list_tail;
100
101 struct msg_list {
102         struct msg_list *next;
103         char *buf;
104         int len;
105 };
106
107 static struct msg_list *msg_list_head;
108 static struct msg_list *msg_list_tail;
109
110 static void redo_list_add(int num)
111 {
112         struct redo_list *rl;
113
114         if (!(rl = new(struct redo_list)))
115                 exit_cleanup(RERR_MALLOC);
116         rl->next = NULL;
117         rl->num = num;
118         if (redo_list_tail)
119                 redo_list_tail->next = rl;
120         else
121                 redo_list_head = rl;
122         redo_list_tail = rl;
123 }
124
125 static void check_timeout(void)
126 {
127         time_t t;
128
129         if (!io_timeout)
130                 return;
131
132         if (!last_io) {
133                 last_io = time(NULL);
134                 return;
135         }
136
137         t = time(NULL);
138
139         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
140                 if (!am_server && !am_daemon) {
141                         rprintf(FERROR, "io timeout after %d seconds - exiting\n",
142                                 (int)(t-last_io));
143                 }
144                 exit_cleanup(RERR_TIMEOUT);
145         }
146 }
147
148 /** Setup the fd used to receive MSG_* messages.  Only needed when
149  * we're the generator because the sender and receiver both use the
150  * multiplexed I/O setup. */
151 void set_msg_fd_in(int fd)
152 {
153         msg_fd_in = fd;
154 }
155
156 /** Setup the fd used to send our MSG_* messages.  Only needed when
157  * we're the receiver because the generator and the sender both use
158  * the multiplexed I/O setup. */
159 void set_msg_fd_out(int fd)
160 {
161         msg_fd_out = fd;
162         set_nonblocking(msg_fd_out);
163 }
164
165 /* Add a message to the pending MSG_* list. */
166 static void msg_list_add(int code, char *buf, int len)
167 {
168         struct msg_list *ml;
169
170         if (!(ml = new(struct msg_list)))
171                 exit_cleanup(RERR_MALLOC);
172         ml->next = NULL;
173         if (!(ml->buf = new_array(char, len+4)))
174                 exit_cleanup(RERR_MALLOC);
175         SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
176         memcpy(ml->buf+4, buf, len);
177         ml->len = len+4;
178         if (msg_list_tail)
179                 msg_list_tail->next = ml;
180         else
181                 msg_list_head = ml;
182         msg_list_tail = ml;
183 }
184
185 void send_msg(enum msgcode code, char *buf, int len)
186 {
187         msg_list_add(code, buf, len);
188         msg_list_push(NORMAL_FLUSH);
189 }
190
191 /** Read a message from the MSG_* fd and dispatch it.  This is only
192  * called by the generator. */
193 static void read_msg_fd(void)
194 {
195         char buf[200];
196         size_t n;
197         int fd = msg_fd_in;
198         int tag, len;
199
200         /* Temporarily disable msg_fd_in.  This is needed because we
201          * may call a write routine that could try to call us back. */
202         msg_fd_in = -1;
203
204         read_loop(fd, buf, 4);
205         tag = IVAL(buf, 0);
206
207         len = tag & 0xFFFFFF;
208         tag = (tag >> 24) - MPLEX_BASE;
209
210         switch (tag) {
211         case MSG_DONE:
212                 if (len != 0) {
213                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
214                         exit_cleanup(RERR_STREAMIO);
215                 }
216                 redo_list_add(-1);
217                 break;
218         case MSG_REDO:
219                 if (len != 4) {
220                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
221                         exit_cleanup(RERR_STREAMIO);
222                 }
223                 read_loop(fd, buf, 4);
224                 redo_list_add(IVAL(buf,0));
225                 break;
226         case MSG_INFO:
227         case MSG_ERROR:
228         case MSG_LOG:
229                 while (len) {
230                         n = len;
231                         if (n >= sizeof buf)
232                                 n = sizeof buf - 1;
233                         read_loop(fd, buf, n);
234                         rwrite((enum logcode)tag, buf, n);
235                         len -= n;
236                 }
237                 break;
238         default:
239                 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
240                 exit_cleanup(RERR_STREAMIO);
241         }
242
243         msg_fd_in = fd;
244 }
245
246 /* Try to push messages off the list onto the wire.  If we leave with more
247  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
248  * This is only called by the receiver. */
249 int msg_list_push(int flush_it_all)
250 {
251         static int written = 0;
252         struct timeval tv;
253         fd_set fds;
254
255         if (msg_fd_out < 0)
256                 return -1;
257
258         while (msg_list_head) {
259                 struct msg_list *ml = msg_list_head;
260                 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
261                 if (n < 0) {
262                         if (errno == EINTR)
263                                 continue;
264                         if (errno != EWOULDBLOCK && errno != EAGAIN)
265                                 return -1;
266                         if (!flush_it_all)
267                                 return 0;
268                         FD_ZERO(&fds);
269                         FD_SET(msg_fd_out, &fds);
270                         tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
271                         tv.tv_usec = 0;
272                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
273                                 check_timeout();
274                 } else if ((written += n) == ml->len) {
275                         free(ml->buf);
276                         msg_list_head = ml->next;
277                         if (!msg_list_head)
278                                 msg_list_tail = NULL;
279                         free(ml);
280                         written = 0;
281                 }
282         }
283         return 1;
284 }
285
286 int get_redo_num(void)
287 {
288         struct redo_list *next;
289         int num;
290
291         while (!redo_list_head)
292                 read_msg_fd();
293
294         num = redo_list_head->num;
295         next = redo_list_head->next;
296         free(redo_list_head);
297         redo_list_head = next;
298         if (!next)
299                 redo_list_tail = NULL;
300
301         return num;
302 }
303
304 /**
305  * When we're the receiver and we have a local --files-from list of names
306  * that needs to be sent over the socket to the sender, we have to do two
307  * things at the same time: send the sender a list of what files we're
308  * processing and read the incoming file+info list from the sender.  We do
309  * this by augmenting the read_timeout() function to copy this data.  It
310  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
311  * ready, since it might be a pipe) and then blast it out f_out (when it
312  * is ready to receive more data).
313  */
314 void io_set_filesfrom_fds(int f_in, int f_out)
315 {
316         io_filesfrom_f_in = f_in;
317         io_filesfrom_f_out = f_out;
318         io_filesfrom_bp = io_filesfrom_buf;
319         io_filesfrom_lastchar = '\0';
320         io_filesfrom_buflen = 0;
321 }
322
323 /**
324  * It's almost always an error to get an EOF when we're trying to read
325  * from the network, because the protocol is self-terminating.
326  *
327  * However, there is one unfortunate cases where it is not, which is
328  * rsync <2.4.6 sending a list of modules on a server, since the list
329  * is terminated by closing the socket. So, for the section of the
330  * program where that is a problem (start_socket_client),
331  * kludge_around_eof is True and we just exit.
332  */
333 static void whine_about_eof(void)
334 {
335         if (kludge_around_eof)
336                 exit_cleanup(0);
337         else {
338                 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
339                         "(%.0f bytes read so far)\n",
340                         (double)stats.total_read);
341
342                 exit_cleanup(RERR_STREAMIO);
343         }
344 }
345
346
347 static void die_from_readerr(int err)
348 {
349         /* this prevents us trying to write errors on a dead socket */
350         io_multiplexing_close();
351
352         rsyserr(FERROR, err, "read error");
353         exit_cleanup(RERR_STREAMIO);
354 }
355
356
357 /**
358  * Read from a socket with I/O timeout. return the number of bytes
359  * read. If no bytes can be read then exit, never return a number <= 0.
360  *
361  * TODO: If the remote shell connection fails, then current versions
362  * actually report an "unexpected EOF" error here.  Since it's a
363  * fairly common mistake to try to use rsh when ssh is required, we
364  * should trap that: if we fail to read any data at all, we should
365  * give a better explanation.  We can tell whether the connection has
366  * started by looking e.g. at whether the remote version is known yet.
367  */
368 static int read_timeout(int fd, char *buf, size_t len)
369 {
370         int n, ret = 0;
371
372         io_flush(NORMAL_FLUSH);
373
374         while (ret == 0) {
375                 /* until we manage to read *something* */
376                 fd_set r_fds, w_fds;
377                 struct timeval tv;
378                 int fd_count = fd+1;
379                 int count;
380
381                 FD_ZERO(&r_fds);
382                 FD_SET(fd, &r_fds);
383                 if (msg_fd_in >= 0) {
384                         FD_SET(msg_fd_in, &r_fds);
385                         if (msg_fd_in >= fd_count)
386                                 fd_count = msg_fd_in+1;
387                 }
388                 if (io_filesfrom_f_out >= 0) {
389                         int new_fd;
390                         if (io_filesfrom_buflen == 0) {
391                                 if (io_filesfrom_f_in >= 0) {
392                                         FD_SET(io_filesfrom_f_in, &r_fds);
393                                         new_fd = io_filesfrom_f_in;
394                                 } else {
395                                         io_filesfrom_f_out = -1;
396                                         new_fd = -1;
397                                 }
398                         } else {
399                                 FD_ZERO(&w_fds);
400                                 FD_SET(io_filesfrom_f_out, &w_fds);
401                                 new_fd = io_filesfrom_f_out;
402                         }
403                         if (new_fd >= fd_count)
404                                 fd_count = new_fd+1;
405                 }
406
407                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
408                 tv.tv_usec = 0;
409
410                 errno = 0;
411
412                 count = select(fd_count, &r_fds,
413                                io_filesfrom_buflen? &w_fds : NULL,
414                                NULL, &tv);
415
416                 if (count == 0) {
417                         msg_list_push(NORMAL_FLUSH);
418                         check_timeout();
419                 }
420
421                 if (count <= 0) {
422                         if (errno == EBADF)
423                                 exit_cleanup(RERR_SOCKETIO);
424                         continue;
425                 }
426
427                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
428                         read_msg_fd();
429
430                 if (io_filesfrom_f_out >= 0) {
431                         if (io_filesfrom_buflen) {
432                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
433                                         int l = write(io_filesfrom_f_out,
434                                                       io_filesfrom_bp,
435                                                       io_filesfrom_buflen);
436                                         if (l > 0) {
437                                                 if (!(io_filesfrom_buflen -= l))
438                                                         io_filesfrom_bp = io_filesfrom_buf;
439                                                 else
440                                                         io_filesfrom_bp += l;
441                                         } else {
442                                                 /* XXX should we complain? */
443                                                 io_filesfrom_f_out = -1;
444                                         }
445                                 }
446                         } else if (io_filesfrom_f_in >= 0) {
447                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
448                                         int l = read(io_filesfrom_f_in,
449                                                      io_filesfrom_buf,
450                                                      sizeof io_filesfrom_buf);
451                                         if (l <= 0) {
452                                                 /* Send end-of-file marker */
453                                                 io_filesfrom_buf[0] = '\0';
454                                                 io_filesfrom_buf[1] = '\0';
455                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
456                                                 io_filesfrom_f_in = -1;
457                                         } else {
458                                                 extern int eol_nulls;
459                                                 if (!eol_nulls) {
460                                                         char *s = io_filesfrom_buf + l;
461                                                         /* Transform CR and/or LF into '\0' */
462                                                         while (s-- > io_filesfrom_buf) {
463                                                                 if (*s == '\n' || *s == '\r')
464                                                                         *s = '\0';
465                                                         }
466                                                 }
467                                                 if (!io_filesfrom_lastchar) {
468                                                         /* Last buf ended with a '\0', so don't
469                                                          * let this buf start with one. */
470                                                         while (l && !*io_filesfrom_bp)
471                                                                 io_filesfrom_bp++, l--;
472                                                 }
473                                                 if (!l)
474                                                         io_filesfrom_bp = io_filesfrom_buf;
475                                                 else {
476                                                         char *f = io_filesfrom_bp;
477                                                         char *t = f;
478                                                         char *eob = f + l;
479                                                         /* Eliminate any multi-'\0' runs. */
480                                                         while (f != eob) {
481                                                                 if (!(*t++ = *f++)) {
482                                                                         while (f != eob && !*f)
483                                                                                 f++, l--;
484                                                                 }
485                                                         }
486                                                         io_filesfrom_lastchar = f[-1];
487                                                 }
488                                                 io_filesfrom_buflen = l;
489                                         }
490                                 }
491                         }
492                 }
493
494                 if (!FD_ISSET(fd, &r_fds))
495                         continue;
496
497                 n = read(fd, buf, len);
498
499                 if (n > 0) {
500                         buf += n;
501                         len -= n;
502                         ret += n;
503                         if (io_timeout)
504                                 last_io = time(NULL);
505                         continue;
506                 } else if (n == 0) {
507                         whine_about_eof();
508                         return -1; /* doesn't return */
509                 } else if (n < 0) {
510                         if (errno == EINTR || errno == EWOULDBLOCK
511                             || errno == EAGAIN)
512                                 continue;
513                         die_from_readerr(errno);
514                 }
515         }
516
517         return ret;
518 }
519
520 /**
521  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
522  * characters long).
523  */
524 int read_filesfrom_line(int fd, char *fname)
525 {
526         char ch, *s, *eob = fname + MAXPATHLEN - 1;
527         int cnt;
528         extern int io_timeout;
529         extern int eol_nulls;
530         extern char *remote_filesfrom_file;
531         int reading_remotely = remote_filesfrom_file != NULL;
532         int nulls = eol_nulls || reading_remotely;
533
534   start:
535         s = fname;
536         while (1) {
537                 cnt = read(fd, &ch, 1);
538                 if (cnt < 0 && (errno == EWOULDBLOCK
539                   || errno == EINTR || errno == EAGAIN)) {
540                         struct timeval tv;
541                         fd_set fds;
542                         FD_ZERO(&fds);
543                         FD_SET(fd, &fds);
544                         tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
545                         tv.tv_usec = 0;
546                         if (!select(fd+1, &fds, NULL, NULL, &tv))
547                                 check_timeout();
548                         continue;
549                 }
550                 if (cnt != 1)
551                         break;
552                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
553                         /* Skip empty lines if reading locally. */
554                         if (!reading_remotely && s == fname)
555                                 continue;
556                         break;
557                 }
558                 if (s < eob)
559                         *s++ = ch;
560         }
561         *s = '\0';
562
563         /* Dump comments. */
564         if (*fname == '#' || *fname == ';')
565                 goto start;
566
567         return s - fname;
568 }
569
570
571 /**
572  * Continue trying to read len bytes - don't return until len has been
573  * read.
574  **/
575 static void read_loop(int fd, char *buf, size_t len)
576 {
577         while (len) {
578                 int n = read_timeout(fd, buf, len);
579
580                 buf += n;
581                 len -= n;
582         }
583 }
584
585
586 /**
587  * Read from the file descriptor handling multiplexing - return number
588  * of bytes read.
589  *
590  * Never returns <= 0.
591  */
592 static int read_unbuffered(int fd, char *buf, size_t len)
593 {
594         static size_t remaining;
595         int tag, ret = 0;
596         char line[1024];
597         static char *buffer;
598         static size_t bufferIdx = 0;
599         static size_t bufferSz;
600
601         if (fd != multiplex_in_fd)
602                 return read_timeout(fd, buf, len);
603
604         if (!io_multiplexing_in && remaining == 0) {
605                 if (!buffer) {
606                         bufferSz = 2 * IO_BUFFER_SIZE;
607                         buffer   = new_array(char, bufferSz);
608                         if (!buffer)
609                                 out_of_memory("read_unbuffered");
610                 }
611                 remaining = read_timeout(fd, buffer, bufferSz);
612                 bufferIdx = 0;
613         }
614
615         while (ret == 0) {
616                 if (remaining) {
617                         len = MIN(len, remaining);
618                         memcpy(buf, buffer + bufferIdx, len);
619                         bufferIdx += len;
620                         remaining -= len;
621                         ret = len;
622                         break;
623                 }
624
625                 read_loop(fd, line, 4);
626                 tag = IVAL(line, 0);
627
628                 remaining = tag & 0xFFFFFF;
629                 tag = (tag >> 24) - MPLEX_BASE;
630
631                 switch (tag) {
632                 case MSG_DATA:
633                         if (!buffer || remaining > bufferSz) {
634                                 buffer = realloc_array(buffer, char, remaining);
635                                 if (!buffer)
636                                         out_of_memory("read_unbuffered");
637                                 bufferSz = remaining;
638                         }
639                         read_loop(fd, buffer, remaining);
640                         bufferIdx = 0;
641                         break;
642                 case MSG_INFO:
643                 case MSG_ERROR:
644                         if (remaining >= sizeof line) {
645                                 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
646                                         tag, (long)remaining);
647                                 exit_cleanup(RERR_STREAMIO);
648                         }
649                         read_loop(fd, line, remaining);
650                         rwrite((enum logcode)tag, line, remaining);
651                         remaining = 0;
652                         break;
653                 default:
654                         rprintf(FERROR, "unexpected tag %d\n", tag);
655                         exit_cleanup(RERR_STREAMIO);
656                 }
657         }
658
659         if (remaining == 0)
660                 io_flush(NORMAL_FLUSH);
661
662         return ret;
663 }
664
665
666
667 /**
668  * Do a buffered read from @p fd.  Don't return until all @p n bytes
669  * have been read.  If all @p n can't be read then exit with an
670  * error.
671  **/
672 static void readfd(int fd, char *buffer, size_t N)
673 {
674         int  ret;
675         size_t total = 0;
676
677         while (total < N) {
678                 ret = read_unbuffered(fd, buffer + total, N-total);
679                 total += ret;
680         }
681
682         stats.total_read += total;
683 }
684
685
686 int32 read_int(int f)
687 {
688         char b[4];
689         int32 ret;
690
691         readfd(f,b,4);
692         ret = IVAL(b,0);
693         if (ret == (int32)0xffffffff)
694                 return -1;
695         return ret;
696 }
697
698 int64 read_longint(int f)
699 {
700         int64 ret;
701         char b[8];
702         ret = read_int(f);
703
704         if ((int32)ret != (int32)0xffffffff)
705                 return ret;
706
707 #ifdef NO_INT64
708         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
709         exit_cleanup(RERR_UNSUPPORTED);
710 #else
711         readfd(f,b,8);
712         ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
713 #endif
714
715         return ret;
716 }
717
718 void read_buf(int f,char *buf,size_t len)
719 {
720         readfd(f,buf,len);
721 }
722
723 void read_sbuf(int f,char *buf,size_t len)
724 {
725         read_buf(f,buf,len);
726         buf[len] = 0;
727 }
728
729 unsigned char read_byte(int f)
730 {
731         unsigned char c;
732         read_buf(f, (char *)&c, 1);
733         return c;
734 }
735
736
737 /**
738  * Sleep after writing to limit I/O bandwidth usage.
739  *
740  * @todo Rather than sleeping after each write, it might be better to
741  * use some kind of averaging.  The current algorithm seems to always
742  * use a bit less bandwidth than specified, because it doesn't make up
743  * for slow periods.  But arguably this is a feature.  In addition, we
744  * ought to take the time used to write the data into account.
745  *
746  * During some phases of big transfers (file FOO is uptodate) this is
747  * called with a small bytes_written every time.  As the kernel has to
748  * round small waits up to guarantee that we actually wait at least the
749  * requested number of microseconds, this can become grossly inaccurate.
750  * We therefore keep track of the bytes we've written over time and only
751  * sleep when the accumulated delay is at least 1 tenth of a second.
752  **/
753 static void sleep_for_bwlimit(int bytes_written)
754 {
755         static struct timeval prior_tv;
756         static long total_written = 0; 
757         struct timeval tv, start_tv;
758         long elapsed_usec, sleep_usec;
759
760 #define ONE_SEC 1000000L /* # of microseconds in a second */
761
762         if (!bwlimit)
763                 return;
764
765         total_written += bytes_written; 
766
767         gettimeofday(&start_tv, NULL);
768         if (prior_tv.tv_sec) {
769                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
770                              + (start_tv.tv_usec - prior_tv.tv_usec);
771                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
772                 if (total_written < 0)
773                         total_written = 0;
774         }
775
776         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
777         if (sleep_usec < ONE_SEC / 10) {
778                 prior_tv = start_tv;
779                 return;
780         }
781
782         tv.tv_sec  = sleep_usec / ONE_SEC;
783         tv.tv_usec = sleep_usec % ONE_SEC;
784         select(0, NULL, NULL, NULL, &tv);
785
786         gettimeofday(&prior_tv, NULL);
787         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
788                      + (prior_tv.tv_usec - start_tv.tv_usec);
789         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
790 }
791
792
793 /**
794  * Write len bytes to the file descriptor @p fd.
795  *
796  * This function underlies the multiplexing system.  The body of the
797  * application never calls this function directly.
798  **/
799 static void writefd_unbuffered(int fd,char *buf,size_t len)
800 {
801         size_t total = 0;
802         fd_set w_fds, r_fds;
803         int fd_count, count;
804         struct timeval tv;
805
806         msg_list_push(NORMAL_FLUSH);
807
808         no_flush++;
809
810         while (total < len) {
811                 FD_ZERO(&w_fds);
812                 FD_SET(fd,&w_fds);
813                 fd_count = fd;
814
815                 if (msg_fd_in >= 0) {
816                         FD_ZERO(&r_fds);
817                         FD_SET(msg_fd_in,&r_fds);
818                         if (msg_fd_in > fd_count)
819                                 fd_count = msg_fd_in;
820                 }
821
822                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
823                 tv.tv_usec = 0;
824
825                 errno = 0;
826                 count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
827                                &w_fds, NULL, &tv);
828
829                 if (count == 0) {
830                         msg_list_push(NORMAL_FLUSH);
831                         check_timeout();
832                 }
833
834                 if (count <= 0) {
835                         if (errno == EBADF)
836                                 exit_cleanup(RERR_SOCKETIO);
837                         continue;
838                 }
839
840                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
841                         read_msg_fd();
842
843                 if (FD_ISSET(fd, &w_fds)) {
844                         int ret;
845                         size_t n = len-total;
846                         if (bwlimit && n > bwlimit_writemax)
847                                 n = bwlimit_writemax;
848                         ret = write(fd,buf+total,n);
849
850                         if (ret < 0) {
851                                 if (errno == EINTR)
852                                         continue;
853                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
854                                         msleep(1);
855                                         continue;
856                                 }
857                         }
858
859                         if (ret <= 0) {
860                                 /* Don't try to write errors back
861                                  * across the stream */
862                                 io_multiplexing_close();
863                                 rsyserr(FERROR, errno,
864                                         "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
865                                         (long)len, io_write_phase);
866                                 exit_cleanup(RERR_STREAMIO);
867                         }
868
869                         sleep_for_bwlimit(ret);
870
871                         total += ret;
872
873                         if (io_timeout)
874                                 last_io = time(NULL);
875                 }
876         }
877
878         no_flush--;
879 }
880
881
882 static char *io_buffer;
883 static int io_buffer_count;
884
885 void io_start_buffering_out(int fd)
886 {
887         if (io_buffer)
888                 return;
889         multiplex_out_fd = fd;
890         io_buffer = new_array(char, IO_BUFFER_SIZE);
891         if (!io_buffer)
892                 out_of_memory("writefd");
893         io_buffer_count = 0;
894 }
895
896 void io_start_buffering_in(int fd)
897 {
898         multiplex_in_fd = fd;
899 }
900
901 /**
902  * Write an message to a multiplexed stream. If this fails then rsync
903  * exits.
904  **/
905 static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
906 {
907         char buffer[4096];
908         size_t n = len;
909
910         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
911
912         if (n > (sizeof buffer - 4)) {
913                 n = sizeof buffer - 4;
914         }
915
916         memcpy(&buffer[4], buf, n);
917         writefd_unbuffered(fd, buffer, n+4);
918
919         len -= n;
920         buf += n;
921
922         if (len) {
923                 writefd_unbuffered(fd, buf, len);
924         }
925 }
926
927
928 void io_flush(int flush_it_all)
929 {
930         int fd = multiplex_out_fd;
931
932         msg_list_push(flush_it_all);
933
934         if (!io_buffer_count || no_flush)
935                 return;
936
937         if (io_multiplexing_out)
938                 mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
939         else
940                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
941         io_buffer_count = 0;
942 }
943
944
945 void io_end_buffering(void)
946 {
947         io_flush(NORMAL_FLUSH);
948         if (!io_multiplexing_out) {
949                 free(io_buffer);
950                 io_buffer = NULL;
951         }
952 }
953
954 static void writefd(int fd,char *buf,size_t len)
955 {
956         stats.total_written += len;
957
958         msg_list_push(NORMAL_FLUSH);
959
960         if (!io_buffer || fd != multiplex_out_fd) {
961                 writefd_unbuffered(fd, buf, len);
962                 return;
963         }
964
965         while (len) {
966                 int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
967                 if (n > 0) {
968                         memcpy(io_buffer+io_buffer_count, buf, n);
969                         buf += n;
970                         len -= n;
971                         io_buffer_count += n;
972                 }
973
974                 if (io_buffer_count == IO_BUFFER_SIZE)
975                         io_flush(NORMAL_FLUSH);
976         }
977 }
978
979
980 void write_int(int f,int32 x)
981 {
982         char b[4];
983         SIVAL(b,0,x);
984         writefd(f,b,4);
985 }
986
987
988 void write_int_named(int f, int32 x, const char *phase)
989 {
990         io_write_phase = phase;
991         write_int(f, x);
992         io_write_phase = phase_unknown;
993 }
994
995
996 /*
997  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
998  * 64-bit types on this platform.
999  */
1000 void write_longint(int f, int64 x)
1001 {
1002         char b[8];
1003
1004         if (x <= 0x7FFFFFFF) {
1005                 write_int(f, (int)x);
1006                 return;
1007         }
1008
1009 #ifdef NO_INT64
1010         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1011         exit_cleanup(RERR_UNSUPPORTED);
1012 #else
1013         write_int(f, (int32)0xFFFFFFFF);
1014         SIVAL(b,0,(x&0xFFFFFFFF));
1015         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1016
1017         writefd(f,b,8);
1018 #endif
1019 }
1020
1021 void write_buf(int f,char *buf,size_t len)
1022 {
1023         writefd(f,buf,len);
1024 }
1025
1026 /** Write a string to the connection */
1027 static void write_sbuf(int f,char *buf)
1028 {
1029         write_buf(f, buf, strlen(buf));
1030 }
1031
1032
1033 void write_byte(int f,unsigned char c)
1034 {
1035         write_buf(f,(char *)&c,1);
1036 }
1037
1038
1039
1040 /**
1041  * Read a line of up to @p maxlen characters into @p buf (not counting
1042  * the trailing null).  Strips the (required) trailing newline and all
1043  * carriage returns.
1044  *
1045  * @return 1 for success; 0 for I/O error or truncation.
1046  **/
1047 int read_line(int f, char *buf, size_t maxlen)
1048 {
1049         while (maxlen) {
1050                 buf[0] = 0;
1051                 read_buf(f, buf, 1);
1052                 if (buf[0] == 0)
1053                         return 0;
1054                 if (buf[0] == '\n')
1055                         break;
1056                 if (buf[0] != '\r') {
1057                         buf++;
1058                         maxlen--;
1059                 }
1060         }
1061         *buf = '\0';
1062         return maxlen > 0;
1063 }
1064
1065
1066 void io_printf(int fd, const char *format, ...)
1067 {
1068         va_list ap;
1069         char buf[1024];
1070         int len;
1071
1072         va_start(ap, format);
1073         len = vsnprintf(buf, sizeof buf, format, ap);
1074         va_end(ap);
1075
1076         if (len < 0)
1077                 exit_cleanup(RERR_STREAMIO);
1078
1079         write_sbuf(fd, buf);
1080 }
1081
1082
1083 /** Setup for multiplexing a MSG_* stream with the data stream. */
1084 void io_start_multiplex_out(int fd)
1085 {
1086         multiplex_out_fd = fd;
1087         io_flush(NORMAL_FLUSH);
1088         io_start_buffering_out(fd);
1089         io_multiplexing_out = 1;
1090 }
1091
1092 /** Setup for multiplexing a MSG_* stream with the data stream. */
1093 void io_start_multiplex_in(int fd)
1094 {
1095         multiplex_in_fd = fd;
1096         io_flush(NORMAL_FLUSH);
1097         io_multiplexing_in = 1;
1098 }
1099
1100 /** Write an message to the multiplexed data stream. */
1101 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
1102 {
1103         if (!io_multiplexing_out)
1104                 return 0;
1105
1106         io_flush(NORMAL_FLUSH);
1107         stats.total_written += (len+4);
1108         mplex_write(multiplex_out_fd, code, buf, len);
1109         return 1;
1110 }
1111
1112 /** Stop output multiplexing. */
1113 void io_multiplexing_close(void)
1114 {
1115         io_multiplexing_out = 0;
1116 }
1117