Made write_sbuf() non-static.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2  *
3  * Copyright (C) 1996-2001 by Andrew Tridgell
4  * Copyright (C) Paul Mackerras 1996
5  * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /**
23  * @file io.c
24  *
25  * Socket and pipe I/O utilities used in rsync.
26  *
27  * rsync provides its own multiplexing system, which is used to send
28  * stderr and stdout over a single socket.  We need this because
29  * stdout normally carries the binary data stream, and stderr all our
30  * error messages.
31  *
32  * For historical reasons this is off during the start of the
33  * connection, but it's switched on quite early using
34  * io_start_multiplex_out() and io_start_multiplex_in().
35  **/
36
37 #include "rsync.h"
38
39 /** If no timeout is specified then use a 60 second select timeout */
40 #define SELECT_TIMEOUT 60
41
42 extern int bwlimit;
43 extern size_t bwlimit_writemax;
44 extern int verbose;
45 extern int io_timeout;
46 extern int am_server;
47 extern int am_daemon;
48 extern int am_sender;
49 extern int eol_nulls;
50 extern int checksum_seed;
51 extern int protocol_version;
52 extern char *remote_filesfrom_file;
53 extern struct stats stats;
54
55 const char phase_unknown[] = "unknown";
56 int select_timeout = SELECT_TIMEOUT;
57 int batch_fd = -1;
58
59 /**
60  * The connection might be dropped at some point; perhaps because the
61  * remote instance crashed.  Just giving the offset on the stream is
62  * not very helpful.  So instead we try to make io_phase_name point to
63  * something useful.
64  *
65  * For buffered/multiplexed I/O these names will be somewhat
66  * approximate; perhaps for ease of support we would rather make the
67  * buffer always flush when a single application-level I/O finishes.
68  *
69  * @todo Perhaps we want some simple stack functionality, but there's
70  * no need to overdo it.
71  **/
72 const char *io_write_phase = phase_unknown;
73 const char *io_read_phase = phase_unknown;
74
75 /** Ignore EOF errors while reading a module listing if the remote
76     version is 24 or less. */
77 int kludge_around_eof = False;
78
79 int msg_fd_in = -1;
80 int msg_fd_out = -1;
81
82 static int io_multiplexing_out;
83 static int io_multiplexing_in;
84 static int sock_f_in = -1;
85 static int sock_f_out = -1;
86 static time_t last_io;
87 static int no_flush;
88
89 static int write_batch_monitor_in = -1;
90 static int write_batch_monitor_out = -1;
91
92 static int io_filesfrom_f_in = -1;
93 static int io_filesfrom_f_out = -1;
94 static char io_filesfrom_buf[2048];
95 static char *io_filesfrom_bp;
96 static char io_filesfrom_lastchar;
97 static int io_filesfrom_buflen;
98
99 static void read_loop(int fd, char *buf, size_t len);
100
101 struct redo_list {
102         struct redo_list *next;
103         int num;
104 };
105
106 static struct redo_list *redo_list_head;
107 static struct redo_list *redo_list_tail;
108
109 struct msg_list {
110         struct msg_list *next;
111         char *buf;
112         int len;
113 };
114
115 static struct msg_list *msg_list_head;
116 static struct msg_list *msg_list_tail;
117
118 static void redo_list_add(int num)
119 {
120         struct redo_list *rl;
121
122         if (!(rl = new(struct redo_list)))
123                 exit_cleanup(RERR_MALLOC);
124         rl->next = NULL;
125         rl->num = num;
126         if (redo_list_tail)
127                 redo_list_tail->next = rl;
128         else
129                 redo_list_head = rl;
130         redo_list_tail = rl;
131 }
132
133 static void check_timeout(void)
134 {
135         time_t t;
136
137         if (!io_timeout)
138                 return;
139
140         if (!last_io) {
141                 last_io = time(NULL);
142                 return;
143         }
144
145         t = time(NULL);
146
147         if (t - last_io >= io_timeout) {
148                 if (!am_server && !am_daemon) {
149                         rprintf(FERROR, "io timeout after %d seconds - exiting\n",
150                                 (int)(t-last_io));
151                 }
152                 exit_cleanup(RERR_TIMEOUT);
153         }
154 }
155
156 /* Note the fds used for the main socket (which might really be a pipe
157  * for a local transfer, but we can ignore that). */
158 void io_set_sock_fds(int f_in, int f_out)
159 {
160         sock_f_in = f_in;
161         sock_f_out = f_out;
162 }
163
164 /** Setup the fd used to receive MSG_* messages.  Only needed when
165  * we're the generator because the sender and receiver both use the
166  * multiplexed I/O setup. */
167 void set_msg_fd_in(int fd)
168 {
169         msg_fd_in = fd;
170 }
171
172 /** Setup the fd used to send our MSG_* messages.  Only needed when
173  * we're the receiver because the generator and the sender both use
174  * the multiplexed I/O setup. */
175 void set_msg_fd_out(int fd)
176 {
177         msg_fd_out = fd;
178         set_nonblocking(msg_fd_out);
179 }
180
181 /* Add a message to the pending MSG_* list. */
182 static void msg_list_add(int code, char *buf, int len)
183 {
184         struct msg_list *ml;
185
186         if (!(ml = new(struct msg_list)))
187                 exit_cleanup(RERR_MALLOC);
188         ml->next = NULL;
189         if (!(ml->buf = new_array(char, len+4)))
190                 exit_cleanup(RERR_MALLOC);
191         SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
192         memcpy(ml->buf+4, buf, len);
193         ml->len = len+4;
194         if (msg_list_tail)
195                 msg_list_tail->next = ml;
196         else
197                 msg_list_head = ml;
198         msg_list_tail = ml;
199 }
200
201 void send_msg(enum msgcode code, char *buf, int len)
202 {
203         msg_list_add(code, buf, len);
204         msg_list_push(NORMAL_FLUSH);
205 }
206
207 /** Read a message from the MSG_* fd and dispatch it.  This is only
208  * called by the generator. */
209 static void read_msg_fd(void)
210 {
211         char buf[2048];
212         size_t n;
213         int fd = msg_fd_in;
214         int tag, len;
215
216         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
217          * to this routine from read_timeout() and writefd_unbuffered(). */
218         msg_fd_in = -1;
219
220         read_loop(fd, buf, 4);
221         tag = IVAL(buf, 0);
222
223         len = tag & 0xFFFFFF;
224         tag = (tag >> 24) - MPLEX_BASE;
225
226         switch (tag) {
227         case MSG_DONE:
228                 if (len != 0) {
229                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
230                         exit_cleanup(RERR_STREAMIO);
231                 }
232                 redo_list_add(-1);
233                 break;
234         case MSG_REDO:
235                 if (len != 4) {
236                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
237                         exit_cleanup(RERR_STREAMIO);
238                 }
239                 read_loop(fd, buf, 4);
240                 redo_list_add(IVAL(buf,0));
241                 break;
242         case MSG_INFO:
243         case MSG_ERROR:
244         case MSG_LOG:
245                 while (len) {
246                         n = len;
247                         if (n >= sizeof buf)
248                                 n = sizeof buf - 1;
249                         read_loop(fd, buf, n);
250                         rwrite((enum logcode)tag, buf, n);
251                         len -= n;
252                 }
253                 break;
254         default:
255                 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
256                 exit_cleanup(RERR_STREAMIO);
257         }
258
259         msg_fd_in = fd;
260 }
261
262 /* Try to push messages off the list onto the wire.  If we leave with more
263  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
264  * This is only active in the receiver. */
265 int msg_list_push(int flush_it_all)
266 {
267         static int written = 0;
268         struct timeval tv;
269         fd_set fds;
270
271         if (msg_fd_out < 0)
272                 return -1;
273
274         while (msg_list_head) {
275                 struct msg_list *ml = msg_list_head;
276                 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
277                 if (n < 0) {
278                         if (errno == EINTR)
279                                 continue;
280                         if (errno != EWOULDBLOCK && errno != EAGAIN)
281                                 return -1;
282                         if (!flush_it_all)
283                                 return 0;
284                         FD_ZERO(&fds);
285                         FD_SET(msg_fd_out, &fds);
286                         tv.tv_sec = select_timeout;
287                         tv.tv_usec = 0;
288                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
289                                 check_timeout();
290                 } else if ((written += n) == ml->len) {
291                         free(ml->buf);
292                         msg_list_head = ml->next;
293                         if (!msg_list_head)
294                                 msg_list_tail = NULL;
295                         free(ml);
296                         written = 0;
297                 }
298         }
299         return 1;
300 }
301
302 int get_redo_num(void)
303 {
304         struct redo_list *next;
305         int num;
306
307         while (!redo_list_head)
308                 read_msg_fd();
309
310         num = redo_list_head->num;
311         next = redo_list_head->next;
312         free(redo_list_head);
313         redo_list_head = next;
314         if (!next)
315                 redo_list_tail = NULL;
316
317         return num;
318 }
319
320 /**
321  * When we're the receiver and we have a local --files-from list of names
322  * that needs to be sent over the socket to the sender, we have to do two
323  * things at the same time: send the sender a list of what files we're
324  * processing and read the incoming file+info list from the sender.  We do
325  * this by augmenting the read_timeout() function to copy this data.  It
326  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
327  * ready, since it might be a pipe) and then blast it out f_out (when it
328  * is ready to receive more data).
329  */
330 void io_set_filesfrom_fds(int f_in, int f_out)
331 {
332         io_filesfrom_f_in = f_in;
333         io_filesfrom_f_out = f_out;
334         io_filesfrom_bp = io_filesfrom_buf;
335         io_filesfrom_lastchar = '\0';
336         io_filesfrom_buflen = 0;
337 }
338
339 /**
340  * It's almost always an error to get an EOF when we're trying to read
341  * from the network, because the protocol is self-terminating.
342  *
343  * However, there is one unfortunate cases where it is not, which is
344  * rsync <2.4.6 sending a list of modules on a server, since the list
345  * is terminated by closing the socket. So, for the section of the
346  * program where that is a problem (start_socket_client),
347  * kludge_around_eof is True and we just exit.
348  */
349 static void whine_about_eof(int fd)
350 {
351         if (kludge_around_eof && fd == sock_f_in)
352                 exit_cleanup(0);
353
354         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
355                 "(%.0f bytes read so far)\n",
356                 (double)stats.total_read);
357
358         exit_cleanup(RERR_STREAMIO);
359 }
360
361
362 /**
363  * Read from a socket with I/O timeout. return the number of bytes
364  * read. If no bytes can be read then exit, never return a number <= 0.
365  *
366  * TODO: If the remote shell connection fails, then current versions
367  * actually report an "unexpected EOF" error here.  Since it's a
368  * fairly common mistake to try to use rsh when ssh is required, we
369  * should trap that: if we fail to read any data at all, we should
370  * give a better explanation.  We can tell whether the connection has
371  * started by looking e.g. at whether the remote version is known yet.
372  */
373 static int read_timeout(int fd, char *buf, size_t len)
374 {
375         int n, ret = 0;
376
377         io_flush(NORMAL_FLUSH);
378
379         while (ret == 0) {
380                 /* until we manage to read *something* */
381                 fd_set r_fds, w_fds;
382                 struct timeval tv;
383                 int maxfd = fd;
384                 int count;
385
386                 FD_ZERO(&r_fds);
387                 FD_ZERO(&w_fds);
388                 FD_SET(fd, &r_fds);
389                 if (msg_fd_in >= 0) {
390                         FD_SET(msg_fd_in, &r_fds);
391                         if (msg_fd_in > maxfd)
392                                 maxfd = msg_fd_in;
393                 } else if (msg_list_head) {
394                         FD_SET(msg_fd_out, &w_fds);
395                         if (msg_fd_out > maxfd)
396                                 maxfd = msg_fd_out;
397                 }
398                 if (io_filesfrom_f_out >= 0) {
399                         int new_fd;
400                         if (io_filesfrom_buflen == 0) {
401                                 if (io_filesfrom_f_in >= 0) {
402                                         FD_SET(io_filesfrom_f_in, &r_fds);
403                                         new_fd = io_filesfrom_f_in;
404                                 } else {
405                                         io_filesfrom_f_out = -1;
406                                         new_fd = -1;
407                                 }
408                         } else {
409                                 FD_SET(io_filesfrom_f_out, &w_fds);
410                                 new_fd = io_filesfrom_f_out;
411                         }
412                         if (new_fd > maxfd)
413                                 maxfd = new_fd;
414                 }
415
416                 tv.tv_sec = select_timeout;
417                 tv.tv_usec = 0;
418
419                 errno = 0;
420
421                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
422
423                 if (count <= 0) {
424                         if (errno == EBADF)
425                                 exit_cleanup(RERR_SOCKETIO);
426                         check_timeout();
427                         continue;
428                 }
429
430                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
431                         read_msg_fd();
432                 else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
433                         msg_list_push(NORMAL_FLUSH);
434
435                 if (io_filesfrom_f_out >= 0) {
436                         if (io_filesfrom_buflen) {
437                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
438                                         int l = write(io_filesfrom_f_out,
439                                                       io_filesfrom_bp,
440                                                       io_filesfrom_buflen);
441                                         if (l > 0) {
442                                                 if (!(io_filesfrom_buflen -= l))
443                                                         io_filesfrom_bp = io_filesfrom_buf;
444                                                 else
445                                                         io_filesfrom_bp += l;
446                                         } else {
447                                                 /* XXX should we complain? */
448                                                 io_filesfrom_f_out = -1;
449                                         }
450                                 }
451                         } else if (io_filesfrom_f_in >= 0) {
452                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
453                                         int l = read(io_filesfrom_f_in,
454                                                      io_filesfrom_buf,
455                                                      sizeof io_filesfrom_buf);
456                                         if (l <= 0) {
457                                                 /* Send end-of-file marker */
458                                                 io_filesfrom_buf[0] = '\0';
459                                                 io_filesfrom_buf[1] = '\0';
460                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
461                                                 io_filesfrom_f_in = -1;
462                                         } else {
463                                                 if (!eol_nulls) {
464                                                         char *s = io_filesfrom_buf + l;
465                                                         /* Transform CR and/or LF into '\0' */
466                                                         while (s-- > io_filesfrom_buf) {
467                                                                 if (*s == '\n' || *s == '\r')
468                                                                         *s = '\0';
469                                                         }
470                                                 }
471                                                 if (!io_filesfrom_lastchar) {
472                                                         /* Last buf ended with a '\0', so don't
473                                                          * let this buf start with one. */
474                                                         while (l && !*io_filesfrom_bp)
475                                                                 io_filesfrom_bp++, l--;
476                                                 }
477                                                 if (!l)
478                                                         io_filesfrom_bp = io_filesfrom_buf;
479                                                 else {
480                                                         char *f = io_filesfrom_bp;
481                                                         char *t = f;
482                                                         char *eob = f + l;
483                                                         /* Eliminate any multi-'\0' runs. */
484                                                         while (f != eob) {
485                                                                 if (!(*t++ = *f++)) {
486                                                                         while (f != eob && !*f)
487                                                                                 f++, l--;
488                                                                 }
489                                                         }
490                                                         io_filesfrom_lastchar = f[-1];
491                                                 }
492                                                 io_filesfrom_buflen = l;
493                                         }
494                                 }
495                         }
496                 }
497
498                 if (!FD_ISSET(fd, &r_fds))
499                         continue;
500
501                 n = read(fd, buf, len);
502
503                 if (n <= 0) {
504                         if (n == 0)
505                                 whine_about_eof(fd); /* Doesn't return. */
506                         if (errno == EINTR || errno == EWOULDBLOCK
507                             || errno == EAGAIN)
508                                 continue;
509
510                         /* Don't write errors on a dead socket. */
511                         if (fd == sock_f_in)
512                                 io_multiplexing_close();
513                         rsyserr(FERROR, errno, "read error");
514                         exit_cleanup(RERR_STREAMIO);
515                 }
516
517                 buf += n;
518                 len -= n;
519                 ret += n;
520
521                 if (io_timeout && fd == sock_f_in)
522                         last_io = time(NULL);
523         }
524
525         return ret;
526 }
527
528 /**
529  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
530  * characters long).
531  */
532 int read_filesfrom_line(int fd, char *fname)
533 {
534         char ch, *s, *eob = fname + MAXPATHLEN - 1;
535         int cnt;
536         int reading_remotely = remote_filesfrom_file != NULL;
537         int nulls = eol_nulls || reading_remotely;
538
539   start:
540         s = fname;
541         while (1) {
542                 cnt = read(fd, &ch, 1);
543                 if (cnt < 0 && (errno == EWOULDBLOCK
544                   || errno == EINTR || errno == EAGAIN)) {
545                         struct timeval tv;
546                         fd_set fds;
547                         FD_ZERO(&fds);
548                         FD_SET(fd, &fds);
549                         tv.tv_sec = select_timeout;
550                         tv.tv_usec = 0;
551                         if (!select(fd+1, &fds, NULL, NULL, &tv))
552                                 check_timeout();
553                         continue;
554                 }
555                 if (cnt != 1)
556                         break;
557                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
558                         /* Skip empty lines if reading locally. */
559                         if (!reading_remotely && s == fname)
560                                 continue;
561                         break;
562                 }
563                 if (s < eob)
564                         *s++ = ch;
565         }
566         *s = '\0';
567
568         /* Dump comments. */
569         if (*fname == '#' || *fname == ';')
570                 goto start;
571
572         return s - fname;
573 }
574
575
576 static char *iobuf_out;
577 static int iobuf_out_cnt;
578
579 void io_start_buffering_out(void)
580 {
581         if (iobuf_out)
582                 return;
583         if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
584                 out_of_memory("io_start_buffering_out");
585         iobuf_out_cnt = 0;
586 }
587
588
589 static char *iobuf_in;
590 static size_t iobuf_in_siz;
591
592 void io_start_buffering_in(void)
593 {
594         if (iobuf_in)
595                 return;
596         iobuf_in_siz = 2 * IO_BUFFER_SIZE;
597         if (!(iobuf_in = new_array(char, iobuf_in_siz)))
598                 out_of_memory("io_start_buffering_in");
599 }
600
601
602 void io_end_buffering(void)
603 {
604         io_flush(NORMAL_FLUSH);
605         if (!io_multiplexing_out) {
606                 free(iobuf_out);
607                 iobuf_out = NULL;
608         }
609 }
610
611
612 /**
613  * Continue trying to read len bytes - don't return until len has been
614  * read.
615  **/
616 static void read_loop(int fd, char *buf, size_t len)
617 {
618         while (len) {
619                 int n = read_timeout(fd, buf, len);
620
621                 buf += n;
622                 len -= n;
623         }
624 }
625
626
627 /**
628  * Read from the file descriptor handling multiplexing - return number
629  * of bytes read.
630  *
631  * Never returns <= 0.
632  */
633 static int readfd_unbuffered(int fd, char *buf, size_t len)
634 {
635         static size_t remaining;
636         static size_t iobuf_in_ndx;
637         int tag, ret = 0;
638         char line[1024];
639
640         if (!iobuf_in || fd != sock_f_in)
641                 return read_timeout(fd, buf, len);
642
643         if (!io_multiplexing_in && remaining == 0) {
644                 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
645                 iobuf_in_ndx = 0;
646         }
647
648         while (ret == 0) {
649                 if (remaining) {
650                         len = MIN(len, remaining);
651                         memcpy(buf, iobuf_in + iobuf_in_ndx, len);
652                         iobuf_in_ndx += len;
653                         remaining -= len;
654                         ret = len;
655                         break;
656                 }
657
658                 read_loop(fd, line, 4);
659                 tag = IVAL(line, 0);
660
661                 remaining = tag & 0xFFFFFF;
662                 tag = (tag >> 24) - MPLEX_BASE;
663
664                 switch (tag) {
665                 case MSG_DATA:
666                         if (remaining > iobuf_in_siz) {
667                                 if (!(iobuf_in = realloc_array(iobuf_in, char,
668                                                                remaining)))
669                                         out_of_memory("readfd_unbuffered");
670                                 iobuf_in_siz = remaining;
671                         }
672                         read_loop(fd, iobuf_in, remaining);
673                         iobuf_in_ndx = 0;
674                         break;
675                 case MSG_INFO:
676                 case MSG_ERROR:
677                         if (remaining >= sizeof line) {
678                                 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
679                                         tag, (long)remaining);
680                                 exit_cleanup(RERR_STREAMIO);
681                         }
682                         read_loop(fd, line, remaining);
683                         rwrite((enum logcode)tag, line, remaining);
684                         remaining = 0;
685                         break;
686                 default:
687                         rprintf(FERROR, "unexpected tag %d\n", tag);
688                         exit_cleanup(RERR_STREAMIO);
689                 }
690         }
691
692         if (remaining == 0)
693                 io_flush(NORMAL_FLUSH);
694
695         return ret;
696 }
697
698
699
700 /**
701  * Do a buffered read from @p fd.  Don't return until all @p n bytes
702  * have been read.  If all @p n can't be read then exit with an
703  * error.
704  **/
705 static void readfd(int fd, char *buffer, size_t N)
706 {
707         int  ret;
708         size_t total = 0;
709
710         while (total < N) {
711                 ret = readfd_unbuffered(fd, buffer + total, N-total);
712                 total += ret;
713         }
714
715         if (fd == write_batch_monitor_in) {
716                 if ((size_t)write(batch_fd, buffer, total) != total)
717                         exit_cleanup(RERR_FILEIO);
718         }
719
720         if (fd == sock_f_in)
721                 stats.total_read += total;
722 }
723
724
725 int32 read_int(int f)
726 {
727         char b[4];
728         int32 ret;
729
730         readfd(f,b,4);
731         ret = IVAL(b,0);
732         if (ret == (int32)0xffffffff)
733                 return -1;
734         return ret;
735 }
736
737 int64 read_longint(int f)
738 {
739         int64 ret;
740         char b[8];
741         ret = read_int(f);
742
743         if ((int32)ret != (int32)0xffffffff)
744                 return ret;
745
746 #ifdef NO_INT64
747         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
748         exit_cleanup(RERR_UNSUPPORTED);
749 #else
750         readfd(f,b,8);
751         ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
752 #endif
753
754         return ret;
755 }
756
757 void read_buf(int f,char *buf,size_t len)
758 {
759         readfd(f,buf,len);
760 }
761
762 void read_sbuf(int f,char *buf,size_t len)
763 {
764         readfd(f, buf, len);
765         buf[len] = 0;
766 }
767
768 unsigned char read_byte(int f)
769 {
770         unsigned char c;
771         readfd(f, (char *)&c, 1);
772         return c;
773 }
774
775
776 /**
777  * Sleep after writing to limit I/O bandwidth usage.
778  *
779  * @todo Rather than sleeping after each write, it might be better to
780  * use some kind of averaging.  The current algorithm seems to always
781  * use a bit less bandwidth than specified, because it doesn't make up
782  * for slow periods.  But arguably this is a feature.  In addition, we
783  * ought to take the time used to write the data into account.
784  *
785  * During some phases of big transfers (file FOO is uptodate) this is
786  * called with a small bytes_written every time.  As the kernel has to
787  * round small waits up to guarantee that we actually wait at least the
788  * requested number of microseconds, this can become grossly inaccurate.
789  * We therefore keep track of the bytes we've written over time and only
790  * sleep when the accumulated delay is at least 1 tenth of a second.
791  **/
792 static void sleep_for_bwlimit(int bytes_written)
793 {
794         static struct timeval prior_tv;
795         static long total_written = 0; 
796         struct timeval tv, start_tv;
797         long elapsed_usec, sleep_usec;
798
799 #define ONE_SEC 1000000L /* # of microseconds in a second */
800
801         if (!bwlimit)
802                 return;
803
804         total_written += bytes_written; 
805
806         gettimeofday(&start_tv, NULL);
807         if (prior_tv.tv_sec) {
808                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
809                              + (start_tv.tv_usec - prior_tv.tv_usec);
810                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
811                 if (total_written < 0)
812                         total_written = 0;
813         }
814
815         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
816         if (sleep_usec < ONE_SEC / 10) {
817                 prior_tv = start_tv;
818                 return;
819         }
820
821         tv.tv_sec  = sleep_usec / ONE_SEC;
822         tv.tv_usec = sleep_usec % ONE_SEC;
823         select(0, NULL, NULL, NULL, &tv);
824
825         gettimeofday(&prior_tv, NULL);
826         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
827                      + (prior_tv.tv_usec - start_tv.tv_usec);
828         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
829 }
830
831
832 /* Write len bytes to the file descriptor fd, looping as necessary to get
833  * the job done and also (in the generator) reading any data on msg_fd_in
834  * (to avoid deadlock).
835  *
836  * This function underlies the multiplexing system.  The body of the
837  * application never calls this function directly. */
838 static void writefd_unbuffered(int fd,char *buf,size_t len)
839 {
840         size_t n, total = 0;
841         fd_set w_fds, r_fds;
842         int maxfd, count, ret;
843         struct timeval tv;
844
845         no_flush++;
846
847         while (total < len) {
848                 FD_ZERO(&w_fds);
849                 FD_SET(fd,&w_fds);
850                 maxfd = fd;
851
852                 if (msg_fd_in >= 0) {
853                         FD_ZERO(&r_fds);
854                         FD_SET(msg_fd_in,&r_fds);
855                         if (msg_fd_in > maxfd)
856                                 maxfd = msg_fd_in;
857                 }
858
859                 tv.tv_sec = select_timeout;
860                 tv.tv_usec = 0;
861
862                 errno = 0;
863                 count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
864                                &w_fds, NULL, &tv);
865
866                 if (count <= 0) {
867                         if (count < 0 && errno == EBADF)
868                                 exit_cleanup(RERR_SOCKETIO);
869                         check_timeout();
870                         continue;
871                 }
872
873                 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
874                         read_msg_fd();
875
876                 if (!FD_ISSET(fd, &w_fds))
877                         continue;
878
879                 n = len - total;
880                 if (bwlimit && n > bwlimit_writemax)
881                         n = bwlimit_writemax;
882                 ret = write(fd, buf + total, n);
883
884                 if (ret <= 0) {
885                         if (ret < 0) {
886                                 if (errno == EINTR)
887                                         continue;
888                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
889                                         msleep(1);
890                                         continue;
891                                 }
892                         }
893
894                         /* Don't try to write errors back across the stream. */
895                         if (fd == sock_f_out)
896                                 io_multiplexing_close();
897                         rsyserr(FERROR, errno,
898                                 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
899                                 (long)len, io_write_phase);
900                         exit_cleanup(RERR_STREAMIO);
901                 }
902
903                 total += ret;
904
905                 if (fd == sock_f_out) {
906                         if (io_timeout)
907                                 last_io = time(NULL);
908                         sleep_for_bwlimit(ret);
909                 }
910         }
911
912         no_flush--;
913 }
914
915
916 /**
917  * Write an message to a multiplexed stream. If this fails then rsync
918  * exits.
919  **/
920 static void mplex_write(enum msgcode code, char *buf, size_t len)
921 {
922         char buffer[4096];
923         size_t n = len;
924
925         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
926
927         if (n > sizeof buffer - 4)
928                 n = sizeof buffer - 4;
929
930         memcpy(&buffer[4], buf, n);
931         writefd_unbuffered(sock_f_out, buffer, n+4);
932
933         len -= n;
934         buf += n;
935
936         if (len)
937                 writefd_unbuffered(sock_f_out, buf, len);
938 }
939
940
941 void io_flush(int flush_it_all)
942 {
943         msg_list_push(flush_it_all);
944
945         if (!iobuf_out_cnt || no_flush)
946                 return;
947
948         if (io_multiplexing_out)
949                 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
950         else
951                 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
952         iobuf_out_cnt = 0;
953 }
954
955
956 static void writefd(int fd,char *buf,size_t len)
957 {
958         if (fd == msg_fd_out) {
959                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
960                 exit_cleanup(RERR_PROTOCOL);
961         }
962
963         if (fd == sock_f_out)
964                 stats.total_written += len;
965
966         if (fd == write_batch_monitor_out) {
967                 if ((size_t)write(batch_fd, buf, len) != len)
968                         exit_cleanup(RERR_FILEIO);
969         }
970
971         if (!iobuf_out || fd != sock_f_out) {
972                 writefd_unbuffered(fd, buf, len);
973                 return;
974         }
975
976         while (len) {
977                 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
978                 if (n > 0) {
979                         memcpy(iobuf_out+iobuf_out_cnt, buf, n);
980                         buf += n;
981                         len -= n;
982                         iobuf_out_cnt += n;
983                 }
984
985                 if (iobuf_out_cnt == IO_BUFFER_SIZE)
986                         io_flush(NORMAL_FLUSH);
987         }
988 }
989
990
991 void write_int(int f,int32 x)
992 {
993         char b[4];
994         SIVAL(b,0,x);
995         writefd(f,b,4);
996 }
997
998
999 void write_int_named(int f, int32 x, const char *phase)
1000 {
1001         io_write_phase = phase;
1002         write_int(f, x);
1003         io_write_phase = phase_unknown;
1004 }
1005
1006
1007 /*
1008  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1009  * 64-bit types on this platform.
1010  */
1011 void write_longint(int f, int64 x)
1012 {
1013         char b[8];
1014
1015         if (x <= 0x7FFFFFFF) {
1016                 write_int(f, (int)x);
1017                 return;
1018         }
1019
1020 #ifdef NO_INT64
1021         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1022         exit_cleanup(RERR_UNSUPPORTED);
1023 #else
1024         write_int(f, (int32)0xFFFFFFFF);
1025         SIVAL(b,0,(x&0xFFFFFFFF));
1026         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1027
1028         writefd(f,b,8);
1029 #endif
1030 }
1031
1032 void write_buf(int f,char *buf,size_t len)
1033 {
1034         writefd(f,buf,len);
1035 }
1036
1037 /** Write a string to the connection */
1038 void write_sbuf(int f, char *buf)
1039 {
1040         writefd(f, buf, strlen(buf));
1041 }
1042
1043 void write_byte(int f,unsigned char c)
1044 {
1045         writefd(f, (char *)&c, 1);
1046 }
1047
1048
1049
1050 /**
1051  * Read a line of up to @p maxlen characters into @p buf (not counting
1052  * the trailing null).  Strips the (required) trailing newline and all
1053  * carriage returns.
1054  *
1055  * @return 1 for success; 0 for I/O error or truncation.
1056  **/
1057 int read_line(int f, char *buf, size_t maxlen)
1058 {
1059         while (maxlen) {
1060                 buf[0] = 0;
1061                 read_buf(f, buf, 1);
1062                 if (buf[0] == 0)
1063                         return 0;
1064                 if (buf[0] == '\n')
1065                         break;
1066                 if (buf[0] != '\r') {
1067                         buf++;
1068                         maxlen--;
1069                 }
1070         }
1071         *buf = '\0';
1072         return maxlen > 0;
1073 }
1074
1075
1076 void io_printf(int fd, const char *format, ...)
1077 {
1078         va_list ap;
1079         char buf[1024];
1080         int len;
1081
1082         va_start(ap, format);
1083         len = vsnprintf(buf, sizeof buf, format, ap);
1084         va_end(ap);
1085
1086         if (len < 0)
1087                 exit_cleanup(RERR_STREAMIO);
1088
1089         write_sbuf(fd, buf);
1090 }
1091
1092
1093 /** Setup for multiplexing a MSG_* stream with the data stream. */
1094 void io_start_multiplex_out(void)
1095 {
1096         io_flush(NORMAL_FLUSH);
1097         io_start_buffering_out();
1098         io_multiplexing_out = 1;
1099 }
1100
1101 /** Setup for multiplexing a MSG_* stream with the data stream. */
1102 void io_start_multiplex_in(void)
1103 {
1104         io_flush(NORMAL_FLUSH);
1105         io_start_buffering_in();
1106         io_multiplexing_in = 1;
1107 }
1108
1109 /** Write an message to the multiplexed data stream. */
1110 int io_multiplex_write(enum msgcode code, char *buf, size_t len)
1111 {
1112         if (!io_multiplexing_out)
1113                 return 0;
1114
1115         io_flush(NORMAL_FLUSH);
1116         stats.total_written += (len+4);
1117         mplex_write(code, buf, len);
1118         return 1;
1119 }
1120
1121 /** Stop output multiplexing. */
1122 void io_multiplexing_close(void)
1123 {
1124         io_multiplexing_out = 0;
1125 }
1126
1127 void start_write_batch(int fd)
1128 {
1129         /* Some communication has already taken place, but we don't
1130          * enable batch writing until here so that we can write a
1131          * canonical record of the communication even though the
1132          * actual communication so far depends on whether a daemon
1133          * is involved. */
1134         write_int(batch_fd, protocol_version);
1135         write_int(batch_fd, checksum_seed);
1136
1137         if (am_sender)
1138                 write_batch_monitor_out = fd;
1139         else
1140                 write_batch_monitor_in = fd;
1141 }
1142
1143 void stop_write_batch(void)
1144 {
1145         write_batch_monitor_out = -1;
1146         write_batch_monitor_in = -1;
1147 }