The length check in make_file() doesn't need to subtract pathname_len
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/*
2 * Socket and pipe I/O utilities used in rsync.
3 *
4 * Copyright (C) 1996-2001 Andrew Tridgell
5 * Copyright (C) 1996 Paul Mackerras
6 * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
7 * Copyright (C) 2003-2007 Wayne Davison
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License version 3 as
11 * published by the Free Software Foundation.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, visit the http://fsf.org website.
20 */
21
22/* Rsync provides its own multiplexing system, which is used to send
23 * stderr and stdout over a single socket.
24 *
25 * For historical reasons this is off during the start of the
26 * connection, but it's switched on quite early using
27 * io_start_multiplex_out() and io_start_multiplex_in(). */
28
29#include "rsync.h"
30
31/** If no timeout is specified then use a 60 second select timeout */
32#define SELECT_TIMEOUT 60
33
34extern int bwlimit;
35extern size_t bwlimit_writemax;
36extern int io_timeout;
37extern int allowed_lull;
38extern int am_server;
39extern int am_daemon;
40extern int am_sender;
41extern int am_generator;
42extern int inc_recurse;
43extern int io_error;
44extern int eol_nulls;
45extern int flist_eof;
46extern int read_batch;
47extern int csum_length;
48extern int checksum_seed;
49extern int protocol_version;
50extern int remove_source_files;
51extern int preserve_hard_links;
52extern char *filesfrom_host;
53extern struct stats stats;
54extern struct file_list *cur_flist, *first_flist;
55#ifdef ICONV_OPTION
56extern iconv_t ic_send, ic_recv;
57#endif
58
/* File-scope state for the I/O layer. */
59const char phase_unknown[] = "unknown";
/* When non-zero, check_timeout() returns without checking anything. */
60int ignore_timeout = 0;
/* Descriptor that readfd() copies data to when reading from
 * write_batch_monitor_in. */
61int batch_fd = -1;
/* Incremented by read_msg_fd() for every MSG_DONE it receives. */
62int msgdone_cnt = 0;
63
64/* Ignore an EOF error if non-zero. See whine_about_eof(). */
65int kluge_around_eof = 0;
66
/* Message-channel descriptors; see set_msg_fd_in() / set_msg_fd_out(). */
67int msg_fd_in = -1;
68int msg_fd_out = -1;
/* Main socket descriptors; see io_set_sock_fds(). */
69int sock_f_in = -1;
70int sock_f_out = -1;
71
/* Input buffer state: the fd being buffered, the buffer and its allocated
 * size, the offset of the next unread byte, and how many unread bytes
 * remain (see io_start_buffering_in() and readfd_unbuffered()). */
72static int iobuf_f_in = -1;
73static char *iobuf_in;
74static size_t iobuf_in_siz;
75static size_t iobuf_in_ndx;
76static size_t iobuf_in_remaining;
77
/* Output buffer state (see io_start_buffering_out()). */
78static int iobuf_f_out = -1;
79static char *iobuf_out;
80static int iobuf_out_cnt;
81
/* When >= 0, readfd() copies everything read from this fd to iobuf_f_out. */
82int flist_forward_from = -1;
83
84static int io_multiplexing_out;
85static int io_multiplexing_in;
/* last_io_in is refreshed by read_timeout() after a successful read from
 * sock_f_in when an io_timeout is set; check_timeout() compares against it. */
86static time_t last_io_in;
87static time_t last_io_out;
88static int no_flush;
89
90static int write_batch_monitor_in = -1;
91static int write_batch_monitor_out = -1;
92
/* State for relaying a --files-from list inside read_timeout(); initialised
 * by io_set_filesfrom_fds(). */
93static int io_filesfrom_f_in = -1;
94static int io_filesfrom_f_out = -1;
95static char io_filesfrom_buf[2048];
96#ifdef ICONV_OPTION
97static char iconv_buf[sizeof io_filesfrom_buf / 2];
98#endif
99static char *io_filesfrom_bp;
100static char io_filesfrom_lastchar;
101static int io_filesfrom_buflen;
/* While non-zero, send_msg() queues messages on msg_queue (when there is no
 * msg_fd_out) instead of writing them immediately. */
102static int defer_forwarding_messages = 0;
/* Seconds used for select() timeouts; adjusted by set_io_timeout(). */
103static int select_timeout = SELECT_TIMEOUT;
/* Counters maintained by increment_active_files() and
 * decrement_active_files(). */
104static int active_filecnt = 0;
105static OFF_T active_bytecnt = 0;
106
/* Indexed by (first byte / 4): the number of extra bytes that read_varint()
 * and read_varlong() must read after that first byte. */
107static char int_byte_extra[64] = {
108 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
109 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
110 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
111 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
112};
113
114static void readfd(int fd, char *buffer, size_t N);
115static void writefd(int fd, const char *buf, size_t len);
116static void writefd_unbuffered(int fd, const char *buf, size_t len);
117static void decrement_active_files(int ndx);
118static void decrement_flist_in_progress(int ndx, int redo);
119static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);
120
/* One node of a singly-linked FIFO of file-list indexes. */
struct flist_ndx_item {
	struct flist_ndx_item *next;
	int ndx;
};

/* FIFO of file-list indexes: pushed at the tail, popped from the head. */
struct flist_ndx_list {
	struct flist_ndx_item *head;
	struct flist_ndx_item *tail;
};

static struct flist_ndx_list redo_list;
static struct flist_ndx_list hlink_list;

/* One queued message.  "buf" is over-allocated by msg_list_add() so that it
 * holds a 4-byte header followed by the payload. */
struct msg_list_item {
	struct msg_list_item *next;
	char convert;
	char buf[1];
};

/* FIFO of queued messages. */
struct msg_list {
	struct msg_list_item *head;
	struct msg_list_item *tail;
};

static struct msg_list msg_queue;
143
144static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
145{
146 struct flist_ndx_item *item;
147
148 if (!(item = new(struct flist_ndx_item)))
149 out_of_memory("flist_ndx_push");
150 item->next = NULL;
151 item->ndx = ndx;
152 if (lp->tail)
153 lp->tail->next = item;
154 else
155 lp->head = item;
156 lp->tail = item;
157}
158
159static int flist_ndx_pop(struct flist_ndx_list *lp)
160{
161 struct flist_ndx_item *next;
162 int ndx;
163
164 if (!lp->head)
165 return -1;
166
167 ndx = lp->head->ndx;
168 next = lp->head->next;
169 free(lp->head);
170 lp->head = next;
171 if (!next)
172 lp->tail = NULL;
173
174 return ndx;
175}
176
177static void check_timeout(void)
178{
179 time_t t;
180
181 if (!io_timeout || ignore_timeout)
182 return;
183
184 if (!last_io_in) {
185 last_io_in = time(NULL);
186 return;
187 }
188
189 t = time(NULL);
190
191 if (t - last_io_in >= io_timeout) {
192 if (!am_server && !am_daemon) {
193 rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
194 (int)(t-last_io_in));
195 }
196 exit_cleanup(RERR_TIMEOUT);
197 }
198}
199
200/* Note the fds used for the main socket (which might really be a pipe
201 * for a local transfer, but we can ignore that). */
202void io_set_sock_fds(int f_in, int f_out)
203{
204 sock_f_in = f_in;
205 sock_f_out = f_out;
206}
207
208void set_io_timeout(int secs)
209{
210 io_timeout = secs;
211
212 if (!io_timeout || io_timeout > SELECT_TIMEOUT)
213 select_timeout = SELECT_TIMEOUT;
214 else
215 select_timeout = io_timeout;
216
217 allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
218}
219
220/* Setup the fd used to receive MSG_* messages. Only needed during the
221 * early stages of being a local sender (up through the sending of the
222 * file list) or when we're the generator (to fetch the messages from
223 * the receiver). */
224void set_msg_fd_in(int fd)
225{
226 msg_fd_in = fd;
227}
228
229/* Setup the fd used to send our MSG_* messages. Only needed when
230 * we're the receiver (to send our messages to the generator). */
231void set_msg_fd_out(int fd)
232{
233 msg_fd_out = fd;
234 set_nonblocking(msg_fd_out);
235}
236
237/* Add a message to the pending MSG_* list. */
238static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert)
239{
240 struct msg_list_item *m;
241 int sz = len + 4 + sizeof m[0] - 1;
242
243 if (!(m = (struct msg_list_item *)new_array(char, sz)))
244 out_of_memory("msg_list_add");
245 m->next = NULL;
246 m->convert = convert;
247 SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
248 memcpy(m->buf + 4, buf, len);
249 if (lst->tail)
250 lst->tail->next = m;
251 else
252 lst->head = m;
253 lst->tail = m;
254}
255
256static void msg_flush(void)
257{
258 if (am_generator) {
259 while (msg_queue.head && io_multiplexing_out) {
260 struct msg_list_item *m = msg_queue.head;
261 int len = IVAL(m->buf, 0) & 0xFFFFFF;
262 int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
263 if (!(msg_queue.head = m->next))
264 msg_queue.tail = NULL;
265 stats.total_written += len + 4;
266 defer_forwarding_messages++;
267 mplex_write(sock_f_out, tag, m->buf + 4, len, m->convert);
268 defer_forwarding_messages--;
269 free(m);
270 }
271 } else {
272 while (msg_queue.head) {
273 struct msg_list_item *m = msg_queue.head;
274 int len = IVAL(m->buf, 0) & 0xFFFFFF;
275 int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
276 if (!(msg_queue.head = m->next))
277 msg_queue.tail = NULL;
278 defer_forwarding_messages++;
279 mplex_write(msg_fd_out, tag, m->buf + 4, len, m->convert);
280 defer_forwarding_messages--;
281 free(m);
282 }
283 }
284}
285
286/* Read a message from the MSG_* fd and handle it. This is called either
287 * during the early stages of being a local sender (up through the sending
288 * of the file list) or when we're the generator (to fetch the messages
289 * from the receiver). */
290static void read_msg_fd(void)
291{
/* Reads exactly one message from msg_fd_in and dispatches on its tag. */
292 char buf[2048];
293 size_t n;
294 struct file_list *flist;
295 int fd = msg_fd_in;
296 int tag, len;
297
298 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
299 * to this routine from writefd_unbuffered(). */
300 no_flush++;
301 msg_fd_in = -1;
302 defer_forwarding_messages++;
303
/* The 4-byte header carries the payload length in its low 24 bits and
 * the tag (offset by MPLEX_BASE) in its top 8 bits. */
304 readfd(fd, buf, 4);
305 tag = IVAL(buf, 0);
306
307 len = tag & 0xFFFFFF;
308 tag = (tag >> 24) - MPLEX_BASE;
309
310 switch (tag) {
311 case MSG_DONE:
312 if (len < 0 || len > 1 || !am_generator) {
/* Shared error exit for every malformed or unexpected message below. */
313 invalid_msg:
314 rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
315 tag, len, who_am_i(),
316 inc_recurse ? "/inc" : "");
317 exit_cleanup(RERR_STREAMIO);
318 }
319 if (len) {
320 readfd(fd, buf, len);
321 stats.total_read = read_varlong(fd, 3);
322 }
323 msgdone_cnt++;
324 break;
325 case MSG_REDO:
326 if (len != 4 || !am_generator)
327 goto invalid_msg;
/* The payload is a 4-byte file index that gets queued on redo_list. */
328 readfd(fd, buf, 4);
329 if (remove_source_files)
330 decrement_active_files(IVAL(buf,0));
331 flist_ndx_push(&redo_list, IVAL(buf,0));
332 if (inc_recurse)
333 decrement_flist_in_progress(IVAL(buf,0), 1);
334 break;
335 case MSG_FLIST:
336 if (len != 4 || !am_generator || !inc_recurse)
337 goto invalid_msg;
338 readfd(fd, buf, 4);
339 /* Read extra file list from receiver. */
340 assert(iobuf_in != NULL);
341 assert(iobuf_f_in == fd);
342 if (verbose > 3) {
343 rprintf(FINFO, "[%s] receiving flist for dir %d\n",
344 who_am_i(), IVAL(buf,0));
345 }
346 flist = recv_file_list(fd);
347 flist->parent_ndx = IVAL(buf,0);
348 break;
349 case MSG_FLIST_EOF:
350 if (len != 0 || !am_generator || !inc_recurse)
351 goto invalid_msg;
352 flist_eof = 1;
353 break;
354 case MSG_DELETED:
355 if (len >= (int)sizeof buf || !am_generator)
356 goto invalid_msg;
/* Pass the payload on via send_msg() with the convert flag set. */
357 readfd(fd, buf, len);
358 send_msg(MSG_DELETED, buf, len, 1);
359 break;
360 case MSG_SUCCESS:
361 if (len != 4 || !am_generator)
362 goto invalid_msg;
363 readfd(fd, buf, len);
364 if (remove_source_files) {
365 decrement_active_files(IVAL(buf,0));
366 send_msg(MSG_SUCCESS, buf, len, 0);
367 }
368 if (preserve_hard_links)
369 flist_ndx_push(&hlink_list, IVAL(buf,0));
370 if (inc_recurse)
371 decrement_flist_in_progress(IVAL(buf,0), 0);
372 break;
373 case MSG_NO_SEND:
374 if (len != 4 || !am_generator)
375 goto invalid_msg;
376 readfd(fd, buf, len);
377 if (inc_recurse)
378 decrement_flist_in_progress(IVAL(buf,0), 0);
379 break;
380 case MSG_SOCKERR:
381 case MSG_CLIENT:
382 if (!am_generator)
383 goto invalid_msg;
384 if (tag == MSG_SOCKERR)
385 io_end_multiplex_out();
386 /* FALL THROUGH */
387 case MSG_INFO:
388 case MSG_ERROR:
389 case MSG_LOG:
/* Hand the text to rwrite() in pieces of at most sizeof buf - 1 bytes. */
390 while (len) {
391 n = len;
392 if (n >= sizeof buf)
393 n = sizeof buf - 1;
394 readfd(fd, buf, n);
395 rwrite((enum logcode)tag, buf, n, !am_generator);
396 len -= n;
397 }
398 break;
399 default:
400 rprintf(FERROR, "unknown message %d:%d [%s]\n",
401 tag, len, who_am_i());
402 exit_cleanup(RERR_STREAMIO);
403 }
404
/* Undo the setup from the top; once no one else is deferring messages,
 * send whatever got queued in the meantime. */
405 no_flush--;
406 msg_fd_in = fd;
407 if (!--defer_forwarding_messages)
408 msg_flush();
409}
410
411/* This is used by the generator to limit how many file transfers can
412 * be active at once when --remove-source-files is specified. Without
413 * this, sender-side deletions were mostly happening at the end. */
414void increment_active_files(int ndx, int itemizing, enum logcode code)
415{
416 /* TODO: tune these limits? */
417 while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
418 check_for_finished_files(itemizing, code, 0);
419 if (iobuf_out_cnt)
420 io_flush(NORMAL_FLUSH);
421 else
422 read_msg_fd();
423 }
424
425 active_filecnt++;
426 active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);
427}
428
429static void decrement_active_files(int ndx)
430{
431 struct file_list *flist = flist_for_ndx(ndx);
432 assert(flist != NULL);
433 active_filecnt--;
434 active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
435}
436
437static void decrement_flist_in_progress(int ndx, int redo)
438{
439 struct file_list *flist = cur_flist ? cur_flist : first_flist;
440
441 while (ndx < flist->ndx_start) {
442 if (flist == first_flist) {
443 invalid_ndx:
444 rprintf(FERROR,
445 "Invalid file index: %d (%d - %d) [%s]\n",
446 ndx, first_flist->ndx_start,
447 first_flist->prev->ndx_start + first_flist->prev->count - 1,
448 who_am_i());
449 exit_cleanup(RERR_PROTOCOL);
450 }
451 flist = flist->prev;
452 }
453 while (ndx >= flist->ndx_start + flist->count) {
454 if (!(flist = flist->next))
455 goto invalid_ndx;
456 }
457
458 flist->in_progress--;
459 if (redo)
460 flist->to_redo++;
461}
462
463/* Write a message to a multiplexed stream. If this fails, rsync exits. */
464static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert)
465{
466 char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */
467 size_t n = len;
468
/* 4-byte header: tag (code + MPLEX_BASE) in the top byte plus the length. */
469 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
470
/* With ICONV_OPTION, conversion is switched off only when there is no
 * ic_send converter; without ICONV_OPTION it is always switched off. */
471#ifdef ICONV_OPTION
472 if (convert && ic_send == (iconv_t)-1)
473#endif
474 convert = 0;
475
/* Either send the header alone (n = 0) or header + payload in one write. */
476 if (convert || n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */
477 n = 0;
478 else
479 memcpy(buffer + 4, buf, n);
480
481 writefd_unbuffered(fd, buffer, n+4);
482
/* Skip past whatever part of the payload has already been written. */
483 len -= n;
484 buf += n;
485
486#ifdef ICONV_OPTION
487 if (convert) {
488 iconv(ic_send, NULL, 0, NULL, 0);
489 defer_forwarding_messages++;
490 while (len) {
491 ICONV_CONST char *ibuf = (ICONV_CONST char *)buf;
492 char *obuf = buffer;
493 size_t ocnt = sizeof buffer;
/* When iconv() fails for a reason other than a full output buffer,
 * copy the current input byte through unconverted and carry on. */
494 while (len && iconv(ic_send, &ibuf,&len,
495 &obuf,&ocnt) == (size_t)-1) {
496 if (errno == E2BIG || !ocnt)
497 break;
498 *obuf++ = *ibuf++;
499 ocnt--, len--;
500 }
501 n = obuf - buffer;
502 writefd_unbuffered(fd, buffer, n);
503 }
504 if (!--defer_forwarding_messages)
505 msg_flush();
506 } else
507#endif
/* Unconverted payload that did not fit alongside the header. */
508 if (len) {
509 defer_forwarding_messages++;
510 writefd_unbuffered(fd, buf, len);
511 if (!--defer_forwarding_messages)
512 msg_flush();
513 }
514}
515
516int send_msg(enum msgcode code, const char *buf, int len, int convert)
517{
518 if (msg_fd_out < 0) {
519 if (!defer_forwarding_messages)
520 return io_multiplex_write(code, buf, len, convert);
521 if (!io_multiplexing_out)
522 return 0;
523 msg_list_add(&msg_queue, code, buf, len, convert);
524 return 1;
525 }
526 if (flist_forward_from >= 0)
527 msg_list_add(&msg_queue, code, buf, len, convert);
528 else
529 mplex_write(msg_fd_out, code, buf, len, convert);
530 return 1;
531}
532
533void send_msg_int(enum msgcode code, int num)
534{
535 char numbuf[4];
536 SIVAL(numbuf, 0, num);
537 send_msg(code, numbuf, 4, 0);
538}
539
540void wait_for_receiver(void)
541{
542 if (iobuf_out_cnt)
543 io_flush(NORMAL_FLUSH);
544 else
545 read_msg_fd();
546}
547
548int get_redo_num(void)
549{
550 return flist_ndx_pop(&redo_list);
551}
552
553int get_hlink_num(void)
554{
555 return flist_ndx_pop(&hlink_list);
556}
557
558/**
559 * When we're the receiver and we have a local --files-from list of names
560 * that needs to be sent over the socket to the sender, we have to do two
561 * things at the same time: send the sender a list of what files we're
562 * processing and read the incoming file+info list from the sender. We do
563 * this by augmenting the read_timeout() function to copy this data. It
564 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
565 * ready, since it might be a pipe) and then blast it out f_out (when it
566 * is ready to receive more data).
567 */
568void io_set_filesfrom_fds(int f_in, int f_out)
569{
570 io_filesfrom_f_in = f_in;
571 io_filesfrom_f_out = f_out;
572 io_filesfrom_bp = io_filesfrom_buf;
573 io_filesfrom_lastchar = '\0';
574 io_filesfrom_buflen = 0;
575}
576
577/* It's almost always an error to get an EOF when we're trying to read from the
578 * network, because the protocol is (for the most part) self-terminating.
579 *
580 * There is one case for the receiver when it is at the end of the transfer
581 * (hanging around reading any keep-alive packets that might come its way): if
582 * the sender dies before the generator's kill-signal comes through, we can end
583 * up here needing to loop until the kill-signal arrives. In this situation,
584 * kluge_around_eof will be < 0.
585 *
586 * There is another case for older protocol versions (< 24) where the module
587 * listing was not terminated, so we must ignore an EOF error in that case and
588 * exit. In this situation, kluge_around_eof will be > 0. */
589static void whine_about_eof(int fd)
590{
591 if (kluge_around_eof && fd == sock_f_in) {
592 int i;
593 if (kluge_around_eof > 0)
594 exit_cleanup(0);
595 /* If we're still here after 10 seconds, exit with an error. */
596 for (i = 10*1000/20; i--; )
597 msleep(20);
598 }
599
600 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
601 "(%.0f bytes received so far) [%s]\n",
602 (double)stats.total_read, who_am_i());
603
604 exit_cleanup(RERR_STREAMIO);
605}
606
607/**
608 * Read from a socket with I/O timeout. return the number of bytes
609 * read. If no bytes can be read then exit, never return a number <= 0.
610 *
611 * TODO: If the remote shell connection fails, then current versions
612 * actually report an "unexpected EOF" error here. Since it's a
613 * fairly common mistake to try to use rsh when ssh is required, we
614 * should trap that: if we fail to read any data at all, we should
615 * give a better explanation. We can tell whether the connection has
616 * started by looking e.g. at whether the remote version is known yet.
617 */
618static int read_timeout(int fd, char *buf, size_t len)
619{
620 int n, cnt = 0;
621
/* Push out all pending output before waiting for input. */
622 io_flush(FULL_FLUSH);
623
624 while (cnt == 0) {
625 /* until we manage to read *something* */
626 fd_set r_fds, w_fds;
627 struct timeval tv;
628 int maxfd = fd;
629 int count;
630
631 FD_ZERO(&r_fds);
632 FD_ZERO(&w_fds);
633 FD_SET(fd, &r_fds);
/* While a files-from copy is active, also wait on its input fd (when
 * the copy buffer is empty) or its output fd (when data is pending). */
634 if (io_filesfrom_f_out >= 0) {
635 int new_fd;
636 if (io_filesfrom_buflen == 0) {
637 if (io_filesfrom_f_in >= 0) {
638 FD_SET(io_filesfrom_f_in, &r_fds);
639 new_fd = io_filesfrom_f_in;
640 } else {
/* Nothing buffered and no input left: the copy is finished. */
641 io_filesfrom_f_out = -1;
642 new_fd = -1;
643 }
644 } else {
645 FD_SET(io_filesfrom_f_out, &w_fds);
646 new_fd = io_filesfrom_f_out;
647 }
648 if (new_fd > maxfd)
649 maxfd = new_fd;
650 }
651
652 tv.tv_sec = select_timeout;
653 tv.tv_usec = 0;
654
655 errno = 0;
656
657 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
658
659 if (count <= 0) {
660 if (errno == EBADF) {
661 defer_forwarding_messages = 0;
662 exit_cleanup(RERR_SOCKETIO);
663 }
664 check_timeout();
665 continue;
666 }
667
668 if (io_filesfrom_f_out >= 0) {
669 if (io_filesfrom_buflen) {
670 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
671 int l = write(io_filesfrom_f_out,
672 io_filesfrom_bp,
673 io_filesfrom_buflen);
674 if (l > 0) {
/* Advance past what was written; rewind once the buffer drains. */
675 if (!(io_filesfrom_buflen -= l))
676 io_filesfrom_bp = io_filesfrom_buf;
677 else
678 io_filesfrom_bp += l;
679 } else if (errno != EINTR) {
680 /* XXX should we complain? */
681 io_filesfrom_f_out = -1;
682 }
683 }
684 } else if (io_filesfrom_f_in >= 0) {
685 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
686 int l = read(io_filesfrom_f_in,
687 io_filesfrom_buf,
688 sizeof io_filesfrom_buf);
689 if (l <= 0) {
690 if (l == 0 || errno != EINTR) {
691 /* Send end-of-file marker */
692 io_filesfrom_buf[0] = '\0';
693 io_filesfrom_buf[1] = '\0';
/* Two nulls if the last name sent was not yet terminated, else one. */
694 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
695 io_filesfrom_f_in = -1;
696 }
697 } else {
698 if (!eol_nulls) {
699 char *s = io_filesfrom_buf + l;
700 /* Transform CR and/or LF into '\0' */
701 while (s-- > io_filesfrom_buf) {
702 if (*s == '\n' || *s == '\r')
703 *s = '\0';
704 }
705 }
706 if (!io_filesfrom_lastchar) {
707 /* Last buf ended with a '\0', so don't
708 * let this buf start with one. */
709 while (l && !*io_filesfrom_bp)
710 io_filesfrom_bp++, l--;
711 }
712 if (!l)
713 io_filesfrom_bp = io_filesfrom_buf;
714 else {
715 char *f = io_filesfrom_bp;
716 char *t = f;
717 char *eob = f + l;
718 /* Eliminate any multi-'\0' runs. */
719 while (f != eob) {
720 if (!(*t++ = *f++)) {
721 while (f != eob && !*f)
722 f++, l--;
723 }
724 }
725 io_filesfrom_lastchar = f[-1];
726 }
727 io_filesfrom_buflen = l;
728 }
729 }
730 }
731 }
732
/* select() may have returned only for the files-from descriptors. */
733 if (!FD_ISSET(fd, &r_fds))
734 continue;
735
736 n = read(fd, buf, len);
737
738 if (n <= 0) {
739 if (n == 0)
740 whine_about_eof(fd); /* Doesn't return. */
741 if (errno == EINTR || errno == EWOULDBLOCK
742 || errno == EAGAIN)
743 continue;
744
745 /* Don't write errors on a dead socket. */
746 if (fd == sock_f_in) {
747 io_end_multiplex_out();
748 rsyserr(FSOCKERR, errno, "read error");
749 } else
750 rsyserr(FERROR, errno, "read error");
751 exit_cleanup(RERR_STREAMIO);
752 }
753
754 buf += n;
755 len -= n;
756 cnt += n;
757
/* Record the time of the last successful socket read for check_timeout(). */
758 if (fd == sock_f_in && io_timeout)
759 last_io_in = time(NULL);
760 }
761
762 return cnt;
763}
764
765/* Read a line into the "fname" buffer (which must be at least MAXPATHLEN
766 * characters long). */
767int read_filesfrom_line(int fd, char *fname)
768{
769 char ch, *s, *eob = fname + MAXPATHLEN - 1;
770 int cnt;
771 int reading_remotely = filesfrom_host != NULL;
772 int nulls = eol_nulls || reading_remotely;
773
774 start:
775 s = fname;
776 while (1) {
777 cnt = read(fd, &ch, 1);
778 if (cnt < 0 && (errno == EWOULDBLOCK
779 || errno == EINTR || errno == EAGAIN)) {
780 struct timeval tv;
781 fd_set r_fds, e_fds;
782 FD_ZERO(&r_fds);
783 FD_SET(fd, &r_fds);
784 FD_ZERO(&e_fds);
785 FD_SET(fd, &e_fds);
786 tv.tv_sec = select_timeout;
787 tv.tv_usec = 0;
788 if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
789 check_timeout();
790 if (FD_ISSET(fd, &e_fds)) {
791 rsyserr(FINFO, errno,
792 "select exception on fd %d", fd);
793 }
794 continue;
795 }
796 if (cnt != 1)
797 break;
798 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
799 /* Skip empty lines if reading locally. */
800 if (!reading_remotely && s == fname)
801 continue;
802 break;
803 }
804 if (s < eob)
805 *s++ = ch;
806 }
807 *s = '\0';
808
809 /* Dump comments. */
810 if (*fname == '#' || *fname == ';')
811 goto start;
812
813 return s - fname;
814}
815
816int io_start_buffering_out(int f_out)
817{
818 if (iobuf_out) {
819 assert(f_out == iobuf_f_out);
820 return 0;
821 }
822 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
823 out_of_memory("io_start_buffering_out");
824 iobuf_out_cnt = 0;
825 iobuf_f_out = f_out;
826 return 1;
827}
828
829int io_start_buffering_in(int f_in)
830{
831 if (iobuf_in) {
832 assert(f_in == iobuf_f_in);
833 return 0;
834 }
835 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
836 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
837 out_of_memory("io_start_buffering_in");
838 iobuf_f_in = f_in;
839 return 1;
840}
841
842void io_end_buffering_in(void)
843{
844 if (!iobuf_in)
845 return;
846 free(iobuf_in);
847 iobuf_in = NULL;
848 iobuf_in_ndx = 0;
849 iobuf_in_remaining = 0;
850 iobuf_f_in = -1;
851}
852
853void io_end_buffering_out(void)
854{
855 if (!iobuf_out)
856 return;
857 io_flush(FULL_FLUSH);
858 free(iobuf_out);
859 iobuf_out = NULL;
860 iobuf_f_out = -1;
861}
862
863void maybe_flush_socket(int important)
864{
865 if (iobuf_out && iobuf_out_cnt
866 && (important || time(NULL) - last_io_out >= 5))
867 io_flush(NORMAL_FLUSH);
868}
869
870void maybe_send_keepalive(void)
871{
872 if (time(NULL) - last_io_out >= allowed_lull) {
873 if (!iobuf_out || !iobuf_out_cnt) {
874 if (protocol_version < 29)
875 return; /* there's nothing we can do */
876 if (protocol_version >= 30)
877 send_msg(MSG_NOOP, "", 0, 0);
878 else {
879 write_int(sock_f_out, cur_flist->count);
880 write_shortint(sock_f_out, ITEM_IS_NEW);
881 }
882 }
883 if (iobuf_out)
884 io_flush(NORMAL_FLUSH);
885 }
886}
887
888void start_flist_forward(int f_in)
889{
890 assert(iobuf_out != NULL);
891 assert(iobuf_f_out == msg_fd_out);
892 flist_forward_from = f_in;
893}
894
895void stop_flist_forward()
896{
897 flist_forward_from = -1;
898 io_flush(FULL_FLUSH);
899}
900
/**
 * Keep calling read_timeout() until exactly len bytes have been stored
 * in buf; only returns once all of them have been read.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	char *dest = buf;
	size_t left = len;

	while (left != 0) {
		int got = read_timeout(fd, dest, left);

		left -= got;
		dest += got;
	}
}
914
915/**
916 * Read from the file descriptor handling multiplexing - return number
917 * of bytes read.
918 *
919 * Never returns <= 0.
920 */
921static int readfd_unbuffered(int fd, char *buf, size_t len)
922{
923 size_t msg_bytes;
924 int tag, cnt = 0;
925 char line[BIGPATHBUFLEN];
926
/* An fd that is not the buffered input fd is read directly. */
927 if (!iobuf_in || fd != iobuf_f_in)
928 return read_timeout(fd, buf, len);
929
/* Without multiplexing, refill the input buffer with raw data. */
930 if (!io_multiplexing_in && iobuf_in_remaining == 0) {
931 iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
932 iobuf_in_ndx = 0;
933 }
934
935 while (cnt == 0) {
/* Hand out buffered data first (at most what remains in the buffer). */
936 if (iobuf_in_remaining) {
937 len = MIN(len, iobuf_in_remaining);
938 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
939 iobuf_in_ndx += len;
940 iobuf_in_remaining -= len;
941 cnt = len;
942 break;
943 }
944
/* Buffer empty: read the next 4-byte message header (length in the
 * low 24 bits, tag offset by MPLEX_BASE in the top 8 bits). */
945 read_loop(fd, line, 4);
946 tag = IVAL(line, 0);
947
948 msg_bytes = tag & 0xFFFFFF;
949 tag = (tag >> 24) - MPLEX_BASE;
950
951 switch (tag) {
952 case MSG_DATA:
/* Grow the input buffer if this data message does not fit. */
953 if (msg_bytes > iobuf_in_siz) {
954 if (!(iobuf_in = realloc_array(iobuf_in, char,
955 msg_bytes)))
956 out_of_memory("readfd_unbuffered");
957 iobuf_in_siz = msg_bytes;
958 }
959 read_loop(fd, iobuf_in, msg_bytes);
960 iobuf_in_remaining = msg_bytes;
961 iobuf_in_ndx = 0;
962 break;
963 case MSG_NOOP:
964 if (am_sender)
965 maybe_send_keepalive();
966 break;
967 case MSG_IO_ERROR:
968 if (msg_bytes != 4)
969 goto invalid_msg;
970 read_loop(fd, line, msg_bytes);
971 io_error |= IVAL(line, 0);
972 break;
973 case MSG_DELETED:
974 if (msg_bytes >= sizeof line)
975 goto overflow;
976#ifdef ICONV_OPTION
/* NOTE(review): the guard below tests ic_recv but both iconv() calls use
 * ic_send -- confirm that this is intended. */
977 if (ic_recv != (iconv_t)-1) {
978 ICONV_CONST char *ibuf;
979 char *obuf = line;
980 size_t icnt, ocnt = sizeof line - 1;
981 int add_null = 0;
982 iconv(ic_send, NULL, 0, NULL, 0);
983 while (msg_bytes) {
984 icnt = msg_bytes > sizeof iconv_buf
985 ? sizeof iconv_buf : msg_bytes;
986 read_loop(fd, iconv_buf, icnt);
/* Hold back a trailing null on the final chunk; it is re-added below. */
987 if (!(msg_bytes -= icnt) && !iconv_buf[icnt-1])
988 icnt--, add_null = 1;
989 ibuf = (ICONV_CONST char *)iconv_buf;
990 if (iconv(ic_send, &ibuf,&icnt,
991 &obuf,&ocnt) == (size_t)-1)
992 goto overflow; // XXX
993 }
994 if (add_null)
995 *obuf++ = '\0';
996 msg_bytes = obuf - line;
997 } else
998#endif
999 read_loop(fd, line, msg_bytes);
1000 /* A directory name was sent with the trailing null */
1001 if (msg_bytes > 0 && !line[msg_bytes-1])
1002 log_delete(line, S_IFDIR);
1003 else {
1004 line[msg_bytes] = '\0';
1005 log_delete(line, S_IFREG);
1006 }
1007 break;
1008 case MSG_SUCCESS:
1009 if (msg_bytes != 4) {
/* Shared error exit for messages with an unexpected length. */
1010 invalid_msg:
1011 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
1012 tag, (long)msg_bytes, who_am_i());
1013 exit_cleanup(RERR_STREAMIO);
1014 }
1015 read_loop(fd, line, msg_bytes);
1016 successful_send(IVAL(line, 0));
1017 break;
1018 case MSG_NO_SEND:
1019 if (msg_bytes != 4)
1020 goto invalid_msg;
1021 read_loop(fd, line, msg_bytes);
1022 send_msg_int(MSG_NO_SEND, IVAL(line, 0));
1023 break;
1024 case MSG_INFO:
1025 case MSG_ERROR:
1026 if (msg_bytes >= sizeof line) {
/* Shared error exit for payloads that do not fit in "line". */
1027 overflow:
1028 rprintf(FERROR,
1029 "multiplexing overflow %d:%ld [%s]\n",
1030 tag, (long)msg_bytes, who_am_i());
1031 exit_cleanup(RERR_STREAMIO);
1032 }
1033 read_loop(fd, line, msg_bytes);
1034 rwrite((enum logcode)tag, line, msg_bytes, 1);
1035 break;
1036 default:
1037 rprintf(FERROR, "unexpected tag %d [%s]\n",
1038 tag, who_am_i());
1039 exit_cleanup(RERR_STREAMIO);
1040 }
1041 }
1042
1043 if (iobuf_in_remaining == 0)
1044 io_flush(NORMAL_FLUSH);
1045
1046 return cnt;
1047}
1048
1049/* Do a buffered read from fd. Don't return until all N bytes have
1050 * been read. If all N can't be read then exit with an error. */
1051static void readfd(int fd, char *buffer, size_t N)
1052{
1053 int cnt;
1054 size_t total = 0;
1055
1056 while (total < N) {
1057 cnt = readfd_unbuffered(fd, buffer + total, N-total);
1058 total += cnt;
1059 }
1060
1061 if (fd == write_batch_monitor_in) {
1062 if ((size_t)write(batch_fd, buffer, total) != total)
1063 exit_cleanup(RERR_FILEIO);
1064 }
1065
1066 if (fd == flist_forward_from)
1067 writefd(iobuf_f_out, buffer, total);
1068
1069 if (fd == sock_f_in)
1070 stats.total_read += total;
1071}
1072
/* Read a 2-byte value from f, least-significant byte first. */
unsigned short read_shortint(int f)
{
	char bytes[2];

	readfd(f, bytes, sizeof bytes);
	return UVAL(bytes, 0) + (UVAL(bytes, 1) << 8);
}
1079
1080int32 read_int(int f)
1081{
1082 char b[4];
1083 int32 num;
1084
1085 readfd(f, b, 4);
1086 num = IVAL(b, 0);
1087#if SIZEOF_INT32 > 4
1088 if (num & (int32)0x80000000)
1089 num |= ~(int32)0xffffffff;
1090#endif
1091 return num;
1092}
1093
/* Read a variable-length 32-bit integer from f: one leading byte, followed
 * by the number of extra bytes that int_byte_extra[] gives for it. */
1094int32 read_varint(int f)
1095{
1096 union {
1097 char b[5];
1098 int32 x;
1099 } u;
1100 uchar ch;
1101 int extra;
1102
1103 u.x = 0;
1104 readfd(f, (char*)&ch, 1);
1105 extra = int_byte_extra[ch / 4];
1106 if (extra) {
/* (bit-1) masks off the high bits of the leading byte. */
1107 uchar bit = ((uchar)1<<(8-extra));
1108 if (extra >= (int)sizeof u.b) {
1109 rprintf(FERROR, "Overflow in read_varint()\n");
1110 exit_cleanup(RERR_STREAMIO);
1111 }
/* The extra bytes fill the low positions; what is left of the leading
 * byte is stored just after them. */
1112 readfd(f, u.b, extra);
1113 u.b[extra] = ch & (bit-1);
1114 } else
1115 u.b[0] = ch;
1116#if CAREFUL_ALIGNMENT
1117 u.x = IVAL(u.b,0);
1118#endif
1119#if SIZEOF_INT32 > 4
1120 if (u.x & (int32)0x80000000)
1121 u.x |= ~(int32)0xffffffff;
1122#endif
1123 return u.x;
1124}
1125
/* Read a variable-length 64-bit integer from f.  The first min_bytes bytes
 * are always read; the first of them selects, via int_byte_extra[], how
 * many extra bytes follow. */
1126int64 read_varlong(int f, uchar min_bytes)
1127{
1128 union {
1129 char b[9];
1130 int64 x;
1131 } u;
1132 char b2[8];
1133 int extra;
1134
1135#if SIZEOF_INT64 < 8
1136 memset(u.b, 0, 8);
1137#else
1138 u.x = 0;
1139#endif
/* b2[0] is the leading byte; the other min_bytes-1 bytes go to the
 * start of u.b. */
1140 readfd(f, b2, min_bytes);
1141 memcpy(u.b, b2+1, min_bytes-1);
1142 extra = int_byte_extra[CVAL(b2, 0) / 4];
1143 if (extra) {
/* (bit-1) masks off the high bits of the leading byte. */
1144 uchar bit = ((uchar)1<<(8-extra));
1145 if (min_bytes + extra > (int)sizeof u.b) {
1146 rprintf(FERROR, "Overflow in read_varlong()\n");
1147 exit_cleanup(RERR_STREAMIO);
1148 }
/* The extra bytes follow the ones already stored; what is left of the
 * leading byte is stored after them. */
1149 readfd(f, u.b + min_bytes - 1, extra);
1150 u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);
1151#if SIZEOF_INT64 < 8
1152 if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) {
1153 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1154 exit_cleanup(RERR_UNSUPPORTED);
1155 }
1156#endif
1157 } else
1158 u.b[min_bytes + extra - 1] = CVAL(b2, 0);
1159#if SIZEOF_INT64 < 8
1160 u.x = IVAL(u.b,0);
1161#elif CAREFUL_ALIGNMENT
1162 u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);
1163#endif
1164 return u.x;
1165}
1166
1167int64 read_longint(int f)
1168{
1169#if SIZEOF_INT64 >= 8
1170 char b[9];
1171#endif
1172 int32 num = read_int(f);
1173
1174 if (num != (int32)0xffffffff)
1175 return num;
1176
1177#if SIZEOF_INT64 < 8
1178 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1179 exit_cleanup(RERR_UNSUPPORTED);
1180#else
1181 readfd(f, b, 8);
1182 return IVAL(b,0) | (((int64)IVAL(b,4))<<32);
1183#endif
1184}
1185
/* Read exactly len bytes from f into buf. */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
1190
/* Read exactly len bytes from f into buf and null-terminate the result.
 * The terminator is stored at buf[len], so buf needs room for len + 1
 * bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	char *end = buf + len;

	readfd(f, buf, len);
	*end = '\0';
}
1196
1197uchar read_byte(int f)
1198{
1199 uchar c;
1200 readfd(f, (char *)&c, 1);
1201 return c;
1202}
1203
1204int read_vstring(int f, char *buf, int bufsize)
1205{
1206 int len = read_byte(f);
1207
1208 if (len & 0x80)
1209 len = (len & ~0x80) * 0x100 + read_byte(f);
1210
1211 if (len >= bufsize) {
1212 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
1213 len, bufsize - 1);
1214 return -1;
1215 }
1216
1217 if (len)
1218 readfd(f, buf, len);
1219 buf[len] = '\0';
1220 return len;
1221}
1222
1223/* Populate a sum_struct with values from the socket. This is
1224 * called by both the sender and the receiver. */
1225void read_sum_head(int f, struct sum_struct *sum)
1226{
1227 sum->count = read_int(f);
1228 if (sum->count < 0) {
1229 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
1230 (long)sum->count, who_am_i());
1231 exit_cleanup(RERR_PROTOCOL);
1232 }
1233 sum->blength = read_int(f);
1234 if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
1235 rprintf(FERROR, "Invalid block length %ld [%s]\n",
1236 (long)sum->blength, who_am_i());
1237 exit_cleanup(RERR_PROTOCOL);
1238 }
1239 sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
1240 if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
1241 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
1242 sum->s2length, who_am_i());
1243 exit_cleanup(RERR_PROTOCOL);
1244 }
1245 sum->remainder = read_int(f);
1246 if (sum->remainder < 0 || sum->remainder > sum->blength) {
1247 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
1248 (long)sum->remainder, who_am_i());
1249 exit_cleanup(RERR_PROTOCOL);
1250 }
1251}
1252
1253/* Send the values from a sum_struct over the socket. Set sum to
1254 * NULL if there are no checksums to send. This is called by both
1255 * the generator and the sender. */
1256void write_sum_head(int f, struct sum_struct *sum)
1257{
1258 static struct sum_struct null_sum;
1259
1260 if (sum == NULL)
1261 sum = &null_sum;
1262
1263 write_int(f, sum->count);
1264 write_int(f, sum->blength);
1265 if (protocol_version >= 27)
1266 write_int(f, sum->s2length);
1267 write_int(f, sum->remainder);
1268}
1269
1270/**
1271 * Sleep after writing to limit I/O bandwidth usage.
1272 *
1273 * @todo Rather than sleeping after each write, it might be better to
1274 * use some kind of averaging. The current algorithm seems to always
1275 * use a bit less bandwidth than specified, because it doesn't make up
1276 * for slow periods. But arguably this is a feature. In addition, we
1277 * ought to take the time used to write the data into account.
1278 *
1279 * During some phases of big transfers (file FOO is uptodate) this is
1280 * called with a small bytes_written every time. As the kernel has to
1281 * round small waits up to guarantee that we actually wait at least the
1282 * requested number of microseconds, this can become grossly inaccurate.
1283 * We therefore keep track of the bytes we've written over time and only
1284 * sleep when the accumulated delay is at least 1 tenth of a second.
1285 **/
1286static void sleep_for_bwlimit(int bytes_written)
1287{
1288 static struct timeval prior_tv;
1289 static long total_written = 0;
1290 struct timeval tv, start_tv;
1291 long elapsed_usec, sleep_usec;
1292
1293#define ONE_SEC 1000000L /* # of microseconds in a second */
1294
1295 if (!bwlimit_writemax)
1296 return;
1297
1298 total_written += bytes_written;
1299
1300 gettimeofday(&start_tv, NULL);
1301 if (prior_tv.tv_sec) {
1302 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
1303 + (start_tv.tv_usec - prior_tv.tv_usec);
1304 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
1305 if (total_written < 0)
1306 total_written = 0;
1307 }
1308
1309 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
1310 if (sleep_usec < ONE_SEC / 10) {
1311 prior_tv = start_tv;
1312 return;
1313 }
1314
1315 tv.tv_sec = sleep_usec / ONE_SEC;
1316 tv.tv_usec = sleep_usec % ONE_SEC;
1317 select(0, NULL, NULL, NULL, &tv);
1318
1319 gettimeofday(&prior_tv, NULL);
1320 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1321 + (prior_tv.tv_usec - start_tv.tv_usec);
1322 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
1323}
1324
1325/* Write len bytes to the file descriptor fd, looping as necessary to get
1326 * the job done and also (in certain circumstances) reading any data on
1327 * msg_fd_in to avoid deadlock.
1328 *
1329 * This function underlies the multiplexing system. The body of the
1330 * application never calls this function directly. */
1331static void writefd_unbuffered(int fd, const char *buf, size_t len)
1332{
1333 size_t n, total = 0;
1334 fd_set w_fds, r_fds, e_fds;
1335 int maxfd, count, cnt, using_r_fds;
1336 int defer_inc = 0;
1337 struct timeval tv;
1338
1339 if (no_flush++)
1340 defer_forwarding_messages++, defer_inc++;
1341
1342 while (total < len) {
1343 FD_ZERO(&w_fds);
1344 FD_SET(fd, &w_fds);
1345 FD_ZERO(&e_fds);
1346 FD_SET(fd, &e_fds);
1347 maxfd = fd;
1348
1349 if (msg_fd_in >= 0) {
1350 FD_ZERO(&r_fds);
1351 FD_SET(msg_fd_in, &r_fds);
1352 if (msg_fd_in > maxfd)
1353 maxfd = msg_fd_in;
1354 using_r_fds = 1;
1355 } else
1356 using_r_fds = 0;
1357
1358 tv.tv_sec = select_timeout;
1359 tv.tv_usec = 0;
1360
1361 errno = 0;
1362 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
1363 &w_fds, &e_fds, &tv);
1364
1365 if (count <= 0) {
1366 if (count < 0 && errno == EBADF)
1367 exit_cleanup(RERR_SOCKETIO);
1368 check_timeout();
1369 continue;
1370 }
1371
1372 if (FD_ISSET(fd, &e_fds)) {
1373 rsyserr(FINFO, errno,
1374 "select exception on fd %d", fd);
1375 }
1376
1377 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
1378 read_msg_fd();
1379
1380 if (!FD_ISSET(fd, &w_fds))
1381 continue;
1382
1383 n = len - total;
1384 if (bwlimit_writemax && n > bwlimit_writemax)
1385 n = bwlimit_writemax;
1386 cnt = write(fd, buf + total, n);
1387
1388 if (cnt <= 0) {
1389 if (cnt < 0) {
1390 if (errno == EINTR)
1391 continue;
1392 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1393 msleep(1);
1394 continue;
1395 }
1396 }
1397
1398 /* Don't try to write errors back across the stream. */
1399 if (fd == sock_f_out)
1400 io_end_multiplex_out();
1401 /* Don't try to write errors down a failing msg pipe. */
1402 if (am_server && fd == msg_fd_out)
1403 exit_cleanup(RERR_STREAMIO);
1404 rsyserr(FERROR, errno,
1405 "writefd_unbuffered failed to write %ld bytes [%s]",
1406 (long)len, who_am_i());
1407 /* If the other side is sending us error messages, try
1408 * to grab any messages they sent before they died. */
1409 while (fd == sock_f_out && io_multiplexing_in) {
1410 set_io_timeout(30);
1411 ignore_timeout = 0;
1412 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1413 sizeof io_filesfrom_buf);
1414 }
1415 exit_cleanup(RERR_STREAMIO);
1416 }
1417
1418 total += cnt;
1419 defer_forwarding_messages++, defer_inc++;
1420
1421 if (fd == sock_f_out) {
1422 if (io_timeout || am_generator)
1423 last_io_out = time(NULL);
1424 sleep_for_bwlimit(cnt);
1425 }
1426 }
1427
1428 no_flush--;
1429 if (!(defer_forwarding_messages -= defer_inc))
1430 msg_flush();
1431}
1432
1433void io_flush(int flush_it_all)
1434{
1435 if (!iobuf_out_cnt || no_flush)
1436 return;
1437
1438 if (io_multiplexing_out)
1439 mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
1440 else
1441 writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
1442 iobuf_out_cnt = 0;
1443
1444 if (flush_it_all && !defer_forwarding_messages)
1445 msg_flush();
1446}
1447
1448static void writefd(int fd, const char *buf, size_t len)
1449{
1450 if (fd == sock_f_out)
1451 stats.total_written += len;
1452
1453 if (fd == write_batch_monitor_out) {
1454 if ((size_t)write(batch_fd, buf, len) != len)
1455 exit_cleanup(RERR_FILEIO);
1456 }
1457
1458 if (!iobuf_out || fd != iobuf_f_out) {
1459 writefd_unbuffered(fd, buf, len);
1460 return;
1461 }
1462
1463 while (len) {
1464 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
1465 if (n > 0) {
1466 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
1467 buf += n;
1468 len -= n;
1469 iobuf_out_cnt += n;
1470 }
1471
1472 if (iobuf_out_cnt == IO_BUFFER_SIZE)
1473 io_flush(NORMAL_FLUSH);
1474 }
1475}
1476
/* Send a 16-bit value as two bytes, low-order byte first. */
void write_shortint(int f, unsigned short x)
{
	char pair[2];

	pair[1] = (char)(x >> 8);
	pair[0] = (char)x;
	writefd(f, pair, sizeof pair);
}
1484
1485void write_int(int f, int32 x)
1486{
1487 char b[4];
1488 SIVAL(b, 0, x);
1489 writefd(f, b, 4);
1490}
1491
1492void write_varint(int f, int32 x)
1493{
1494 char b[5];
1495 uchar bit;
1496 int cnt = 4;
1497
1498 SIVAL(b, 1, x);
1499
1500 while (cnt > 1 && b[cnt] == 0)
1501 cnt--;
1502 bit = ((uchar)1<<(7-cnt+1));
1503 if (CVAL(b, cnt) >= bit) {
1504 cnt++;
1505 *b = ~(bit-1);
1506 } else if (cnt > 1)
1507 *b = b[cnt] | ~(bit*2-1);
1508 else
1509 *b = b[cnt];
1510
1511 writefd(f, b, cnt);
1512}
1513
1514void write_varlong(int f, int64 x, uchar min_bytes)
1515{
1516 char b[9];
1517 uchar bit;
1518 int cnt = 8;
1519
1520 SIVAL(b, 1, x);
1521#if SIZEOF_INT64 >= 8
1522 SIVAL(b, 5, x >> 32);
1523#else
1524 if (x <= 0x7FFFFFFF && x >= 0)
1525 memset(b + 5, 0, 4);
1526 else {
1527 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1528 exit_cleanup(RERR_UNSUPPORTED);
1529 }
1530#endif
1531
1532 while (cnt > min_bytes && b[cnt] == 0)
1533 cnt--;
1534 bit = ((uchar)1<<(7-cnt+min_bytes));
1535 if (CVAL(b, cnt) >= bit) {
1536 cnt++;
1537 *b = ~(bit-1);
1538 } else if (cnt > min_bytes)
1539 *b = b[cnt] | ~(bit*2-1);
1540 else
1541 *b = b[cnt];
1542
1543 writefd(f, b, cnt);
1544}
1545
1546/*
1547 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1548 * 64-bit types on this platform.
1549 */
1550void write_longint(int f, int64 x)
1551{
1552 char b[12], * const s = b+4;
1553
1554 SIVAL(s, 0, x);
1555 if (x <= 0x7FFFFFFF && x >= 0) {
1556 writefd(f, s, 4);
1557 return;
1558 }
1559
1560#if SIZEOF_INT64 < 8
1561 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1562 exit_cleanup(RERR_UNSUPPORTED);
1563#else
1564 memset(b, 0xFF, 4);
1565 SIVAL(s, 4, x >> 32);
1566 writefd(f, b, 12);
1567#endif
1568}
1569
/* Write len bytes from buf to f; a thin public wrapper around
 * writefd(). */
void write_buf(int f, const char *buf, size_t len)
{
	writefd(f, buf, len);
}
1574
/** Write a NUL-terminated string to the connection (the terminating
 * NUL itself is not sent). */
void write_sbuf(int f, const char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}
1580
1581void write_byte(int f, uchar c)
1582{
1583 writefd(f, (char *)&c, 1);
1584}
1585
1586void write_vstring(int f, const char *str, int len)
1587{
1588 uchar lenbuf[3], *lb = lenbuf;
1589
1590 if (len > 0x7F) {
1591 if (len > 0x7FFF) {
1592 rprintf(FERROR,
1593 "attempting to send over-long vstring (%d > %d)\n",
1594 len, 0x7FFF);
1595 exit_cleanup(RERR_PROTOCOL);
1596 }
1597 *lb++ = len / 0x100 + 0x80;
1598 }
1599 *lb = len;
1600
1601 writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1602 if (len)
1603 writefd(f, str, len);
1604}
1605
1606/* Send a file-list index using a byte-reduction method. */
1607void write_ndx(int f, int32 ndx)
1608{
1609 static int32 prev_positive = -1, prev_negative = 1;
1610 int32 diff, cnt = 0;
1611 char b[6];
1612
1613 if (protocol_version < 30 || read_batch) {
1614 write_int(f, ndx);
1615 return;
1616 }
1617
1618 /* Send NDX_DONE as a single-byte 0 with no side effects. Send
1619 * negative nums as a positive after sending a leading 0xFF. */
1620 if (ndx >= 0) {
1621 diff = ndx - prev_positive;
1622 prev_positive = ndx;
1623 } else if (ndx == NDX_DONE) {
1624 *b = 0;
1625 writefd(f, b, 1);
1626 return;
1627 } else {
1628 b[cnt++] = (char)0xFF;
1629 ndx = -ndx;
1630 diff = ndx - prev_negative;
1631 prev_negative = ndx;
1632 }
1633
1634 /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
1635 * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
1636 * & all 4 bytes of the (non-negative) num with the high-bit set. */
1637 if (diff < 0xFE && diff > 0)
1638 b[cnt++] = (char)diff;
1639 else if (diff < 0 || diff > 0x7FFF) {
1640 b[cnt++] = (char)0xFE;
1641 b[cnt++] = (char)((ndx >> 24) | 0x80);
1642 b[cnt++] = (char)ndx;
1643 b[cnt++] = (char)(ndx >> 8);
1644 b[cnt++] = (char)(ndx >> 16);
1645 } else {
1646 b[cnt++] = (char)0xFE;
1647 b[cnt++] = (char)(diff >> 8);
1648 b[cnt++] = (char)diff;
1649 }
1650 writefd(f, b, cnt);
1651}
1652
1653/* Receive a file-list index using a byte-reduction method. */
1654int32 read_ndx(int f)
1655{
1656 static int32 prev_positive = -1, prev_negative = 1;
1657 int32 *prev_ptr, num;
1658 char b[4];
1659
1660 if (protocol_version < 30)
1661 return read_int(f);
1662
1663 readfd(f, b, 1);
1664 if (CVAL(b, 0) == 0xFF) {
1665 readfd(f, b, 1);
1666 prev_ptr = &prev_negative;
1667 } else if (CVAL(b, 0) == 0)
1668 return NDX_DONE;
1669 else
1670 prev_ptr = &prev_positive;
1671 if (CVAL(b, 0) == 0xFE) {
1672 readfd(f, b, 2);
1673 if (CVAL(b, 0) & 0x80) {
1674 b[3] = CVAL(b, 0) & ~0x80;
1675 b[0] = b[1];
1676 readfd(f, b+1, 2);
1677 num = IVAL(b, 0);
1678 } else
1679 num = (UVAL(b,0)<<8) + UVAL(b,1) + *prev_ptr;
1680 } else
1681 num = UVAL(b, 0) + *prev_ptr;
1682 *prev_ptr = num;
1683 if (prev_ptr == &prev_negative)
1684 num = -num;
1685 return num;
1686}
1687
/**
 * Read one line, a byte at a time, storing up to @p maxlen characters
 * in @p buf (not counting the trailing null).  Carriage returns are
 * dropped and the terminating newline is not stored.
 *
 * @return 1 for success; 0 if a null byte was read (or nothing was
 * stored by the read) or if @p maxlen characters arrived without a
 * newline.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *dst = buf;
	size_t room = maxlen;

	while (room != 0) {
		/* Pre-clear the slot so an unfilled read looks like a null. */
		*dst = 0;
		read_buf(f, dst, 1);

		if (*dst == 0)
			return 0;
		if (*dst == '\n')
			break;
		if (*dst == '\r')
			continue;

		dst++;
		room--;
	}

	*dst = '\0';
	return room != 0;
}
1712
1713void io_printf(int fd, const char *format, ...)
1714{
1715 va_list ap;
1716 char buf[BIGPATHBUFLEN];
1717 int len;
1718
1719 va_start(ap, format);
1720 len = vsnprintf(buf, sizeof buf, format, ap);
1721 va_end(ap);
1722
1723 if (len < 0)
1724 exit_cleanup(RERR_STREAMIO);
1725
1726 if (len > (int)sizeof buf) {
1727 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1728 exit_cleanup(RERR_STREAMIO);
1729 }
1730
1731 write_sbuf(fd, buf);
1732}
1733
1734/** Setup for multiplexing a MSG_* stream with the data stream. */
1735void io_start_multiplex_out(void)
1736{
1737 io_flush(NORMAL_FLUSH);
1738 io_start_buffering_out(sock_f_out);
1739 io_multiplexing_out = 1;
1740}
1741
1742/** Setup for multiplexing a MSG_* stream with the data stream. */
1743void io_start_multiplex_in(void)
1744{
1745 io_flush(NORMAL_FLUSH);
1746 io_start_buffering_in(sock_f_in);
1747 io_multiplexing_in = 1;
1748}
1749
1750/** Write an message to the multiplexed data stream. */
1751int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int convert)
1752{
1753 if (!io_multiplexing_out)
1754 return 0;
1755 io_flush(NORMAL_FLUSH);
1756 stats.total_written += (len+4);
1757 mplex_write(sock_f_out, code, buf, len, convert);
1758 return 1;
1759}
1760
1761void io_end_multiplex_in(void)
1762{
1763 io_multiplexing_in = 0;
1764 io_end_buffering_in();
1765}
1766
1767/** Stop output multiplexing. */
1768void io_end_multiplex_out(void)
1769{
1770 io_multiplexing_out = 0;
1771 io_end_buffering_out();
1772}
1773
1774void start_write_batch(int fd)
1775{
1776 /* Some communication has already taken place, but we don't
1777 * enable batch writing until here so that we can write a
1778 * canonical record of the communication even though the
1779 * actual communication so far depends on whether a daemon
1780 * is involved. */
1781 write_int(batch_fd, protocol_version);
1782 write_int(batch_fd, checksum_seed);
1783
1784 if (am_sender)
1785 write_batch_monitor_out = fd;
1786 else
1787 write_batch_monitor_in = fd;
1788}
1789
1790void stop_write_batch(void)
1791{
1792 write_batch_monitor_out = -1;
1793 write_batch_monitor_in = -1;
1794}