Made the int_byte_cnt[] array 75% smaller.
[rsync/rsync.git] / io.c
1 /*
2  * Socket and pipe I/O utilities used in rsync.
3  *
4  * Copyright (C) 1996-2001 Andrew Tridgell
5  * Copyright (C) 1996 Paul Mackerras
6  * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
7  * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
22  */
23
24 /* Rsync provides its own multiplexing system, which is used to send
25  * stderr and stdout over a single socket.
26  *
27  * For historical reasons this is off during the start of the
28  * connection, but it's switched on quite early using
29  * io_start_multiplex_out() and io_start_multiplex_in(). */
30
31 #include "rsync.h"
32
33 /** If no timeout is specified then use a 60 second select timeout */
34 #define SELECT_TIMEOUT 60
35
36 extern int bwlimit;
37 extern size_t bwlimit_writemax;
38 extern int io_timeout;
39 extern int allowed_lull;
40 extern int am_server;
41 extern int am_daemon;
42 extern int am_sender;
43 extern int am_generator;
44 extern int eol_nulls;
45 extern int read_batch;
46 extern int csum_length;
47 extern int checksum_seed;
48 extern int protocol_version;
49 extern int remove_source_files;
50 extern int preserve_hard_links;
51 extern char *filesfrom_host;
52 extern struct stats stats;
53 extern struct file_list *the_file_list;
54
55 const char phase_unknown[] = "unknown";
56 int ignore_timeout = 0;
57 int batch_fd = -1;
58 int batch_gen_fd = -1;
59
60 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
61 int kluge_around_eof = 0;
62
63 int msg_fd_in = -1;
64 int msg_fd_out = -1;
65 int sock_f_in = -1;
66 int sock_f_out = -1;
67
68 static int io_multiplexing_out;
69 static int io_multiplexing_in;
70 static time_t last_io_in;
71 static time_t last_io_out;
72 static int no_flush;
73
74 static int write_batch_monitor_in = -1;
75 static int write_batch_monitor_out = -1;
76
77 static int io_filesfrom_f_in = -1;
78 static int io_filesfrom_f_out = -1;
79 static char io_filesfrom_buf[2048];
80 static char *io_filesfrom_bp;
81 static char io_filesfrom_lastchar;
82 static int io_filesfrom_buflen;
83 static int defer_forwarding_messages = 0;
84 static int select_timeout = SELECT_TIMEOUT;
85 static int active_filecnt = 0;
86 static OFF_T active_bytecnt = 0;
87
88 static char int_byte_cnt[64] = {
89         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* (00 - 3F)/4 */
90         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* (40 - 7F)/4 */
91         4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* (80 - BF)/4 */
92         5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
93 };
94
95 static void read_loop(int fd, char *buf, size_t len);
96
97 struct flist_ndx_item {
98         struct flist_ndx_item *next;
99         int ndx;
100 };
101
102 struct flist_ndx_list {
103         struct flist_ndx_item *head, *tail;
104 };
105
106 static struct flist_ndx_list redo_list, hlink_list;
107
108 struct msg_list_item {
109         struct msg_list_item *next;
110         int len;
111         char buf[1];
112 };
113
114 struct msg_list {
115         struct msg_list_item *head, *tail;
116 };
117
118 static struct msg_list msg2genr, msg2sndr;
119
120 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
121 {
122         struct flist_ndx_item *item;
123
124         if (!(item = new(struct flist_ndx_item)))
125                 out_of_memory("flist_ndx_push");
126         item->next = NULL;
127         item->ndx = ndx;
128         if (lp->tail)
129                 lp->tail->next = item;
130         else
131                 lp->head = item;
132         lp->tail = item;
133 }
134
135 static int flist_ndx_pop(struct flist_ndx_list *lp)
136 {
137         struct flist_ndx_item *next;
138         int ndx;
139
140         if (!lp->head)
141                 return -1;
142
143         ndx = lp->head->ndx;
144         next = lp->head->next;
145         free(lp->head);
146         lp->head = next;
147         if (!next)
148                 lp->tail = NULL;
149
150         return ndx;
151 }
152
153 static void check_timeout(void)
154 {
155         time_t t;
156
157         if (!io_timeout || ignore_timeout)
158                 return;
159
160         if (!last_io_in) {
161                 last_io_in = time(NULL);
162                 return;
163         }
164
165         t = time(NULL);
166
167         if (t - last_io_in >= io_timeout) {
168                 if (!am_server && !am_daemon) {
169                         rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
170                                 (int)(t-last_io_in));
171                 }
172                 exit_cleanup(RERR_TIMEOUT);
173         }
174 }
175
176 /* Note the fds used for the main socket (which might really be a pipe
177  * for a local transfer, but we can ignore that). */
178 void io_set_sock_fds(int f_in, int f_out)
179 {
180         sock_f_in = f_in;
181         sock_f_out = f_out;
182 }
183
184 void set_io_timeout(int secs)
185 {
186         io_timeout = secs;
187
188         if (!io_timeout || io_timeout > SELECT_TIMEOUT)
189                 select_timeout = SELECT_TIMEOUT;
190         else
191                 select_timeout = io_timeout;
192
193         allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
194 }
195
196 /* Setup the fd used to receive MSG_* messages.  Only needed during the
197  * early stages of being a local sender (up through the sending of the
198  * file list) or when we're the generator (to fetch the messages from
199  * the receiver). */
200 void set_msg_fd_in(int fd)
201 {
202         msg_fd_in = fd;
203 }
204
205 /* Setup the fd used to send our MSG_* messages.  Only needed when
206  * we're the receiver (to send our messages to the generator). */
207 void set_msg_fd_out(int fd)
208 {
209         msg_fd_out = fd;
210         set_nonblocking(msg_fd_out);
211 }
212
213 /* Add a message to the pending MSG_* list. */
214 static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len)
215 {
216         struct msg_list_item *m;
217         int sz = len + 4 + sizeof m[0] - 1;
218
219         if (!(m = (struct msg_list_item *)new_array(char, sz)))
220                 out_of_memory("msg_list_add");
221         m->next = NULL;
222         m->len = len + 4;
223         SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
224         memcpy(m->buf + 4, buf, len);
225         if (lst->tail)
226                 lst->tail->next = m;
227         else
228                 lst->head = m;
229         lst->tail = m;
230 }
231
232 /* Read a message from the MSG_* fd and handle it.  This is called either
233  * during the early stages of being a local sender (up through the sending
234  * of the file list) or when we're the generator (to fetch the messages
235  * from the receiver). */
236 static void read_msg_fd(void)
237 {
238         char buf[2048];
239         size_t n;
240         int fd = msg_fd_in;
241         int tag, len;
242
243         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
244          * to this routine from writefd_unbuffered(). */
245         msg_fd_in = -1;
246
247         read_loop(fd, buf, 4);
248         tag = IVAL(buf, 0);
249
250         len = tag & 0xFFFFFF;
251         tag = (tag >> 24) - MPLEX_BASE;
252
253         switch (tag) {
254         case MSG_DONE:
255                 if (len != 0 || !am_generator) {
256                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
257                         exit_cleanup(RERR_STREAMIO);
258                 }
259                 flist_ndx_push(&redo_list, -1);
260                 break;
261         case MSG_REDO:
262                 if (len != 4 || !am_generator) {
263                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
264                         exit_cleanup(RERR_STREAMIO);
265                 }
266                 read_loop(fd, buf, 4);
267                 if (remove_source_files)
268                         decrement_active_files(IVAL(buf,0));
269                 flist_ndx_push(&redo_list, IVAL(buf,0));
270                 break;
271         case MSG_DELETED:
272                 if (len >= (int)sizeof buf || !am_generator) {
273                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
274                         exit_cleanup(RERR_STREAMIO);
275                 }
276                 read_loop(fd, buf, len);
277                 send_msg(MSG_DELETED, buf, len);
278                 break;
279         case MSG_SUCCESS:
280                 if (len != 4 || !am_generator) {
281                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
282                         exit_cleanup(RERR_STREAMIO);
283                 }
284                 read_loop(fd, buf, len);
285                 if (remove_source_files) {
286                         decrement_active_files(IVAL(buf,0));
287                         send_msg(MSG_SUCCESS, buf, len);
288                 }
289                 if (preserve_hard_links)
290                         flist_ndx_push(&hlink_list, IVAL(buf,0));
291                 break;
292         case MSG_SOCKERR:
293         case MSG_CLIENT:
294                 if (!am_generator) {
295                         rprintf(FERROR, "invalid message %d:%d\n", tag, len);
296                         exit_cleanup(RERR_STREAMIO);
297                 }
298                 if (tag == MSG_SOCKERR)
299                         close_multiplexing_out();
300                 /* FALL THROUGH */
301         case MSG_INFO:
302         case MSG_ERROR:
303         case MSG_LOG:
304                 while (len) {
305                         n = len;
306                         if (n >= sizeof buf)
307                                 n = sizeof buf - 1;
308                         read_loop(fd, buf, n);
309                         rwrite((enum logcode)tag, buf, n);
310                         len -= n;
311                 }
312                 break;
313         default:
314                 rprintf(FERROR, "unknown message %d:%d [%s]\n",
315                         tag, len, who_am_i());
316                 exit_cleanup(RERR_STREAMIO);
317         }
318
319         msg_fd_in = fd;
320 }
321
322 /* This is used by the generator to limit how many file transfers can
323  * be active at once when --remove-source-files is specified.  Without
324  * this, sender-side deletions were mostly happening at the end. */
325 void increment_active_files(int ndx, int itemizing, enum logcode code)
326 {
327         /* TODO: tune these limits? */
328         while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
329 #ifdef SUPPORT_HARD_LINKS
330                 if (hlink_list.head)
331                         check_for_finished_hlinks(itemizing, code);
332 #endif
333                 read_msg_fd();
334         }
335
336         active_filecnt++;
337         active_bytecnt += F_LENGTH(the_file_list->files[ndx]);
338 }
339
340 void decrement_active_files(int ndx)
341 {
342         active_filecnt--;
343         active_bytecnt -= F_LENGTH(the_file_list->files[ndx]);
344 }
345
346 /* Try to push messages off the list onto the wire.  If we leave with more
347  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
348  * This is only active in the receiver. */
349 static int msg2genr_flush(int flush_it_all)
350 {
351         static int written = 0;
352         struct timeval tv;
353         fd_set fds;
354
355         if (msg_fd_out < 0)
356                 return -1;
357
358         while (msg2genr.head) {
359                 struct msg_list_item *m = msg2genr.head;
360                 int n = write(msg_fd_out, m->buf + written, m->len - written);
361                 if (n < 0) {
362                         if (errno == EINTR)
363                                 continue;
364                         if (errno != EWOULDBLOCK && errno != EAGAIN)
365                                 return -1;
366                         if (!flush_it_all)
367                                 return 0;
368                         FD_ZERO(&fds);
369                         FD_SET(msg_fd_out, &fds);
370                         tv.tv_sec = select_timeout;
371                         tv.tv_usec = 0;
372                         if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
373                                 check_timeout();
374                 } else if ((written += n) == m->len) {
375                         msg2genr.head = m->next;
376                         if (!msg2genr.head)
377                                 msg2genr.tail = NULL;
378                         free(m);
379                         written = 0;
380                 }
381         }
382         return 1;
383 }
384
385 int send_msg(enum msgcode code, const char *buf, int len)
386 {
387         if (msg_fd_out < 0) {
388                 if (!defer_forwarding_messages)
389                         return io_multiplex_write(code, buf, len);
390                 if (!io_multiplexing_out)
391                         return 0;
392                 msg_list_add(&msg2sndr, code, buf, len);
393                 return 1;
394         }
395         msg_list_add(&msg2genr, code, buf, len);
396         msg2genr_flush(NORMAL_FLUSH);
397         return 1;
398 }
399
400 void send_msg_int(enum msgcode code, int num)
401 {
402         char numbuf[4];
403         SIVAL(numbuf, 0, num);
404         send_msg(code, numbuf, 4);
405 }
406
407 int get_redo_num(int itemizing, enum logcode code)
408 {
409         while (1) {
410 #ifdef SUPPORT_HARD_LINKS
411                 if (hlink_list.head)
412                         check_for_finished_hlinks(itemizing, code);
413 #endif
414                 if (redo_list.head)
415                         break;
416                 read_msg_fd();
417         }
418
419         return flist_ndx_pop(&redo_list);
420 }
421
422 int get_hlink_num(void)
423 {
424         return flist_ndx_pop(&hlink_list);
425 }
426
427 /**
428  * When we're the receiver and we have a local --files-from list of names
429  * that needs to be sent over the socket to the sender, we have to do two
430  * things at the same time: send the sender a list of what files we're
431  * processing and read the incoming file+info list from the sender.  We do
432  * this by augmenting the read_timeout() function to copy this data.  It
433  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
434  * ready, since it might be a pipe) and then blast it out f_out (when it
435  * is ready to receive more data).
436  */
437 void io_set_filesfrom_fds(int f_in, int f_out)
438 {
439         io_filesfrom_f_in = f_in;
440         io_filesfrom_f_out = f_out;
441         io_filesfrom_bp = io_filesfrom_buf;
442         io_filesfrom_lastchar = '\0';
443         io_filesfrom_buflen = 0;
444 }
445
446 /* It's almost always an error to get an EOF when we're trying to read from the
447  * network, because the protocol is (for the most part) self-terminating.
448  *
449  * There is one case for the receiver when it is at the end of the transfer
450  * (hanging around reading any keep-alive packets that might come its way): if
451  * the sender dies before the generator's kill-signal comes through, we can end
452  * up here needing to loop until the kill-signal arrives.  In this situation,
453  * kluge_around_eof will be < 0.
454  *
455  * There is another case for older protocol versions (< 24) where the module
456  * listing was not terminated, so we must ignore an EOF error in that case and
457  * exit.  In this situation, kluge_around_eof will be > 0. */
458 static void whine_about_eof(int fd)
459 {
460         if (kluge_around_eof && fd == sock_f_in) {
461                 int i;
462                 if (kluge_around_eof > 0)
463                         exit_cleanup(0);
464                 /* If we're still here after 10 seconds, exit with an error. */
465                 for (i = 10*1000/20; i--; )
466                         msleep(20);
467         }
468
469         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
470                 "(%.0f bytes received so far) [%s]\n",
471                 (double)stats.total_read, who_am_i());
472
473         exit_cleanup(RERR_STREAMIO);
474 }
475
476 /**
477  * Read from a socket with I/O timeout. return the number of bytes
478  * read. If no bytes can be read then exit, never return a number <= 0.
479  *
480  * TODO: If the remote shell connection fails, then current versions
481  * actually report an "unexpected EOF" error here.  Since it's a
482  * fairly common mistake to try to use rsh when ssh is required, we
483  * should trap that: if we fail to read any data at all, we should
484  * give a better explanation.  We can tell whether the connection has
485  * started by looking e.g. at whether the remote version is known yet.
486  */
487 static int read_timeout(int fd, char *buf, size_t len)
488 {
489         int n, cnt = 0;
490
491         io_flush(NORMAL_FLUSH);
492
493         while (cnt == 0) {
494                 /* until we manage to read *something* */
495                 fd_set r_fds, w_fds;
496                 struct timeval tv;
497                 int maxfd = fd;
498                 int count;
499
500                 FD_ZERO(&r_fds);
501                 FD_ZERO(&w_fds);
502                 FD_SET(fd, &r_fds);
503                 if (msg2genr.head) {
504                         FD_SET(msg_fd_out, &w_fds);
505                         if (msg_fd_out > maxfd)
506                                 maxfd = msg_fd_out;
507                 }
508                 if (io_filesfrom_f_out >= 0) {
509                         int new_fd;
510                         if (io_filesfrom_buflen == 0) {
511                                 if (io_filesfrom_f_in >= 0) {
512                                         FD_SET(io_filesfrom_f_in, &r_fds);
513                                         new_fd = io_filesfrom_f_in;
514                                 } else {
515                                         io_filesfrom_f_out = -1;
516                                         new_fd = -1;
517                                 }
518                         } else {
519                                 FD_SET(io_filesfrom_f_out, &w_fds);
520                                 new_fd = io_filesfrom_f_out;
521                         }
522                         if (new_fd > maxfd)
523                                 maxfd = new_fd;
524                 }
525
526                 tv.tv_sec = select_timeout;
527                 tv.tv_usec = 0;
528
529                 errno = 0;
530
531                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
532
533                 if (count <= 0) {
534                         if (errno == EBADF)
535                                 exit_cleanup(RERR_SOCKETIO);
536                         check_timeout();
537                         continue;
538                 }
539
540                 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
541                         msg2genr_flush(NORMAL_FLUSH);
542
543                 if (io_filesfrom_f_out >= 0) {
544                         if (io_filesfrom_buflen) {
545                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
546                                         int l = write(io_filesfrom_f_out,
547                                                       io_filesfrom_bp,
548                                                       io_filesfrom_buflen);
549                                         if (l > 0) {
550                                                 if (!(io_filesfrom_buflen -= l))
551                                                         io_filesfrom_bp = io_filesfrom_buf;
552                                                 else
553                                                         io_filesfrom_bp += l;
554                                         } else {
555                                                 /* XXX should we complain? */
556                                                 io_filesfrom_f_out = -1;
557                                         }
558                                 }
559                         } else if (io_filesfrom_f_in >= 0) {
560                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
561                                         int l = read(io_filesfrom_f_in,
562                                                      io_filesfrom_buf,
563                                                      sizeof io_filesfrom_buf);
564                                         if (l <= 0) {
565                                                 /* Send end-of-file marker */
566                                                 io_filesfrom_buf[0] = '\0';
567                                                 io_filesfrom_buf[1] = '\0';
568                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
569                                                 io_filesfrom_f_in = -1;
570                                         } else {
571                                                 if (!eol_nulls) {
572                                                         char *s = io_filesfrom_buf + l;
573                                                         /* Transform CR and/or LF into '\0' */
574                                                         while (s-- > io_filesfrom_buf) {
575                                                                 if (*s == '\n' || *s == '\r')
576                                                                         *s = '\0';
577                                                         }
578                                                 }
579                                                 if (!io_filesfrom_lastchar) {
580                                                         /* Last buf ended with a '\0', so don't
581                                                          * let this buf start with one. */
582                                                         while (l && !*io_filesfrom_bp)
583                                                                 io_filesfrom_bp++, l--;
584                                                 }
585                                                 if (!l)
586                                                         io_filesfrom_bp = io_filesfrom_buf;
587                                                 else {
588                                                         char *f = io_filesfrom_bp;
589                                                         char *t = f;
590                                                         char *eob = f + l;
591                                                         /* Eliminate any multi-'\0' runs. */
592                                                         while (f != eob) {
593                                                                 if (!(*t++ = *f++)) {
594                                                                         while (f != eob && !*f)
595                                                                                 f++, l--;
596                                                                 }
597                                                         }
598                                                         io_filesfrom_lastchar = f[-1];
599                                                 }
600                                                 io_filesfrom_buflen = l;
601                                         }
602                                 }
603                         }
604                 }
605
606                 if (!FD_ISSET(fd, &r_fds))
607                         continue;
608
609                 n = read(fd, buf, len);
610
611                 if (n <= 0) {
612                         if (n == 0)
613                                 whine_about_eof(fd); /* Doesn't return. */
614                         if (errno == EINTR || errno == EWOULDBLOCK
615                             || errno == EAGAIN)
616                                 continue;
617
618                         /* Don't write errors on a dead socket. */
619                         if (fd == sock_f_in) {
620                                 close_multiplexing_out();
621                                 rsyserr(FSOCKERR, errno, "read error");
622                         } else
623                                 rsyserr(FERROR, errno, "read error");
624                         exit_cleanup(RERR_STREAMIO);
625                 }
626
627                 buf += n;
628                 len -= n;
629                 cnt += n;
630
631                 if (fd == sock_f_in && io_timeout)
632                         last_io_in = time(NULL);
633         }
634
635         return cnt;
636 }
637
638 /**
639  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
640  * characters long).
641  */
642 int read_filesfrom_line(int fd, char *fname)
643 {
644         char ch, *s, *eob = fname + MAXPATHLEN - 1;
645         int cnt;
646         int reading_remotely = filesfrom_host != NULL;
647         int nulls = eol_nulls || reading_remotely;
648
649   start:
650         s = fname;
651         while (1) {
652                 cnt = read(fd, &ch, 1);
653                 if (cnt < 0 && (errno == EWOULDBLOCK
654                   || errno == EINTR || errno == EAGAIN)) {
655                         struct timeval tv;
656                         fd_set r_fds, e_fds;
657                         FD_ZERO(&r_fds);
658                         FD_SET(fd, &r_fds);
659                         FD_ZERO(&e_fds);
660                         FD_SET(fd, &e_fds);
661                         tv.tv_sec = select_timeout;
662                         tv.tv_usec = 0;
663                         if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
664                                 check_timeout();
665                         if (FD_ISSET(fd, &e_fds)) {
666                                 rsyserr(FINFO, errno,
667                                         "select exception on fd %d", fd);
668                         }
669                         continue;
670                 }
671                 if (cnt != 1)
672                         break;
673                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
674                         /* Skip empty lines if reading locally. */
675                         if (!reading_remotely && s == fname)
676                                 continue;
677                         break;
678                 }
679                 if (s < eob)
680                         *s++ = ch;
681         }
682         *s = '\0';
683
684         /* Dump comments. */
685         if (*fname == '#' || *fname == ';')
686                 goto start;
687
688         return s - fname;
689 }
690
691 static char *iobuf_out;
692 static int iobuf_out_cnt;
693
694 void io_start_buffering_out(void)
695 {
696         if (iobuf_out)
697                 return;
698         if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
699                 out_of_memory("io_start_buffering_out");
700         iobuf_out_cnt = 0;
701 }
702
703 static char *iobuf_in;
704 static size_t iobuf_in_siz;
705
706 void io_start_buffering_in(void)
707 {
708         if (iobuf_in)
709                 return;
710         iobuf_in_siz = 2 * IO_BUFFER_SIZE;
711         if (!(iobuf_in = new_array(char, iobuf_in_siz)))
712                 out_of_memory("io_start_buffering_in");
713 }
714
715 void io_end_buffering(void)
716 {
717         io_flush(NORMAL_FLUSH);
718         if (!io_multiplexing_out) {
719                 free(iobuf_out);
720                 iobuf_out = NULL;
721         }
722 }
723
724 void maybe_flush_socket(void)
725 {
726         if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
727                 io_flush(NORMAL_FLUSH);
728 }
729
730 void maybe_send_keepalive(void)
731 {
732         if (time(NULL) - last_io_out >= allowed_lull) {
733                 if (!iobuf_out || !iobuf_out_cnt) {
734                         if (protocol_version < 29)
735                                 return; /* there's nothing we can do */
736                         write_int(sock_f_out, the_file_list->count);
737                         write_shortint(sock_f_out, ITEM_IS_NEW);
738                 }
739                 if (iobuf_out)
740                         io_flush(NORMAL_FLUSH);
741         }
742 }
743
744 /**
745  * Continue trying to read len bytes - don't return until len has been
746  * read.
747  **/
748 static void read_loop(int fd, char *buf, size_t len)
749 {
750         while (len) {
751                 int n = read_timeout(fd, buf, len);
752
753                 buf += n;
754                 len -= n;
755         }
756 }
757
758 /**
759  * Read from the file descriptor handling multiplexing - return number
760  * of bytes read.
761  *
762  * Never returns <= 0.
763  */
764 static int readfd_unbuffered(int fd, char *buf, size_t len)
765 {
766         static size_t remaining;
767         static size_t iobuf_in_ndx;
768         size_t msg_bytes;
769         int tag, cnt = 0;
770         char line[BIGPATHBUFLEN];
771
772         if (!iobuf_in || fd != sock_f_in)
773                 return read_timeout(fd, buf, len);
774
775         if (!io_multiplexing_in && remaining == 0) {
776                 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
777                 iobuf_in_ndx = 0;
778         }
779
780         while (cnt == 0) {
781                 if (remaining) {
782                         len = MIN(len, remaining);
783                         memcpy(buf, iobuf_in + iobuf_in_ndx, len);
784                         iobuf_in_ndx += len;
785                         remaining -= len;
786                         cnt = len;
787                         break;
788                 }
789
790                 read_loop(fd, line, 4);
791                 tag = IVAL(line, 0);
792
793                 msg_bytes = tag & 0xFFFFFF;
794                 tag = (tag >> 24) - MPLEX_BASE;
795
796                 switch (tag) {
797                 case MSG_DATA:
798                         if (msg_bytes > iobuf_in_siz) {
799                                 if (!(iobuf_in = realloc_array(iobuf_in, char,
800                                                                msg_bytes)))
801                                         out_of_memory("readfd_unbuffered");
802                                 iobuf_in_siz = msg_bytes;
803                         }
804                         read_loop(fd, iobuf_in, msg_bytes);
805                         remaining = msg_bytes;
806                         iobuf_in_ndx = 0;
807                         break;
808                 case MSG_DELETED:
809                         if (msg_bytes >= sizeof line)
810                                 goto overflow;
811                         read_loop(fd, line, msg_bytes);
812                         /* A directory name was sent with the trailing null */
813                         if (msg_bytes > 0 && !line[msg_bytes-1])
814                                 log_delete(line, S_IFDIR);
815                         else {
816                                 line[msg_bytes] = '\0';
817                                 log_delete(line, S_IFREG);
818                         }
819                         break;
820                 case MSG_SUCCESS:
821                         if (msg_bytes != 4) {
822                                 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
823                                         tag, (long)msg_bytes, who_am_i());
824                                 exit_cleanup(RERR_STREAMIO);
825                         }
826                         read_loop(fd, line, msg_bytes);
827                         successful_send(IVAL(line, 0));
828                         break;
829                 case MSG_INFO:
830                 case MSG_ERROR:
831                         if (msg_bytes >= sizeof line) {
832                             overflow:
833                                 rprintf(FERROR,
834                                         "multiplexing overflow %d:%ld [%s]\n",
835                                         tag, (long)msg_bytes, who_am_i());
836                                 exit_cleanup(RERR_STREAMIO);
837                         }
838                         read_loop(fd, line, msg_bytes);
839                         rwrite((enum logcode)tag, line, msg_bytes);
840                         break;
841                 default:
842                         rprintf(FERROR, "unexpected tag %d [%s]\n",
843                                 tag, who_am_i());
844                         exit_cleanup(RERR_STREAMIO);
845                 }
846         }
847
848         if (remaining == 0)
849                 io_flush(NORMAL_FLUSH);
850
851         return cnt;
852 }
853
854 /**
855  * Do a buffered read from @p fd.  Don't return until all @p n bytes
856  * have been read.  If all @p n can't be read then exit with an
857  * error.
858  **/
859 static void readfd(int fd, char *buffer, size_t N)
860 {
861         int  cnt;
862         size_t total = 0;
863
864         while (total < N) {
865                 cnt = readfd_unbuffered(fd, buffer + total, N-total);
866                 total += cnt;
867         }
868
869         if (fd == write_batch_monitor_in) {
870                 if ((size_t)write(batch_fd, buffer, total) != total)
871                         exit_cleanup(RERR_FILEIO);
872         }
873
874         if (fd == sock_f_in)
875                 stats.total_read += total;
876 }
877
878 unsigned short read_shortint(int f)
879 {
880         char b[2];
881         readfd(f, b, 2);
882         return (UVAL(b, 1) << 8) + UVAL(b, 0);
883 }
884
885 int32 read_int(int f)
886 {
887         char b[4];
888         int32 num;
889
890         readfd(f, b, 4);
891         num = IVAL(b, 0);
892 #if SIZEOF_INT32 > 4
893         if (num & (int32)0x80000000)
894                 num |= ~(int32)0xffffffff;
895 #endif
896         return num;
897 }
898
899 int64 read_longint(int f)
900 {
901         int64 num;
902         char b[9];
903
904         if (protocol_version < 30) {
905                 num = read_int(f);
906
907                 if ((int32)num != (int32)0xffffffff)
908                         return num;
909
910 #if SIZEOF_INT64 < 8
911                 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
912                 exit_cleanup(RERR_UNSUPPORTED);
913 #else
914                 readfd(f, b, 8);
915                 num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
916 #endif
917         } else {
918                 int cnt;
919                 readfd(f, b, 3);
920                 cnt = int_byte_cnt[CVAL(b, 0) / 4];
921 #if SIZEOF_INT64 < 8
922                 if (cnt > 5 || (cnt == 5 && (CVAL(b,0)&0x3F || CVAL(b,1)&0x80))) {
923                         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
924                         exit_cleanup(RERR_UNSUPPORTED);
925                 }
926 #endif
927                 if (cnt > 3)
928                         readfd(f, b + 3, cnt - 3);
929                 switch (cnt) {
930                 case 3:
931                         num = NVAL3(b, 0);
932                         break;
933                 case 4:
934                         num = NVAL4(b, 0x80);
935                         break;
936                 case 5:
937                         num = NVAL5(b, 0xC0);
938                         break;
939 #if SIZEOF_INT64 >= 8
940                 case 6:
941                         num = NVAL6(b, 0xE0);
942                         break;
943                 case 7:
944                         num = NVAL7(b, 0xF0);
945                         break;
946                 case 8:
947                         num = NVAL8(b, 0xF8);
948                         break;
949                 case 9:
950                         num = NVAL8(b+1, 0);
951                         break;
952 #endif
953                 default:
954                         exit_cleanup(RERR_PROTOCOL); /* impossible... */
955                 }
956         }
957
958         return num;
959 }
960
961 void read_buf(int f, char *buf, size_t len)
962 {
963         readfd(f,buf,len);
964 }
965
966 void read_sbuf(int f, char *buf, size_t len)
967 {
968         readfd(f, buf, len);
969         buf[len] = '\0';
970 }
971
972 uchar read_byte(int f)
973 {
974         uchar c;
975         readfd(f, (char *)&c, 1);
976         return c;
977 }
978
979 int read_vstring(int f, char *buf, int bufsize)
980 {
981         int len = read_byte(f);
982
983         if (len & 0x80)
984                 len = (len & ~0x80) * 0x100 + read_byte(f);
985
986         if (len >= bufsize) {
987                 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
988                         len, bufsize - 1);
989                 return -1;
990         }
991
992         if (len)
993                 readfd(f, buf, len);
994         buf[len] = '\0';
995         return len;
996 }
997
998 /* Populate a sum_struct with values from the socket.  This is
999  * called by both the sender and the receiver. */
1000 void read_sum_head(int f, struct sum_struct *sum)
1001 {
1002         sum->count = read_int(f);
1003         if (sum->count < 0) {
1004                 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
1005                         (long)sum->count, who_am_i());
1006                 exit_cleanup(RERR_PROTOCOL);
1007         }
1008         sum->blength = read_int(f);
1009         if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
1010                 rprintf(FERROR, "Invalid block length %ld [%s]\n",
1011                         (long)sum->blength, who_am_i());
1012                 exit_cleanup(RERR_PROTOCOL);
1013         }
1014         sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
1015         if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
1016                 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
1017                         sum->s2length, who_am_i());
1018                 exit_cleanup(RERR_PROTOCOL);
1019         }
1020         sum->remainder = read_int(f);
1021         if (sum->remainder < 0 || sum->remainder > sum->blength) {
1022                 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
1023                         (long)sum->remainder, who_am_i());
1024                 exit_cleanup(RERR_PROTOCOL);
1025         }
1026 }
1027
1028 /* Send the values from a sum_struct over the socket.  Set sum to
1029  * NULL if there are no checksums to send.  This is called by both
1030  * the generator and the sender. */
1031 void write_sum_head(int f, struct sum_struct *sum)
1032 {
1033         static struct sum_struct null_sum;
1034
1035         if (sum == NULL)
1036                 sum = &null_sum;
1037
1038         write_int(f, sum->count);
1039         write_int(f, sum->blength);
1040         if (protocol_version >= 27)
1041                 write_int(f, sum->s2length);
1042         write_int(f, sum->remainder);
1043 }
1044
1045 /**
1046  * Sleep after writing to limit I/O bandwidth usage.
1047  *
1048  * @todo Rather than sleeping after each write, it might be better to
1049  * use some kind of averaging.  The current algorithm seems to always
1050  * use a bit less bandwidth than specified, because it doesn't make up
1051  * for slow periods.  But arguably this is a feature.  In addition, we
1052  * ought to take the time used to write the data into account.
1053  *
1054  * During some phases of big transfers (file FOO is uptodate) this is
1055  * called with a small bytes_written every time.  As the kernel has to
1056  * round small waits up to guarantee that we actually wait at least the
1057  * requested number of microseconds, this can become grossly inaccurate.
1058  * We therefore keep track of the bytes we've written over time and only
1059  * sleep when the accumulated delay is at least 1 tenth of a second.
1060  **/
1061 static void sleep_for_bwlimit(int bytes_written)
1062 {
1063         static struct timeval prior_tv;
1064         static long total_written = 0;
1065         struct timeval tv, start_tv;
1066         long elapsed_usec, sleep_usec;
1067
1068 #define ONE_SEC 1000000L /* # of microseconds in a second */
1069
1070         if (!bwlimit_writemax)
1071                 return;
1072
1073         total_written += bytes_written;
1074
1075         gettimeofday(&start_tv, NULL);
1076         if (prior_tv.tv_sec) {
1077                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
1078                              + (start_tv.tv_usec - prior_tv.tv_usec);
1079                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
1080                 if (total_written < 0)
1081                         total_written = 0;
1082         }
1083
1084         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
1085         if (sleep_usec < ONE_SEC / 10) {
1086                 prior_tv = start_tv;
1087                 return;
1088         }
1089
1090         tv.tv_sec  = sleep_usec / ONE_SEC;
1091         tv.tv_usec = sleep_usec % ONE_SEC;
1092         select(0, NULL, NULL, NULL, &tv);
1093
1094         gettimeofday(&prior_tv, NULL);
1095         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1096                      + (prior_tv.tv_usec - start_tv.tv_usec);
1097         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
1098 }
1099
1100 /* Write len bytes to the file descriptor fd, looping as necessary to get
1101  * the job done and also (in certain circumstances) reading any data on
1102  * msg_fd_in to avoid deadlock.
1103  *
1104  * This function underlies the multiplexing system.  The body of the
1105  * application never calls this function directly. */
1106 static void writefd_unbuffered(int fd, const char *buf, size_t len)
1107 {
1108         size_t n, total = 0;
1109         fd_set w_fds, r_fds, e_fds;
1110         int maxfd, count, cnt, using_r_fds;
1111         int defer_save = defer_forwarding_messages;
1112         struct timeval tv;
1113
1114         no_flush++;
1115
1116         while (total < len) {
1117                 FD_ZERO(&w_fds);
1118                 FD_SET(fd, &w_fds);
1119                 FD_ZERO(&e_fds);
1120                 FD_SET(fd, &e_fds);
1121                 maxfd = fd;
1122
1123                 if (msg_fd_in >= 0) {
1124                         FD_ZERO(&r_fds);
1125                         FD_SET(msg_fd_in, &r_fds);
1126                         if (msg_fd_in > maxfd)
1127                                 maxfd = msg_fd_in;
1128                         using_r_fds = 1;
1129                 } else
1130                         using_r_fds = 0;
1131
1132                 tv.tv_sec = select_timeout;
1133                 tv.tv_usec = 0;
1134
1135                 errno = 0;
1136                 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
1137                                &w_fds, &e_fds, &tv);
1138
1139                 if (count <= 0) {
1140                         if (count < 0 && errno == EBADF)
1141                                 exit_cleanup(RERR_SOCKETIO);
1142                         check_timeout();
1143                         continue;
1144                 }
1145
1146                 if (FD_ISSET(fd, &e_fds)) {
1147                         rsyserr(FINFO, errno,
1148                                 "select exception on fd %d", fd);
1149                 }
1150
1151                 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
1152                         read_msg_fd();
1153
1154                 if (!FD_ISSET(fd, &w_fds))
1155                         continue;
1156
1157                 n = len - total;
1158                 if (bwlimit_writemax && n > bwlimit_writemax)
1159                         n = bwlimit_writemax;
1160                 cnt = write(fd, buf + total, n);
1161
1162                 if (cnt <= 0) {
1163                         if (cnt < 0) {
1164                                 if (errno == EINTR)
1165                                         continue;
1166                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1167                                         msleep(1);
1168                                         continue;
1169                                 }
1170                         }
1171
1172                         /* Don't try to write errors back across the stream. */
1173                         if (fd == sock_f_out)
1174                                 close_multiplexing_out();
1175                         rsyserr(FERROR, errno,
1176                                 "writefd_unbuffered failed to write %ld bytes [%s]",
1177                                 (long)len, who_am_i());
1178                         /* If the other side is sending us error messages, try
1179                          * to grab any messages they sent before they died. */
1180                         while (fd == sock_f_out && io_multiplexing_in) {
1181                                 set_io_timeout(30);
1182                                 ignore_timeout = 0;
1183                                 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1184                                                   sizeof io_filesfrom_buf);
1185                         }
1186                         exit_cleanup(RERR_STREAMIO);
1187                 }
1188
1189                 total += cnt;
1190                 defer_forwarding_messages = 1;
1191
1192                 if (fd == sock_f_out) {
1193                         if (io_timeout || am_generator)
1194                                 last_io_out = time(NULL);
1195                         sleep_for_bwlimit(cnt);
1196                 }
1197         }
1198
1199         defer_forwarding_messages = defer_save;
1200         no_flush--;
1201 }
1202
1203 static void msg2sndr_flush(void)
1204 {
1205         if (defer_forwarding_messages)
1206                 return;
1207
1208         while (msg2sndr.head && io_multiplexing_out) {
1209                 struct msg_list_item *m = msg2sndr.head;
1210                 if (!(msg2sndr.head = m->next))
1211                         msg2sndr.tail = NULL;
1212                 stats.total_written += m->len;
1213                 defer_forwarding_messages = 1;
1214                 writefd_unbuffered(sock_f_out, m->buf, m->len);
1215                 defer_forwarding_messages = 0;
1216                 free(m);
1217         }
1218 }
1219
1220 /**
1221  * Write an message to a multiplexed stream. If this fails then rsync
1222  * exits.
1223  **/
1224 static void mplex_write(enum msgcode code, const char *buf, size_t len)
1225 {
1226         char buffer[1024];
1227         size_t n = len;
1228
1229         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
1230
1231         if (n > sizeof buffer - 4)
1232                 n = 0;
1233         else
1234                 memcpy(buffer + 4, buf, n);
1235
1236         writefd_unbuffered(sock_f_out, buffer, n+4);
1237
1238         len -= n;
1239         buf += n;
1240
1241         if (len) {
1242                 defer_forwarding_messages = 1;
1243                 writefd_unbuffered(sock_f_out, buf, len);
1244                 defer_forwarding_messages = 0;
1245                 msg2sndr_flush();
1246         }
1247 }
1248
1249 void io_flush(int flush_it_all)
1250 {
1251         msg2genr_flush(flush_it_all);
1252         msg2sndr_flush();
1253
1254         if (!iobuf_out_cnt || no_flush)
1255                 return;
1256
1257         if (io_multiplexing_out)
1258                 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
1259         else
1260                 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
1261         iobuf_out_cnt = 0;
1262 }
1263
1264 static void writefd(int fd, const char *buf, size_t len)
1265 {
1266         if (fd == msg_fd_out) {
1267                 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
1268                 exit_cleanup(RERR_PROTOCOL);
1269         }
1270
1271         if (fd == sock_f_out)
1272                 stats.total_written += len;
1273
1274         if (fd == write_batch_monitor_out) {
1275                 if ((size_t)write(batch_fd, buf, len) != len)
1276                         exit_cleanup(RERR_FILEIO);
1277         }
1278
1279         if (!iobuf_out || fd != sock_f_out) {
1280                 writefd_unbuffered(fd, buf, len);
1281                 return;
1282         }
1283
1284         while (len) {
1285                 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
1286                 if (n > 0) {
1287                         memcpy(iobuf_out+iobuf_out_cnt, buf, n);
1288                         buf += n;
1289                         len -= n;
1290                         iobuf_out_cnt += n;
1291                 }
1292
1293                 if (iobuf_out_cnt == IO_BUFFER_SIZE)
1294                         io_flush(NORMAL_FLUSH);
1295         }
1296 }
1297
1298 void write_shortint(int f, unsigned short x)
1299 {
1300         char b[2];
1301         b[0] = (char)x;
1302         b[1] = (char)(x >> 8);
1303         writefd(f, b, 2);
1304 }
1305
1306 void write_int(int f, int32 x)
1307 {
1308         char b[4];
1309         SIVAL(b, 0, x);
1310         writefd(f, b, 4);
1311 }
1312
1313 /*
1314  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1315  * 64-bit types on this platform.
1316  */
1317 void write_longint(int f, int64 x)
1318 {
1319         char b[12];
1320
1321 #if SIZEOF_INT64 < 8
1322         if (x < 0 || x > 0x7FFFFFFF) {
1323                 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1324                 exit_cleanup(RERR_UNSUPPORTED);
1325         }
1326 #endif
1327
1328         if (protocol_version < 30) {
1329                 char * const s = b+4;
1330                 SIVAL(s, 0, x);
1331 #if SIZEOF_INT64 < 8
1332                 writefd(f, s, 4);
1333 #else
1334                 if (x <= 0x7FFFFFFF && x >= 0) {
1335                         writefd(f, s, 4);
1336                         return;
1337                 }
1338
1339                 memset(b, 0xFF, 4);
1340                 SIVAL(s, 4, x >> 32);
1341                 writefd(f, b, 12);
1342         } else if (x < 0) {
1343                 goto all_bits;
1344 #endif
1345         } else if (x < ((int32)1<<(3*8-1))) {
1346                 b[0] = (char)(x >> 16);
1347                 b[1] = (char)(x >> 8);
1348                 b[2] = (char)x;
1349                 writefd(f, b, 3);
1350         } else if (x < ((int64)1<<(4*8-2))) {
1351                 b[0] = (char)((x >> 24) | 0x80);
1352                 b[1] = (char)(x >> 16);
1353                 b[2] = (char)(x >> 8);
1354                 b[3] = (char)x;
1355                 writefd(f, b, 4);
1356 #if SIZEOF_INT64 < 8
1357         } else {
1358                 b[0] = 0xC0;
1359                 b[1] = (char)(x >> 24);
1360                 b[2] = (char)(x >> 16);
1361                 b[3] = (char)(x >> 8);
1362                 b[4] = (char)x;
1363                 writefd(f, b, 5);
1364         }
1365 #else
1366         } else if (x < ((int64)1<<(5*8-3))) {
1367                 b[0] = (char)((x >> 32) | 0xC0);
1368                 b[1] = (char)(x >> 24);
1369                 b[2] = (char)(x >> 16);
1370                 b[3] = (char)(x >> 8);
1371                 b[4] = (char)x;
1372                 writefd(f, b, 5);
1373         } else if (x < ((int64)1<<(6*8-4))) {
1374                 b[0] = (char)((x >> 40) | 0xE0);
1375                 b[1] = (char)(x >> 32);
1376                 b[2] = (char)(x >> 24);
1377                 b[3] = (char)(x >> 16);
1378                 b[4] = (char)(x >> 8);
1379                 b[5] = (char)x;
1380                 writefd(f, b, 6);
1381         } else if (x < ((int64)1<<(7*8-5))) {
1382                 b[0] = (char)((x >> 48) | 0xF0);
1383                 b[1] = (char)(x >> 40);
1384                 b[2] = (char)(x >> 32);
1385                 b[3] = (char)(x >> 24);
1386                 b[4] = (char)(x >> 16);
1387                 b[5] = (char)(x >> 8);
1388                 b[6] = (char)x;
1389                 writefd(f, b, 7);
1390         } else if (x < ((int64)1<<(8*8-6))) {
1391                 b[0] = (char)((x >> 56) | 0xF8);
1392                 b[1] = (char)(x >> 48);
1393                 b[2] = (char)(x >> 40);
1394                 b[3] = (char)(x >> 32);
1395                 b[4] = (char)(x >> 24);
1396                 b[5] = (char)(x >> 16);
1397                 b[6] = (char)(x >> 8);
1398                 b[7] = (char)x;
1399                 writefd(f, b, 8);
1400         } else {
1401           all_bits:
1402                 b[0] = (char)0xFC;
1403                 b[1] = (char)(x >> 56);
1404                 b[2] = (char)(x >> 48);
1405                 b[3] = (char)(x >> 40);
1406                 b[4] = (char)(x >> 32);
1407                 b[5] = (char)(x >> 24);
1408                 b[6] = (char)(x >> 16);
1409                 b[7] = (char)(x >> 8);
1410                 b[8] = (char)x;
1411                 writefd(f, b, 9);
1412         }
1413 #endif
1414 }
1415
1416 void write_buf(int f, const char *buf, size_t len)
1417 {
1418         writefd(f,buf,len);
1419 }
1420
1421 /** Write a string to the connection */
1422 void write_sbuf(int f, const char *buf)
1423 {
1424         writefd(f, buf, strlen(buf));
1425 }
1426
1427 void write_byte(int f, uchar c)
1428 {
1429         writefd(f, (char *)&c, 1);
1430 }
1431
1432 void write_vstring(int f, const char *str, int len)
1433 {
1434         uchar lenbuf[3], *lb = lenbuf;
1435
1436         if (len > 0x7F) {
1437                 if (len > 0x7FFF) {
1438                         rprintf(FERROR,
1439                                 "attempting to send over-long vstring (%d > %d)\n",
1440                                 len, 0x7FFF);
1441                         exit_cleanup(RERR_PROTOCOL);
1442                 }
1443                 *lb++ = len / 0x100 + 0x80;
1444         }
1445         *lb = len;
1446
1447         writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1448         if (len)
1449                 writefd(f, str, len);
1450 }
1451
1452 /**
1453  * Read a line of up to @p maxlen characters into @p buf (not counting
1454  * the trailing null).  Strips the (required) trailing newline and all
1455  * carriage returns.
1456  *
1457  * @return 1 for success; 0 for I/O error or truncation.
1458  **/
1459 int read_line(int f, char *buf, size_t maxlen)
1460 {
1461         while (maxlen) {
1462                 buf[0] = 0;
1463                 read_buf(f, buf, 1);
1464                 if (buf[0] == 0)
1465                         return 0;
1466                 if (buf[0] == '\n')
1467                         break;
1468                 if (buf[0] != '\r') {
1469                         buf++;
1470                         maxlen--;
1471                 }
1472         }
1473         *buf = '\0';
1474         return maxlen > 0;
1475 }
1476
1477 void io_printf(int fd, const char *format, ...)
1478 {
1479         va_list ap;
1480         char buf[BIGPATHBUFLEN];
1481         int len;
1482
1483         va_start(ap, format);
1484         len = vsnprintf(buf, sizeof buf, format, ap);
1485         va_end(ap);
1486
1487         if (len < 0)
1488                 exit_cleanup(RERR_STREAMIO);
1489
1490         if (len > (int)sizeof buf) {
1491                 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1492                 exit_cleanup(RERR_STREAMIO);
1493         }
1494
1495         write_sbuf(fd, buf);
1496 }
1497
1498 /** Setup for multiplexing a MSG_* stream with the data stream. */
1499 void io_start_multiplex_out(void)
1500 {
1501         io_flush(NORMAL_FLUSH);
1502         io_start_buffering_out();
1503         io_multiplexing_out = 1;
1504 }
1505
1506 /** Setup for multiplexing a MSG_* stream with the data stream. */
1507 void io_start_multiplex_in(void)
1508 {
1509         io_flush(NORMAL_FLUSH);
1510         io_start_buffering_in();
1511         io_multiplexing_in = 1;
1512 }
1513
1514 /** Write an message to the multiplexed data stream. */
1515 int io_multiplex_write(enum msgcode code, const char *buf, size_t len)
1516 {
1517         if (!io_multiplexing_out)
1518                 return 0;
1519
1520         io_flush(NORMAL_FLUSH);
1521         stats.total_written += (len+4);
1522         mplex_write(code, buf, len);
1523         return 1;
1524 }
1525
1526 void close_multiplexing_in(void)
1527 {
1528         io_multiplexing_in = 0;
1529 }
1530
1531 /** Stop output multiplexing. */
1532 void close_multiplexing_out(void)
1533 {
1534         io_multiplexing_out = 0;
1535 }
1536
1537 void start_write_batch(int fd)
1538 {
1539         /* Some communication has already taken place, but we don't
1540          * enable batch writing until here so that we can write a
1541          * canonical record of the communication even though the
1542          * actual communication so far depends on whether a daemon
1543          * is involved. */
1544         write_int(batch_fd, protocol_version);
1545         write_int(batch_fd, checksum_seed);
1546
1547         if (am_sender)
1548                 write_batch_monitor_out = fd;
1549         else
1550                 write_batch_monitor_in = fd;
1551 }
1552
1553 void stop_write_batch(void)
1554 {
1555         write_batch_monitor_out = -1;
1556         write_batch_monitor_in = -1;
1557 }