567065b97c4cf0de0fdd5610646690037b9eefc9
[rsync/rsync.git] / io.c
1 /*
2  * Socket and pipe I/O utilities used in rsync.
3  *
4  * Copyright (C) 1996-2001 Andrew Tridgell
5  * Copyright (C) 1996 Paul Mackerras
6  * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
7  * Copyright (C) 2003, 2004, 2005, 2006 Wayne Davison
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
22  */
23
24 /* Rsync provides its own multiplexing system, which is used to send
25  * stderr and stdout over a single socket.
26  *
27  * For historical reasons this is off during the start of the
28  * connection, but it's switched on quite early using
29  * io_start_multiplex_out() and io_start_multiplex_in(). */
30
31 #include "rsync.h"
32
33 /** If no timeout is specified then use a 60 second select timeout */
34 #define SELECT_TIMEOUT 60
35
36 extern int bwlimit;
37 extern size_t bwlimit_writemax;
38 extern int io_timeout;
39 extern int allowed_lull;
40 extern int am_server;
41 extern int am_daemon;
42 extern int am_sender;
43 extern int am_generator;
44 extern int incremental;
45 extern int io_error;
46 extern int eol_nulls;
47 extern int flist_eof;
48 extern int read_batch;
49 extern int csum_length;
50 extern int checksum_seed;
51 extern int protocol_version;
52 extern int remove_source_files;
53 extern int preserve_hard_links;
54 extern char *filesfrom_host;
55 extern struct stats stats;
56 extern struct file_list *cur_flist, *first_flist;
57
58 const char phase_unknown[] = "unknown";
59 int ignore_timeout = 0;
60 int batch_fd = -1;
61 int done_cnt = 0;
62
63 /* Ignore an EOF error if non-zero. See whine_about_eof(). */
64 int kluge_around_eof = 0;
65
66 int msg_fd_in = -1;
67 int msg_fd_out = -1;
68 int sock_f_in = -1;
69 int sock_f_out = -1;
70
71 static int iobuf_f_in = -1;
72 static char *iobuf_in;
73 static size_t iobuf_in_siz;
74 static size_t iobuf_in_ndx;
75 static size_t iobuf_in_remaining;
76
77 static int iobuf_f_out = -1;
78 static char *iobuf_out;
79 static int iobuf_out_cnt;
80
81 int flist_forward_from = -1;
82
83 static int io_multiplexing_out;
84 static int io_multiplexing_in;
85 static time_t last_io_in;
86 static time_t last_io_out;
87 static int no_flush;
88
89 static int write_batch_monitor_in = -1;
90 static int write_batch_monitor_out = -1;
91
92 static int io_filesfrom_f_in = -1;
93 static int io_filesfrom_f_out = -1;
94 static char io_filesfrom_buf[2048];
95 static char *io_filesfrom_bp;
96 static char io_filesfrom_lastchar;
97 static int io_filesfrom_buflen;
98 static int defer_forwarding_messages = 0;
99 static int select_timeout = SELECT_TIMEOUT;
100 static int active_filecnt = 0;
101 static OFF_T active_bytecnt = 0;
102
103 static char int_byte_cnt[64] = {
104         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* (00 - 3F)/4 */
105         3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* (40 - 7F)/4 */
106         4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* (80 - BF)/4 */
107         5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 8, 9, /* (C0 - FF)/4 */
108 };
109
110 static void msg2sndr_flush(void);
111 static void readfd(int fd, char *buffer, size_t N);
112 static void writefd(int fd, const char *buf, size_t len);
113 static void writefd_unbuffered(int fd, const char *buf, size_t len);
114 static void decrement_active_files(int ndx);
115 static void decrement_flist_in_progress(int ndx, int redo);
116
117 struct flist_ndx_item {
118         struct flist_ndx_item *next;
119         int ndx;
120 };
121
122 struct flist_ndx_list {
123         struct flist_ndx_item *head, *tail;
124 };
125
126 static struct flist_ndx_list redo_list, hlink_list;
127
128 struct msg_list_item {
129         struct msg_list_item *next;
130         int len;
131         char buf[1];
132 };
133
134 struct msg_list {
135         struct msg_list_item *head, *tail;
136 };
137
138 static struct msg_list msg2genr, msg2sndr;
139
140 static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
141 {
142         struct flist_ndx_item *item;
143
144         if (!(item = new(struct flist_ndx_item)))
145                 out_of_memory("flist_ndx_push");
146         item->next = NULL;
147         item->ndx = ndx;
148         if (lp->tail)
149                 lp->tail->next = item;
150         else
151                 lp->head = item;
152         lp->tail = item;
153 }
154
155 static int flist_ndx_pop(struct flist_ndx_list *lp)
156 {
157         struct flist_ndx_item *next;
158         int ndx;
159
160         if (!lp->head)
161                 return -1;
162
163         ndx = lp->head->ndx;
164         next = lp->head->next;
165         free(lp->head);
166         lp->head = next;
167         if (!next)
168                 lp->tail = NULL;
169
170         return ndx;
171 }
172
173 static void check_timeout(void)
174 {
175         time_t t;
176
177         if (!io_timeout || ignore_timeout)
178                 return;
179
180         if (!last_io_in) {
181                 last_io_in = time(NULL);
182                 return;
183         }
184
185         t = time(NULL);
186
187         if (t - last_io_in >= io_timeout) {
188                 if (!am_server && !am_daemon) {
189                         rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
190                                 (int)(t-last_io_in));
191                 }
192                 exit_cleanup(RERR_TIMEOUT);
193         }
194 }
195
196 /* Note the fds used for the main socket (which might really be a pipe
197  * for a local transfer, but we can ignore that). */
198 void io_set_sock_fds(int f_in, int f_out)
199 {
200         sock_f_in = f_in;
201         sock_f_out = f_out;
202 }
203
204 void set_io_timeout(int secs)
205 {
206         io_timeout = secs;
207
208         if (!io_timeout || io_timeout > SELECT_TIMEOUT)
209                 select_timeout = SELECT_TIMEOUT;
210         else
211                 select_timeout = io_timeout;
212
213         allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
214 }
215
216 /* Setup the fd used to receive MSG_* messages.  Only needed during the
217  * early stages of being a local sender (up through the sending of the
218  * file list) or when we're the generator (to fetch the messages from
219  * the receiver). */
220 void set_msg_fd_in(int fd)
221 {
222         msg_fd_in = fd;
223 }
224
225 /* Setup the fd used to send our MSG_* messages.  Only needed when
226  * we're the receiver (to send our messages to the generator). */
227 void set_msg_fd_out(int fd)
228 {
229         msg_fd_out = fd;
230         set_nonblocking(msg_fd_out);
231 }
232
233 /* Add a message to the pending MSG_* list. */
234 static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len)
235 {
236         struct msg_list_item *m;
237         int sz = len + 4 + sizeof m[0] - 1;
238
239         if (!(m = (struct msg_list_item *)new_array(char, sz)))
240                 out_of_memory("msg_list_add");
241         m->next = NULL;
242         m->len = len + 4;
243         SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
244         memcpy(m->buf + 4, buf, len);
245         if (lst->tail)
246                 lst->tail->next = m;
247         else
248                 lst->head = m;
249         lst->tail = m;
250 }
251
252 /* Read a message from the MSG_* fd and handle it.  This is called either
253  * during the early stages of being a local sender (up through the sending
254  * of the file list) or when we're the generator (to fetch the messages
255  * from the receiver). */
256 static void read_msg_fd(void)
257 {
258         char buf[2048];
259         size_t n;
260         struct file_list *flist;
261         int fd = msg_fd_in;
262         int tag, len;
263
264         /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
265          * to this routine from writefd_unbuffered(). */
266         msg_fd_in = -1;
267         defer_forwarding_messages++;
268
269         readfd(fd, buf, 4);
270         tag = IVAL(buf, 0);
271
272         len = tag & 0xFFFFFF;
273         tag = (tag >> 24) - MPLEX_BASE;
274
275         switch (tag) {
276         case MSG_DONE:
277                 if (len != 0 || !am_generator) {
278                   invalid_msg:
279                         rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
280                                 tag, len, who_am_i(),
281                                 incremental ? "/incremental" : "");
282                         exit_cleanup(RERR_STREAMIO);
283                 }
284                 done_cnt++;
285                 break;
286         case MSG_REDO:
287                 if (len != 4 || !am_generator)
288                         goto invalid_msg;
289                 readfd(fd, buf, 4);
290                 if (remove_source_files)
291                         decrement_active_files(IVAL(buf,0));
292                 flist_ndx_push(&redo_list, IVAL(buf,0));
293                 if (incremental)
294                         decrement_flist_in_progress(IVAL(buf,0), 1);
295                 break;
296         case MSG_FLIST:
297                 if (len != 4 || !am_generator || !incremental)
298                         goto invalid_msg;
299                 readfd(fd, buf, 4);
300                 /* Read extra file list from receiver. */
301                 assert(iobuf_in != NULL);
302                 assert(iobuf_f_in == fd);
303                 flist = recv_file_list(fd);
304                 flist->parent_ndx = IVAL(buf,0);
305                 break;
306         case MSG_FLIST_EOF:
307                 if (len != 0 || !am_generator || !incremental)
308                         goto invalid_msg;
309                 flist_eof = 1;
310                 break;
311         case MSG_DELETED:
312                 if (len >= (int)sizeof buf || !am_generator)
313                         goto invalid_msg;
314                 readfd(fd, buf, len);
315                 send_msg(MSG_DELETED, buf, len);
316                 break;
317         case MSG_SUCCESS:
318                 if (len != 4 || !am_generator)
319                         goto invalid_msg;
320                 readfd(fd, buf, len);
321                 if (remove_source_files) {
322                         decrement_active_files(IVAL(buf,0));
323                         send_msg(MSG_SUCCESS, buf, len);
324                 }
325                 if (preserve_hard_links)
326                         flist_ndx_push(&hlink_list, IVAL(buf,0));
327                 if (incremental)
328                         decrement_flist_in_progress(IVAL(buf,0), 0);
329                 break;
330         case MSG_NO_SEND:
331                 if (len != 4 || !am_generator)
332                         goto invalid_msg;
333                 readfd(fd, buf, len);
334                 if (incremental)
335                         decrement_flist_in_progress(IVAL(buf,0), 0);
336                 break;
337         case MSG_SOCKERR:
338         case MSG_CLIENT:
339                 if (!am_generator)
340                         goto invalid_msg;
341                 if (tag == MSG_SOCKERR)
342                         io_end_multiplex_out();
343                 /* FALL THROUGH */
344         case MSG_INFO:
345         case MSG_ERROR:
346         case MSG_LOG:
347                 while (len) {
348                         n = len;
349                         if (n >= sizeof buf)
350                                 n = sizeof buf - 1;
351                         readfd(fd, buf, n);
352                         rwrite((enum logcode)tag, buf, n);
353                         len -= n;
354                 }
355                 break;
356         default:
357                 rprintf(FERROR, "unknown message %d:%d [%s]\n",
358                         tag, len, who_am_i());
359                 exit_cleanup(RERR_STREAMIO);
360         }
361
362         defer_forwarding_messages--;
363         msg_fd_in = fd;
364         msg2sndr_flush();
365 }
366
367 /* This is used by the generator to limit how many file transfers can
368  * be active at once when --remove-source-files is specified.  Without
369  * this, sender-side deletions were mostly happening at the end. */
370 void increment_active_files(int ndx, int itemizing, enum logcode code)
371 {
372         /* TODO: tune these limits? */
373         while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
374 #ifdef SUPPORT_HARD_LINKS
375                 if (hlink_list.head)
376                         check_for_finished_hlinks(itemizing, code);
377 #endif
378                 read_msg_fd();
379         }
380
381         active_filecnt++;
382         active_bytecnt += F_LENGTH(cur_flist->files[ndx]);
383 }
384
385 static void decrement_active_files(int ndx)
386 {
387         struct file_list *flist = flist_for_ndx(ndx);
388         assert(flist != NULL);
389         active_filecnt--;
390         active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
391 }
392
393 static void decrement_flist_in_progress(int ndx, int redo)
394 {
395         struct file_list *flist = cur_flist ? cur_flist : first_flist;
396
397         while (ndx < flist->ndx_start) {
398                 if (flist == first_flist) {
399                   invalid_ndx:
400                         rprintf(FERROR,
401                                 "Invalid file index: %d (%d - %d) [%s]\n",
402                                 ndx, first_flist->ndx_start,
403                                 first_flist->prev->ndx_start + first_flist->prev->count - 1,
404                                 who_am_i());
405                         exit_cleanup(RERR_PROTOCOL);
406                 }
407                 flist = flist->prev;
408         }
409         while (ndx >= flist->ndx_start + flist->count) {
410                 if (!(flist = flist->next))
411                         goto invalid_ndx;
412         }
413
414         flist->in_progress--;
415         if (redo)
416                 flist->to_redo++;
417 }
418
419 /* Try to push messages off the list onto the wire.  If we leave with more
420  * to do, return 0.  On error, return -1.  If everything flushed, return 1.
421  * This is only active in the receiver. */
422 static int msg2genr_flush(void)
423 {
424         if (msg_fd_out < 0 || no_flush || flist_forward_from >= 0)
425                 return -1;
426
427         no_flush++;
428         while (msg2genr.head) {
429                 struct msg_list_item *m = msg2genr.head;
430                 writefd(msg_fd_out, m->buf, m->len);
431                 msg2genr.head = m->next;
432                 if (!msg2genr.head)
433                         msg2genr.tail = NULL;
434                 free(m);
435         }
436         if (iobuf_out_cnt) {
437                 writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
438                 iobuf_out_cnt = 0;
439         }
440         no_flush--;
441         return 1;
442 }
443
444 int send_msg(enum msgcode code, const char *buf, int len)
445 {
446         if (msg_fd_out < 0) {
447                 if (!defer_forwarding_messages)
448                         return io_multiplex_write(code, buf, len);
449                 if (!io_multiplexing_out)
450                         return 0;
451                 msg_list_add(&msg2sndr, code, buf, len);
452                 return 1;
453         }
454         msg_list_add(&msg2genr, code, buf, len);
455         msg2genr_flush();
456         return 1;
457 }
458
459 void send_msg_int(enum msgcode code, int num)
460 {
461         char numbuf[4];
462         SIVAL(numbuf, 0, num);
463         send_msg(code, numbuf, 4);
464 }
465
466 void wait_for_receiver(void)
467 {
468         read_msg_fd();
469 }
470
471 int get_redo_num(void)
472 {
473         return flist_ndx_pop(&redo_list);
474 }
475
476 int get_hlink_num(void)
477 {
478         return flist_ndx_pop(&hlink_list);
479 }
480
481 /**
482  * When we're the receiver and we have a local --files-from list of names
483  * that needs to be sent over the socket to the sender, we have to do two
484  * things at the same time: send the sender a list of what files we're
485  * processing and read the incoming file+info list from the sender.  We do
486  * this by augmenting the read_timeout() function to copy this data.  It
487  * uses the io_filesfrom_buf to read a block of data from f_in (when it is
488  * ready, since it might be a pipe) and then blast it out f_out (when it
489  * is ready to receive more data).
490  */
491 void io_set_filesfrom_fds(int f_in, int f_out)
492 {
493         io_filesfrom_f_in = f_in;
494         io_filesfrom_f_out = f_out;
495         io_filesfrom_bp = io_filesfrom_buf;
496         io_filesfrom_lastchar = '\0';
497         io_filesfrom_buflen = 0;
498 }
499
500 /* It's almost always an error to get an EOF when we're trying to read from the
501  * network, because the protocol is (for the most part) self-terminating.
502  *
503  * There is one case for the receiver when it is at the end of the transfer
504  * (hanging around reading any keep-alive packets that might come its way): if
505  * the sender dies before the generator's kill-signal comes through, we can end
506  * up here needing to loop until the kill-signal arrives.  In this situation,
507  * kluge_around_eof will be < 0.
508  *
509  * There is another case for older protocol versions (< 24) where the module
510  * listing was not terminated, so we must ignore an EOF error in that case and
511  * exit.  In this situation, kluge_around_eof will be > 0. */
512 static void whine_about_eof(int fd)
513 {
514         if (kluge_around_eof && fd == sock_f_in) {
515                 int i;
516                 if (kluge_around_eof > 0)
517                         exit_cleanup(0);
518                 /* If we're still here after 10 seconds, exit with an error. */
519                 for (i = 10*1000/20; i--; )
520                         msleep(20);
521         }
522
523         rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
524                 "(%.0f bytes received so far) [%s]\n",
525                 (double)stats.total_read, who_am_i());
526
527         exit_cleanup(RERR_STREAMIO);
528 }
529
530 /**
531  * Read from a socket with I/O timeout. return the number of bytes
532  * read. If no bytes can be read then exit, never return a number <= 0.
533  *
534  * TODO: If the remote shell connection fails, then current versions
535  * actually report an "unexpected EOF" error here.  Since it's a
536  * fairly common mistake to try to use rsh when ssh is required, we
537  * should trap that: if we fail to read any data at all, we should
538  * give a better explanation.  We can tell whether the connection has
539  * started by looking e.g. at whether the remote version is known yet.
540  */
541 static int read_timeout(int fd, char *buf, size_t len)
542 {
543         int n, cnt = 0;
544
545         io_flush(FULL_FLUSH);
546
547         while (cnt == 0) {
548                 /* until we manage to read *something* */
549                 fd_set r_fds, w_fds;
550                 struct timeval tv;
551                 int maxfd = fd;
552                 int count;
553
554                 FD_ZERO(&r_fds);
555                 FD_ZERO(&w_fds);
556                 FD_SET(fd, &r_fds);
557                 if (msg2genr.head) {
558                         FD_SET(msg_fd_out, &w_fds);
559                         if (msg_fd_out > maxfd)
560                                 maxfd = msg_fd_out;
561                 }
562                 if (io_filesfrom_f_out >= 0) {
563                         int new_fd;
564                         if (io_filesfrom_buflen == 0) {
565                                 if (io_filesfrom_f_in >= 0) {
566                                         FD_SET(io_filesfrom_f_in, &r_fds);
567                                         new_fd = io_filesfrom_f_in;
568                                 } else {
569                                         io_filesfrom_f_out = -1;
570                                         new_fd = -1;
571                                 }
572                         } else {
573                                 FD_SET(io_filesfrom_f_out, &w_fds);
574                                 new_fd = io_filesfrom_f_out;
575                         }
576                         if (new_fd > maxfd)
577                                 maxfd = new_fd;
578                 }
579
580                 tv.tv_sec = select_timeout;
581                 tv.tv_usec = 0;
582
583                 errno = 0;
584
585                 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
586
587                 if (count <= 0) {
588                         if (errno == EBADF)
589                                 exit_cleanup(RERR_SOCKETIO);
590                         check_timeout();
591                         continue;
592                 }
593
594                 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
595                         msg2genr_flush();
596
597                 if (io_filesfrom_f_out >= 0) {
598                         if (io_filesfrom_buflen) {
599                                 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
600                                         int l = write(io_filesfrom_f_out,
601                                                       io_filesfrom_bp,
602                                                       io_filesfrom_buflen);
603                                         if (l > 0) {
604                                                 if (!(io_filesfrom_buflen -= l))
605                                                         io_filesfrom_bp = io_filesfrom_buf;
606                                                 else
607                                                         io_filesfrom_bp += l;
608                                         } else {
609                                                 /* XXX should we complain? */
610                                                 io_filesfrom_f_out = -1;
611                                         }
612                                 }
613                         } else if (io_filesfrom_f_in >= 0) {
614                                 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
615                                         int l = read(io_filesfrom_f_in,
616                                                      io_filesfrom_buf,
617                                                      sizeof io_filesfrom_buf);
618                                         if (l <= 0) {
619                                                 /* Send end-of-file marker */
620                                                 io_filesfrom_buf[0] = '\0';
621                                                 io_filesfrom_buf[1] = '\0';
622                                                 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
623                                                 io_filesfrom_f_in = -1;
624                                         } else {
625                                                 if (!eol_nulls) {
626                                                         char *s = io_filesfrom_buf + l;
627                                                         /* Transform CR and/or LF into '\0' */
628                                                         while (s-- > io_filesfrom_buf) {
629                                                                 if (*s == '\n' || *s == '\r')
630                                                                         *s = '\0';
631                                                         }
632                                                 }
633                                                 if (!io_filesfrom_lastchar) {
634                                                         /* Last buf ended with a '\0', so don't
635                                                          * let this buf start with one. */
636                                                         while (l && !*io_filesfrom_bp)
637                                                                 io_filesfrom_bp++, l--;
638                                                 }
639                                                 if (!l)
640                                                         io_filesfrom_bp = io_filesfrom_buf;
641                                                 else {
642                                                         char *f = io_filesfrom_bp;
643                                                         char *t = f;
644                                                         char *eob = f + l;
645                                                         /* Eliminate any multi-'\0' runs. */
646                                                         while (f != eob) {
647                                                                 if (!(*t++ = *f++)) {
648                                                                         while (f != eob && !*f)
649                                                                                 f++, l--;
650                                                                 }
651                                                         }
652                                                         io_filesfrom_lastchar = f[-1];
653                                                 }
654                                                 io_filesfrom_buflen = l;
655                                         }
656                                 }
657                         }
658                 }
659
660                 if (!FD_ISSET(fd, &r_fds))
661                         continue;
662
663                 n = read(fd, buf, len);
664
665                 if (n <= 0) {
666                         if (n == 0)
667                                 whine_about_eof(fd); /* Doesn't return. */
668                         if (errno == EINTR || errno == EWOULDBLOCK
669                             || errno == EAGAIN)
670                                 continue;
671
672                         /* Don't write errors on a dead socket. */
673                         if (fd == sock_f_in) {
674                                 io_end_multiplex_out();
675                                 rsyserr(FSOCKERR, errno, "read error");
676                         } else
677                                 rsyserr(FERROR, errno, "read error");
678                         exit_cleanup(RERR_STREAMIO);
679                 }
680
681                 buf += n;
682                 len -= n;
683                 cnt += n;
684
685                 if (fd == sock_f_in && io_timeout)
686                         last_io_in = time(NULL);
687         }
688
689         return cnt;
690 }
691
692 /**
693  * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
694  * characters long).
695  */
696 int read_filesfrom_line(int fd, char *fname)
697 {
698         char ch, *s, *eob = fname + MAXPATHLEN - 1;
699         int cnt;
700         int reading_remotely = filesfrom_host != NULL;
701         int nulls = eol_nulls || reading_remotely;
702
703   start:
704         s = fname;
705         while (1) {
706                 cnt = read(fd, &ch, 1);
707                 if (cnt < 0 && (errno == EWOULDBLOCK
708                   || errno == EINTR || errno == EAGAIN)) {
709                         struct timeval tv;
710                         fd_set r_fds, e_fds;
711                         FD_ZERO(&r_fds);
712                         FD_SET(fd, &r_fds);
713                         FD_ZERO(&e_fds);
714                         FD_SET(fd, &e_fds);
715                         tv.tv_sec = select_timeout;
716                         tv.tv_usec = 0;
717                         if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
718                                 check_timeout();
719                         if (FD_ISSET(fd, &e_fds)) {
720                                 rsyserr(FINFO, errno,
721                                         "select exception on fd %d", fd);
722                         }
723                         continue;
724                 }
725                 if (cnt != 1)
726                         break;
727                 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
728                         /* Skip empty lines if reading locally. */
729                         if (!reading_remotely && s == fname)
730                                 continue;
731                         break;
732                 }
733                 if (s < eob)
734                         *s++ = ch;
735         }
736         *s = '\0';
737
738         /* Dump comments. */
739         if (*fname == '#' || *fname == ';')
740                 goto start;
741
742         return s - fname;
743 }
744
745 int io_start_buffering_out(int f_out)
746 {
747         if (iobuf_out) {
748                 assert(f_out == iobuf_f_out);
749                 return 0;
750         }
751         if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
752                 out_of_memory("io_start_buffering_out");
753         iobuf_out_cnt = 0;
754         iobuf_f_out = f_out;
755         return 1;
756 }
757
758 int io_start_buffering_in(int f_in)
759 {
760         if (iobuf_in) {
761                 assert(f_in == iobuf_f_in);
762                 return 0;
763         }
764         iobuf_in_siz = 2 * IO_BUFFER_SIZE;
765         if (!(iobuf_in = new_array(char, iobuf_in_siz)))
766                 out_of_memory("io_start_buffering_in");
767         iobuf_f_in = f_in;
768         return 1;
769 }
770
771 void io_end_buffering_in(void)
772 {
773         if (!iobuf_in)
774                 return;
775         free(iobuf_in);
776         iobuf_in = NULL;
777         iobuf_in_ndx = 0;
778         iobuf_in_remaining = 0;
779         iobuf_f_in = -1;
780 }
781
782 void io_end_buffering_out(void)
783 {
784         if (!iobuf_out)
785                 return;
786         io_flush(FULL_FLUSH);
787         free(iobuf_out);
788         iobuf_out = NULL;
789         iobuf_f_out = -1;
790 }
791
792 void maybe_flush_socket(void)
793 {
794         if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
795                 io_flush(NORMAL_FLUSH);
796 }
797
798 void maybe_send_keepalive(void)
799 {
800         if (time(NULL) - last_io_out >= allowed_lull) {
801                 if (!iobuf_out || !iobuf_out_cnt) {
802                         if (protocol_version < 29)
803                                 return; /* there's nothing we can do */
804                         if (protocol_version >= 30)
805                                 send_msg(MSG_NOOP, "", 0);
806                         else {
807                                 write_int(sock_f_out, cur_flist->count);
808                                 write_shortint(sock_f_out, ITEM_IS_NEW);
809                         }
810                 }
811                 if (iobuf_out)
812                         io_flush(NORMAL_FLUSH);
813         }
814 }
815
816 void start_flist_forward(int f_in)
817 {
818         assert(iobuf_out != NULL);
819         assert(iobuf_f_out == msg_fd_out);
820         flist_forward_from = f_in;
821 }
822
823 void stop_flist_forward()
824 {
825         flist_forward_from = -1;
826         io_flush(FULL_FLUSH);
827 }
828
829 /**
830  * Continue trying to read len bytes - don't return until len has been
831  * read.
832  **/
833 static void read_loop(int fd, char *buf, size_t len)
834 {
835         while (len) {
836                 int n = read_timeout(fd, buf, len);
837
838                 buf += n;
839                 len -= n;
840         }
841 }
842
843 /**
844  * Read from the file descriptor handling multiplexing - return number
845  * of bytes read.
846  *
847  * Never returns <= 0.
848  */
849 static int readfd_unbuffered(int fd, char *buf, size_t len)
850 {
851         size_t msg_bytes;
852         int tag, cnt = 0;
853         char line[BIGPATHBUFLEN];
854
855         if (!iobuf_in || fd != iobuf_f_in)
856                 return read_timeout(fd, buf, len);
857
858         if (!io_multiplexing_in && iobuf_in_remaining == 0) {
859                 iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
860                 iobuf_in_ndx = 0;
861         }
862
863         while (cnt == 0) {
864                 if (iobuf_in_remaining) {
865                         len = MIN(len, iobuf_in_remaining);
866                         memcpy(buf, iobuf_in + iobuf_in_ndx, len);
867                         iobuf_in_ndx += len;
868                         iobuf_in_remaining -= len;
869                         cnt = len;
870                         break;
871                 }
872
873                 read_loop(fd, line, 4);
874                 tag = IVAL(line, 0);
875
876                 msg_bytes = tag & 0xFFFFFF;
877                 tag = (tag >> 24) - MPLEX_BASE;
878
879                 switch (tag) {
880                 case MSG_DATA:
881                         if (msg_bytes > iobuf_in_siz) {
882                                 if (!(iobuf_in = realloc_array(iobuf_in, char,
883                                                                msg_bytes)))
884                                         out_of_memory("readfd_unbuffered");
885                                 iobuf_in_siz = msg_bytes;
886                         }
887                         read_loop(fd, iobuf_in, msg_bytes);
888                         iobuf_in_remaining = msg_bytes;
889                         iobuf_in_ndx = 0;
890                         break;
891                 case MSG_NOOP:
892                         if (am_sender)
893                                 maybe_send_keepalive();
894                         break;
895                 case MSG_IO_ERROR:
896                         if (msg_bytes != 4)
897                                 goto invalid_msg;
898                         read_loop(fd, line, msg_bytes);
899                         io_error |= IVAL(line, 0);
900                         break;
901                 case MSG_DELETED:
902                         if (msg_bytes >= sizeof line)
903                                 goto overflow;
904                         read_loop(fd, line, msg_bytes);
905                         /* A directory name was sent with the trailing null */
906                         if (msg_bytes > 0 && !line[msg_bytes-1])
907                                 log_delete(line, S_IFDIR);
908                         else {
909                                 line[msg_bytes] = '\0';
910                                 log_delete(line, S_IFREG);
911                         }
912                         break;
913                 case MSG_SUCCESS:
914                         if (msg_bytes != 4) {
915                           invalid_msg:
916                                 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
917                                         tag, (long)msg_bytes, who_am_i());
918                                 exit_cleanup(RERR_STREAMIO);
919                         }
920                         read_loop(fd, line, msg_bytes);
921                         successful_send(IVAL(line, 0));
922                         break;
923                 case MSG_NO_SEND:
924                         if (msg_bytes != 4)
925                                 goto invalid_msg;
926                         read_loop(fd, line, msg_bytes);
927                         send_msg_int(MSG_NO_SEND, IVAL(line, 0));
928                         break;
929                 case MSG_INFO:
930                 case MSG_ERROR:
931                         if (msg_bytes >= sizeof line) {
932                             overflow:
933                                 rprintf(FERROR,
934                                         "multiplexing overflow %d:%ld [%s]\n",
935                                         tag, (long)msg_bytes, who_am_i());
936                                 exit_cleanup(RERR_STREAMIO);
937                         }
938                         read_loop(fd, line, msg_bytes);
939                         rwrite((enum logcode)tag, line, msg_bytes);
940                         break;
941                 default:
942                         rprintf(FERROR, "unexpected tag %d [%s]\n",
943                                 tag, who_am_i());
944                         exit_cleanup(RERR_STREAMIO);
945                 }
946         }
947
948         if (iobuf_in_remaining == 0)
949                 io_flush(NORMAL_FLUSH);
950
951         return cnt;
952 }
953
954 /* Do a buffered read from fd.  Don't return until all N bytes have
955  * been read.  If all N can't be read then exit with an error. */
956 static void readfd(int fd, char *buffer, size_t N)
957 {
958         int  cnt;
959         size_t total = 0;
960
961         while (total < N) {
962                 cnt = readfd_unbuffered(fd, buffer + total, N-total);
963                 total += cnt;
964         }
965
966         if (fd == write_batch_monitor_in) {
967                 if ((size_t)write(batch_fd, buffer, total) != total)
968                         exit_cleanup(RERR_FILEIO);
969         }
970
971         if (fd == flist_forward_from)
972                 writefd(iobuf_f_out, buffer, total);
973
974         if (fd == sock_f_in)
975                 stats.total_read += total;
976 }
977
978 unsigned short read_shortint(int f)
979 {
980         char b[2];
981         readfd(f, b, 2);
982         return (UVAL(b, 1) << 8) + UVAL(b, 0);
983 }
984
985 int32 read_int(int f)
986 {
987         char b[4];
988         int32 num;
989
990         readfd(f, b, 4);
991         num = IVAL(b, 0);
992 #if SIZEOF_INT32 > 4
993         if (num & (int32)0x80000000)
994                 num |= ~(int32)0xffffffff;
995 #endif
996         return num;
997 }
998
999 int64 read_longint(int f)
1000 {
1001         int64 num;
1002         char b[9];
1003
1004         if (protocol_version < 30) {
1005                 num = read_int(f);
1006
1007                 if ((int32)num != (int32)0xffffffff)
1008                         return num;
1009
1010 #if SIZEOF_INT64 < 8
1011                 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1012                 exit_cleanup(RERR_UNSUPPORTED);
1013 #else
1014                 readfd(f, b, 8);
1015                 num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
1016 #endif
1017         } else {
1018                 int cnt;
1019                 readfd(f, b, 3);
1020                 cnt = int_byte_cnt[CVAL(b, 0) / 4];
1021 #if SIZEOF_INT64 < 8
1022                 if (cnt > 5 || (cnt == 5 && (CVAL(b,0)&0x3F || CVAL(b,1)&0x80))) {
1023                         rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1024                         exit_cleanup(RERR_UNSUPPORTED);
1025                 }
1026 #endif
1027                 if (cnt > 3)
1028                         readfd(f, b + 3, cnt - 3);
1029                 switch (cnt) {
1030                 case 3:
1031                         num = NVAL3(b, 0);
1032                         break;
1033                 case 4:
1034                         num = NVAL4(b, 0x80);
1035                         break;
1036                 case 5:
1037                         num = NVAL5(b, 0xC0);
1038                         break;
1039 #if SIZEOF_INT64 >= 8
1040                 case 6:
1041                         num = NVAL6(b, 0xE0);
1042                         break;
1043                 case 7:
1044                         num = NVAL7(b, 0xF0);
1045                         break;
1046                 case 8:
1047                         num = NVAL8(b, 0xF8);
1048                         break;
1049                 case 9:
1050                         num = NVAL8(b+1, 0);
1051                         break;
1052 #endif
1053                 default:
1054                         exit_cleanup(RERR_PROTOCOL); /* impossible... */
1055                 }
1056         }
1057
1058         return num;
1059 }
1060
1061 void read_buf(int f, char *buf, size_t len)
1062 {
1063         readfd(f,buf,len);
1064 }
1065
1066 void read_sbuf(int f, char *buf, size_t len)
1067 {
1068         readfd(f, buf, len);
1069         buf[len] = '\0';
1070 }
1071
1072 uchar read_byte(int f)
1073 {
1074         uchar c;
1075         readfd(f, (char *)&c, 1);
1076         return c;
1077 }
1078
1079 int read_vstring(int f, char *buf, int bufsize)
1080 {
1081         int len = read_byte(f);
1082
1083         if (len & 0x80)
1084                 len = (len & ~0x80) * 0x100 + read_byte(f);
1085
1086         if (len >= bufsize) {
1087                 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
1088                         len, bufsize - 1);
1089                 return -1;
1090         }
1091
1092         if (len)
1093                 readfd(f, buf, len);
1094         buf[len] = '\0';
1095         return len;
1096 }
1097
1098 /* Populate a sum_struct with values from the socket.  This is
1099  * called by both the sender and the receiver. */
1100 void read_sum_head(int f, struct sum_struct *sum)
1101 {
1102         sum->count = read_int(f);
1103         if (sum->count < 0) {
1104                 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
1105                         (long)sum->count, who_am_i());
1106                 exit_cleanup(RERR_PROTOCOL);
1107         }
1108         sum->blength = read_int(f);
1109         if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
1110                 rprintf(FERROR, "Invalid block length %ld [%s]\n",
1111                         (long)sum->blength, who_am_i());
1112                 exit_cleanup(RERR_PROTOCOL);
1113         }
1114         sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
1115         if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
1116                 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
1117                         sum->s2length, who_am_i());
1118                 exit_cleanup(RERR_PROTOCOL);
1119         }
1120         sum->remainder = read_int(f);
1121         if (sum->remainder < 0 || sum->remainder > sum->blength) {
1122                 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
1123                         (long)sum->remainder, who_am_i());
1124                 exit_cleanup(RERR_PROTOCOL);
1125         }
1126 }
1127
1128 /* Send the values from a sum_struct over the socket.  Set sum to
1129  * NULL if there are no checksums to send.  This is called by both
1130  * the generator and the sender. */
1131 void write_sum_head(int f, struct sum_struct *sum)
1132 {
1133         static struct sum_struct null_sum;
1134
1135         if (sum == NULL)
1136                 sum = &null_sum;
1137
1138         write_int(f, sum->count);
1139         write_int(f, sum->blength);
1140         if (protocol_version >= 27)
1141                 write_int(f, sum->s2length);
1142         write_int(f, sum->remainder);
1143 }
1144
1145 /**
1146  * Sleep after writing to limit I/O bandwidth usage.
1147  *
1148  * @todo Rather than sleeping after each write, it might be better to
1149  * use some kind of averaging.  The current algorithm seems to always
1150  * use a bit less bandwidth than specified, because it doesn't make up
1151  * for slow periods.  But arguably this is a feature.  In addition, we
1152  * ought to take the time used to write the data into account.
1153  *
1154  * During some phases of big transfers (file FOO is uptodate) this is
1155  * called with a small bytes_written every time.  As the kernel has to
1156  * round small waits up to guarantee that we actually wait at least the
1157  * requested number of microseconds, this can become grossly inaccurate.
1158  * We therefore keep track of the bytes we've written over time and only
1159  * sleep when the accumulated delay is at least 1 tenth of a second.
1160  **/
1161 static void sleep_for_bwlimit(int bytes_written)
1162 {
1163         static struct timeval prior_tv;
1164         static long total_written = 0;
1165         struct timeval tv, start_tv;
1166         long elapsed_usec, sleep_usec;
1167
1168 #define ONE_SEC 1000000L /* # of microseconds in a second */
1169
1170         if (!bwlimit_writemax)
1171                 return;
1172
1173         total_written += bytes_written;
1174
1175         gettimeofday(&start_tv, NULL);
1176         if (prior_tv.tv_sec) {
1177                 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
1178                              + (start_tv.tv_usec - prior_tv.tv_usec);
1179                 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
1180                 if (total_written < 0)
1181                         total_written = 0;
1182         }
1183
1184         sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
1185         if (sleep_usec < ONE_SEC / 10) {
1186                 prior_tv = start_tv;
1187                 return;
1188         }
1189
1190         tv.tv_sec  = sleep_usec / ONE_SEC;
1191         tv.tv_usec = sleep_usec % ONE_SEC;
1192         select(0, NULL, NULL, NULL, &tv);
1193
1194         gettimeofday(&prior_tv, NULL);
1195         elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1196                      + (prior_tv.tv_usec - start_tv.tv_usec);
1197         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
1198 }
1199
1200 /* Write len bytes to the file descriptor fd, looping as necessary to get
1201  * the job done and also (in certain circumstances) reading any data on
1202  * msg_fd_in to avoid deadlock.
1203  *
1204  * This function underlies the multiplexing system.  The body of the
1205  * application never calls this function directly. */
1206 static void writefd_unbuffered(int fd, const char *buf, size_t len)
1207 {
1208         size_t n, total = 0;
1209         fd_set w_fds, r_fds, e_fds;
1210         int maxfd, count, cnt, using_r_fds;
1211         int defer_save = defer_forwarding_messages;
1212         struct timeval tv;
1213
1214         no_flush++;
1215
1216         while (total < len) {
1217                 FD_ZERO(&w_fds);
1218                 FD_SET(fd, &w_fds);
1219                 FD_ZERO(&e_fds);
1220                 FD_SET(fd, &e_fds);
1221                 maxfd = fd;
1222
1223                 if (msg_fd_in >= 0) {
1224                         FD_ZERO(&r_fds);
1225                         FD_SET(msg_fd_in, &r_fds);
1226                         if (msg_fd_in > maxfd)
1227                                 maxfd = msg_fd_in;
1228                         using_r_fds = 1;
1229                 } else
1230                         using_r_fds = 0;
1231
1232                 tv.tv_sec = select_timeout;
1233                 tv.tv_usec = 0;
1234
1235                 errno = 0;
1236                 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
1237                                &w_fds, &e_fds, &tv);
1238
1239                 if (count <= 0) {
1240                         if (count < 0 && errno == EBADF)
1241                                 exit_cleanup(RERR_SOCKETIO);
1242                         check_timeout();
1243                         continue;
1244                 }
1245
1246                 if (FD_ISSET(fd, &e_fds)) {
1247                         rsyserr(FINFO, errno,
1248                                 "select exception on fd %d", fd);
1249                 }
1250
1251                 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
1252                         read_msg_fd();
1253
1254                 if (!FD_ISSET(fd, &w_fds))
1255                         continue;
1256
1257                 n = len - total;
1258                 if (bwlimit_writemax && n > bwlimit_writemax)
1259                         n = bwlimit_writemax;
1260                 cnt = write(fd, buf + total, n);
1261
1262                 if (cnt <= 0) {
1263                         if (cnt < 0) {
1264                                 if (errno == EINTR)
1265                                         continue;
1266                                 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1267                                         msleep(1);
1268                                         continue;
1269                                 }
1270                         }
1271
1272                         /* Don't try to write errors back across the stream. */
1273                         if (fd == sock_f_out)
1274                                 io_end_multiplex_out();
1275                         rsyserr(FERROR, errno,
1276                                 "writefd_unbuffered failed to write %ld bytes [%s]",
1277                                 (long)len, who_am_i());
1278                         /* If the other side is sending us error messages, try
1279                          * to grab any messages they sent before they died. */
1280                         while (fd == sock_f_out && io_multiplexing_in) {
1281                                 set_io_timeout(30);
1282                                 ignore_timeout = 0;
1283                                 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1284                                                   sizeof io_filesfrom_buf);
1285                         }
1286                         exit_cleanup(RERR_STREAMIO);
1287                 }
1288
1289                 total += cnt;
1290                 defer_forwarding_messages = 1;
1291
1292                 if (fd == sock_f_out) {
1293                         if (io_timeout || am_generator)
1294                                 last_io_out = time(NULL);
1295                         sleep_for_bwlimit(cnt);
1296                 }
1297         }
1298
1299         defer_forwarding_messages = defer_save;
1300         no_flush--;
1301 }
1302
1303 static void msg2sndr_flush(void)
1304 {
1305         if (defer_forwarding_messages)
1306                 return;
1307
1308         while (msg2sndr.head && io_multiplexing_out) {
1309                 struct msg_list_item *m = msg2sndr.head;
1310                 if (!(msg2sndr.head = m->next))
1311                         msg2sndr.tail = NULL;
1312                 stats.total_written += m->len;
1313                 defer_forwarding_messages = 1;
1314                 writefd_unbuffered(sock_f_out, m->buf, m->len);
1315                 defer_forwarding_messages = 0;
1316                 free(m);
1317         }
1318 }
1319
1320 /**
1321  * Write an message to a multiplexed stream. If this fails then rsync
1322  * exits.
1323  **/
1324 static void mplex_write(enum msgcode code, const char *buf, size_t len)
1325 {
1326         char buffer[1024];
1327         size_t n = len;
1328
1329         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
1330
1331         if (n > sizeof buffer - 4)
1332                 n = 0;
1333         else
1334                 memcpy(buffer + 4, buf, n);
1335
1336         writefd_unbuffered(sock_f_out, buffer, n+4);
1337
1338         len -= n;
1339         buf += n;
1340
1341         if (len) {
1342                 defer_forwarding_messages = 1;
1343                 writefd_unbuffered(sock_f_out, buf, len);
1344                 defer_forwarding_messages = 0;
1345                 msg2sndr_flush();
1346         }
1347 }
1348
1349 void io_flush(int flush_it_all)
1350 {
1351         if (flush_it_all) {
1352                 msg2genr_flush();
1353                 msg2sndr_flush();
1354         }
1355
1356         if (!iobuf_out_cnt || no_flush)
1357                 return;
1358
1359         if (io_multiplexing_out)
1360                 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
1361         else
1362                 writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
1363         iobuf_out_cnt = 0;
1364 }
1365
1366 static void writefd(int fd, const char *buf, size_t len)
1367 {
1368         if (fd == sock_f_out)
1369                 stats.total_written += len;
1370
1371         if (fd == write_batch_monitor_out) {
1372                 if ((size_t)write(batch_fd, buf, len) != len)
1373                         exit_cleanup(RERR_FILEIO);
1374         }
1375
1376         if (!iobuf_out || fd != iobuf_f_out) {
1377                 writefd_unbuffered(fd, buf, len);
1378                 return;
1379         }
1380
1381         while (len) {
1382                 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
1383                 if (n > 0) {
1384                         memcpy(iobuf_out+iobuf_out_cnt, buf, n);
1385                         buf += n;
1386                         len -= n;
1387                         iobuf_out_cnt += n;
1388                 }
1389
1390                 if (iobuf_out_cnt == IO_BUFFER_SIZE)
1391                         io_flush(NORMAL_FLUSH);
1392         }
1393 }
1394
1395 void write_shortint(int f, unsigned short x)
1396 {
1397         char b[2];
1398         b[0] = (char)x;
1399         b[1] = (char)(x >> 8);
1400         writefd(f, b, 2);
1401 }
1402
1403 void write_int(int f, int32 x)
1404 {
1405         char b[4];
1406         SIVAL(b, 0, x);
1407         writefd(f, b, 4);
1408 }
1409
1410 /*
1411  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1412  * 64-bit types on this platform.
1413  */
1414 void write_longint(int f, int64 x)
1415 {
1416         char b[12];
1417
1418 #if SIZEOF_INT64 < 8
1419         if (x < 0 || x > 0x7FFFFFFF) {
1420                 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1421                 exit_cleanup(RERR_UNSUPPORTED);
1422         }
1423 #endif
1424
1425         if (protocol_version < 30) {
1426                 char * const s = b+4;
1427                 SIVAL(s, 0, x);
1428 #if SIZEOF_INT64 < 8
1429                 writefd(f, s, 4);
1430 #else
1431                 if (x <= 0x7FFFFFFF && x >= 0) {
1432                         writefd(f, s, 4);
1433                         return;
1434                 }
1435
1436                 memset(b, 0xFF, 4);
1437                 SIVAL(s, 4, x >> 32);
1438                 writefd(f, b, 12);
1439         } else if (x < 0) {
1440                 goto all_bits;
1441 #endif
1442         } else if (x < ((int32)1<<(3*8-1))) {
1443                 b[0] = (char)(x >> 16);
1444                 b[1] = (char)(x >> 8);
1445                 b[2] = (char)x;
1446                 writefd(f, b, 3);
1447         } else if (x < ((int64)1<<(4*8-2))) {
1448                 b[0] = (char)((x >> 24) | 0x80);
1449                 b[1] = (char)(x >> 16);
1450                 b[2] = (char)(x >> 8);
1451                 b[3] = (char)x;
1452                 writefd(f, b, 4);
1453 #if SIZEOF_INT64 < 8
1454         } else {
1455                 b[0] = 0xC0;
1456                 b[1] = (char)(x >> 24);
1457                 b[2] = (char)(x >> 16);
1458                 b[3] = (char)(x >> 8);
1459                 b[4] = (char)x;
1460                 writefd(f, b, 5);
1461         }
1462 #else
1463         } else if (x < ((int64)1<<(5*8-3))) {
1464                 b[0] = (char)((x >> 32) | 0xC0);
1465                 b[1] = (char)(x >> 24);
1466                 b[2] = (char)(x >> 16);
1467                 b[3] = (char)(x >> 8);
1468                 b[4] = (char)x;
1469                 writefd(f, b, 5);
1470         } else if (x < ((int64)1<<(6*8-4))) {
1471                 b[0] = (char)((x >> 40) | 0xE0);
1472                 b[1] = (char)(x >> 32);
1473                 b[2] = (char)(x >> 24);
1474                 b[3] = (char)(x >> 16);
1475                 b[4] = (char)(x >> 8);
1476                 b[5] = (char)x;
1477                 writefd(f, b, 6);
1478         } else if (x < ((int64)1<<(7*8-5))) {
1479                 b[0] = (char)((x >> 48) | 0xF0);
1480                 b[1] = (char)(x >> 40);
1481                 b[2] = (char)(x >> 32);
1482                 b[3] = (char)(x >> 24);
1483                 b[4] = (char)(x >> 16);
1484                 b[5] = (char)(x >> 8);
1485                 b[6] = (char)x;
1486                 writefd(f, b, 7);
1487         } else if (x < ((int64)1<<(8*8-6))) {
1488                 b[0] = (char)((x >> 56) | 0xF8);
1489                 b[1] = (char)(x >> 48);
1490                 b[2] = (char)(x >> 40);
1491                 b[3] = (char)(x >> 32);
1492                 b[4] = (char)(x >> 24);
1493                 b[5] = (char)(x >> 16);
1494                 b[6] = (char)(x >> 8);
1495                 b[7] = (char)x;
1496                 writefd(f, b, 8);
1497         } else {
1498           all_bits:
1499                 b[0] = (char)0xFC;
1500                 b[1] = (char)(x >> 56);
1501                 b[2] = (char)(x >> 48);
1502                 b[3] = (char)(x >> 40);
1503                 b[4] = (char)(x >> 32);
1504                 b[5] = (char)(x >> 24);
1505                 b[6] = (char)(x >> 16);
1506                 b[7] = (char)(x >> 8);
1507                 b[8] = (char)x;
1508                 writefd(f, b, 9);
1509         }
1510 #endif
1511 }
1512
1513 void write_buf(int f, const char *buf, size_t len)
1514 {
1515         writefd(f,buf,len);
1516 }
1517
1518 /** Write a string to the connection */
1519 void write_sbuf(int f, const char *buf)
1520 {
1521         writefd(f, buf, strlen(buf));
1522 }
1523
1524 void write_byte(int f, uchar c)
1525 {
1526         writefd(f, (char *)&c, 1);
1527 }
1528
1529 void write_vstring(int f, const char *str, int len)
1530 {
1531         uchar lenbuf[3], *lb = lenbuf;
1532
1533         if (len > 0x7F) {
1534                 if (len > 0x7FFF) {
1535                         rprintf(FERROR,
1536                                 "attempting to send over-long vstring (%d > %d)\n",
1537                                 len, 0x7FFF);
1538                         exit_cleanup(RERR_PROTOCOL);
1539                 }
1540                 *lb++ = len / 0x100 + 0x80;
1541         }
1542         *lb = len;
1543
1544         writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1545         if (len)
1546                 writefd(f, str, len);
1547 }
1548
1549 /* Send a file-list index using a byte-reduction method. */
1550 void write_ndx(int f, int32 ndx)
1551 {
1552         static int32 prev_positive = -1, prev_negative = 1;
1553         int32 diff, cnt = 0;
1554         char b[6];
1555
1556         if (protocol_version < 30 || read_batch) {
1557                 write_int(f, ndx);
1558                 return;
1559         }
1560
1561         /* Send NDX_DONE as a single-byte 0 with no side effects.  Send
1562          * negative nums as a positive after sending a leading 0xFF. */
1563         if (ndx >= 0) {
1564                 diff = ndx - prev_positive;
1565                 prev_positive = ndx;
1566         } else if (ndx == NDX_DONE) {
1567                 *b = 0;
1568                 writefd(f, b, 1);
1569                 return;
1570         } else {
1571                 b[cnt++] = (char)0xFF;
1572                 ndx = -ndx;
1573                 diff = ndx - prev_negative;
1574                 prev_negative = ndx;
1575         }
1576
1577         /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
1578          * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
1579          * & all 4 bytes of the (non-negative) num with the high-bit set. */
1580         if (diff < 0xFE && diff > 0)
1581                 b[cnt++] = (char)diff;
1582         else if (diff < 0 || diff > 0x7FFF) {
1583                 b[cnt++] = (char)0xFE;
1584                 b[cnt++] = (char)((ndx >> 24) | 0x80);
1585                 b[cnt++] = (char)(ndx >> 16);
1586                 b[cnt++] = (char)(ndx >> 8);
1587                 b[cnt++] = (char)ndx;
1588         } else {
1589                 b[cnt++] = (char)0xFE;
1590                 b[cnt++] = (char)(diff >> 8);
1591                 b[cnt++] = (char)diff;
1592         }
1593         writefd(f, b, cnt);
1594 }
1595
1596 /* Receive a file-list index using a byte-reduction method. */
1597 int32 read_ndx(int f)
1598 {
1599         static int32 prev_positive = -1, prev_negative = 1;
1600         int32 *prev_ptr, num;
1601         char b[4];
1602
1603         if (protocol_version < 30)
1604                 return read_int(f);
1605
1606         readfd(f, b, 1);
1607         if (CVAL(b, 0) == 0xFF) {
1608                 readfd(f, b, 1);
1609                 prev_ptr = &prev_negative;
1610         } else if (CVAL(b, 0) == 0)
1611                 return NDX_DONE;
1612         else
1613                 prev_ptr = &prev_positive;
1614         if (CVAL(b, 0) == 0xFE) {
1615                 readfd(f, b, 2);
1616                 if (CVAL(b, 0) & 0x80) {
1617                         readfd(f, b+2, 2);
1618                         num = NVAL4(b, 0x80);
1619                 } else
1620                         num = NVAL2(b, 0) + *prev_ptr;
1621         } else
1622                 num = CVAL(b, 0) + *prev_ptr;
1623         *prev_ptr = num;
1624         if (prev_ptr == &prev_negative)
1625                 num = -num;
1626         return num;
1627 }
1628
1629 /**
1630  * Read a line of up to @p maxlen characters into @p buf (not counting
1631  * the trailing null).  Strips the (required) trailing newline and all
1632  * carriage returns.
1633  *
1634  * @return 1 for success; 0 for I/O error or truncation.
1635  **/
1636 int read_line(int f, char *buf, size_t maxlen)
1637 {
1638         while (maxlen) {
1639                 buf[0] = 0;
1640                 read_buf(f, buf, 1);
1641                 if (buf[0] == 0)
1642                         return 0;
1643                 if (buf[0] == '\n')
1644                         break;
1645                 if (buf[0] != '\r') {
1646                         buf++;
1647                         maxlen--;
1648                 }
1649         }
1650         *buf = '\0';
1651         return maxlen > 0;
1652 }
1653
1654 void io_printf(int fd, const char *format, ...)
1655 {
1656         va_list ap;
1657         char buf[BIGPATHBUFLEN];
1658         int len;
1659
1660         va_start(ap, format);
1661         len = vsnprintf(buf, sizeof buf, format, ap);
1662         va_end(ap);
1663
1664         if (len < 0)
1665                 exit_cleanup(RERR_STREAMIO);
1666
1667         if (len > (int)sizeof buf) {
1668                 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1669                 exit_cleanup(RERR_STREAMIO);
1670         }
1671
1672         write_sbuf(fd, buf);
1673 }
1674
1675 /** Setup for multiplexing a MSG_* stream with the data stream. */
1676 void io_start_multiplex_out(void)
1677 {
1678         io_flush(NORMAL_FLUSH);
1679         io_start_buffering_out(sock_f_out);
1680         io_multiplexing_out = 1;
1681 }
1682
1683 /** Setup for multiplexing a MSG_* stream with the data stream. */
1684 void io_start_multiplex_in(void)
1685 {
1686         io_flush(NORMAL_FLUSH);
1687         io_start_buffering_in(sock_f_in);
1688         io_multiplexing_in = 1;
1689 }
1690
1691 /** Write an message to the multiplexed data stream. */
1692 int io_multiplex_write(enum msgcode code, const char *buf, size_t len)
1693 {
1694         if (!io_multiplexing_out)
1695                 return 0;
1696         io_flush(NORMAL_FLUSH);
1697         stats.total_written += (len+4);
1698         mplex_write(code, buf, len);
1699         return 1;
1700 }
1701
1702 void io_end_multiplex_in(void)
1703 {
1704         io_multiplexing_in = 0;
1705         io_end_buffering_in();
1706 }
1707
1708 /** Stop output multiplexing. */
1709 void io_end_multiplex_out(void)
1710 {
1711         io_multiplexing_out = 0;
1712         io_end_buffering_out();
1713 }
1714
1715 void start_write_batch(int fd)
1716 {
1717         /* Some communication has already taken place, but we don't
1718          * enable batch writing until here so that we can write a
1719          * canonical record of the communication even though the
1720          * actual communication so far depends on whether a daemon
1721          * is involved. */
1722         write_int(batch_fd, protocol_version);
1723         write_int(batch_fd, checksum_seed);
1724
1725         if (am_sender)
1726                 write_batch_monitor_out = fd;
1727         else
1728                 write_batch_monitor_in = fd;
1729 }
1730
1731 void stop_write_batch(void)
1732 {
1733         write_batch_monitor_out = -1;
1734         write_batch_monitor_in = -1;
1735 }