Mention changes to the patches dir.
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/* -*- c-file-style: "linux" -*-
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22/**
23 * @file io.c
24 *
25 * Socket and pipe I/O utilities used in rsync.
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
36
37#include "rsync.h"
38
39/** If no timeout is specified then use a 60 second select timeout */
40#define SELECT_TIMEOUT 60
41
42static int io_multiplexing_out;
43static int io_multiplexing_in;
44static int multiplex_in_fd = -1;
45static int multiplex_out_fd = -1;
46static time_t last_io;
47static int no_flush;
48
49extern int bwlimit;
50extern size_t bwlimit_writemax;
51extern int verbose;
52extern int io_timeout;
53extern int am_server;
54extern int am_daemon;
55extern int am_sender;
56extern struct stats stats;
57
58
59const char phase_unknown[] = "unknown";
60
61/**
62 * The connection might be dropped at some point; perhaps because the
63 * remote instance crashed. Just giving the offset on the stream is
64 * not very helpful. So instead we try to make io_phase_name point to
65 * something useful.
66 *
67 * For buffered/multiplexed I/O these names will be somewhat
68 * approximate; perhaps for ease of support we would rather make the
69 * buffer always flush when a single application-level I/O finishes.
70 *
71 * @todo Perhaps we want some simple stack functionality, but there's
72 * no need to overdo it.
73 **/
74const char *io_write_phase = phase_unknown;
75const char *io_read_phase = phase_unknown;
76
77/** Ignore EOF errors while reading a module listing if the remote
78 version is 24 or less. */
79int kludge_around_eof = False;
80
81int msg_fd_in = -1;
82int msg_fd_out = -1;
83
84static int io_filesfrom_f_in = -1;
85static int io_filesfrom_f_out = -1;
86static char io_filesfrom_buf[2048];
87static char *io_filesfrom_bp;
88static char io_filesfrom_lastchar;
89static int io_filesfrom_buflen;
90
91static void read_loop(int fd, char *buf, size_t len);
92
93struct redo_list {
94 struct redo_list *next;
95 int num;
96};
97
98static struct redo_list *redo_list_head;
99static struct redo_list *redo_list_tail;
100
101struct msg_list {
102 struct msg_list *next;
103 char *buf;
104 int len;
105};
106
107static struct msg_list *msg_list_head;
108static struct msg_list *msg_list_tail;
109
110static void redo_list_add(int num)
111{
112 struct redo_list *rl;
113
114 if (!(rl = new(struct redo_list)))
115 exit_cleanup(RERR_MALLOC);
116 rl->next = NULL;
117 rl->num = num;
118 if (redo_list_tail)
119 redo_list_tail->next = rl;
120 else
121 redo_list_head = rl;
122 redo_list_tail = rl;
123}
124
125static void check_timeout(void)
126{
127 time_t t;
128
129 if (!io_timeout)
130 return;
131
132 if (!last_io) {
133 last_io = time(NULL);
134 return;
135 }
136
137 t = time(NULL);
138
139 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
140 if (!am_server && !am_daemon) {
141 rprintf(FERROR, "io timeout after %d seconds - exiting\n",
142 (int)(t-last_io));
143 }
144 exit_cleanup(RERR_TIMEOUT);
145 }
146}
147
148/** Setup the fd used to receive MSG_* messages. Only needed when
149 * we're the generator because the sender and receiver both use the
150 * multiplexed I/O setup. */
151void set_msg_fd_in(int fd)
152{
153 msg_fd_in = fd;
154}
155
156/** Setup the fd used to send our MSG_* messages. Only needed when
157 * we're the receiver because the generator and the sender both use
158 * the multiplexed I/O setup. */
159void set_msg_fd_out(int fd)
160{
161 msg_fd_out = fd;
162 set_nonblocking(msg_fd_out);
163}
164
165/* Add a message to the pending MSG_* list. */
166static void msg_list_add(int code, char *buf, int len)
167{
168 struct msg_list *ml;
169
170 if (!(ml = new(struct msg_list)))
171 exit_cleanup(RERR_MALLOC);
172 ml->next = NULL;
173 if (!(ml->buf = new_array(char, len+4)))
174 exit_cleanup(RERR_MALLOC);
175 SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
176 memcpy(ml->buf+4, buf, len);
177 ml->len = len+4;
178 if (msg_list_tail)
179 msg_list_tail->next = ml;
180 else
181 msg_list_head = ml;
182 msg_list_tail = ml;
183}
184
185void send_msg(enum msgcode code, char *buf, int len)
186{
187 msg_list_add(code, buf, len);
188 msg_list_push(NORMAL_FLUSH);
189}
190
191/** Read a message from the MSG_* fd and dispatch it. This is only
192 * called by the generator. */
193static void read_msg_fd(void)
194{
195 char buf[200];
196 size_t n;
197 int fd = msg_fd_in;
198 int tag, len;
199
200 /* Temporarily disable msg_fd_in. This is needed because we
201 * may call a write routine that could try to call us back. */
202 msg_fd_in = -1;
203
204 read_loop(fd, buf, 4);
205 tag = IVAL(buf, 0);
206
207 len = tag & 0xFFFFFF;
208 tag = (tag >> 24) - MPLEX_BASE;
209
210 switch (tag) {
211 case MSG_DONE:
212 if (len != 0) {
213 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
214 exit_cleanup(RERR_STREAMIO);
215 }
216 redo_list_add(-1);
217 break;
218 case MSG_REDO:
219 if (len != 4) {
220 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
221 exit_cleanup(RERR_STREAMIO);
222 }
223 read_loop(fd, buf, 4);
224 redo_list_add(IVAL(buf,0));
225 break;
226 case MSG_INFO:
227 case MSG_ERROR:
228 case MSG_LOG:
229 while (len) {
230 n = len;
231 if (n >= sizeof buf)
232 n = sizeof buf - 1;
233 read_loop(fd, buf, n);
234 rwrite((enum logcode)tag, buf, n);
235 len -= n;
236 }
237 break;
238 default:
239 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
240 exit_cleanup(RERR_STREAMIO);
241 }
242
243 msg_fd_in = fd;
244}
245
246/* Try to push messages off the list onto the wire. If we leave with more
247 * to do, return 0. On error, return -1. If everything flushed, return 1.
248 * This is only called by the receiver. */
249int msg_list_push(int flush_it_all)
250{
251 static int written = 0;
252 struct timeval tv;
253 fd_set fds;
254
255 if (msg_fd_out < 0)
256 return -1;
257
258 while (msg_list_head) {
259 struct msg_list *ml = msg_list_head;
260 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
261 if (n < 0) {
262 if (errno == EINTR)
263 continue;
264 if (errno != EWOULDBLOCK && errno != EAGAIN)
265 return -1;
266 if (!flush_it_all)
267 return 0;
268 FD_ZERO(&fds);
269 FD_SET(msg_fd_out, &fds);
270 tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
271 tv.tv_usec = 0;
272 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
273 check_timeout();
274 } else if ((written += n) == ml->len) {
275 free(ml->buf);
276 msg_list_head = ml->next;
277 if (!msg_list_head)
278 msg_list_tail = NULL;
279 free(ml);
280 written = 0;
281 }
282 }
283 return 1;
284}
285
286int get_redo_num(void)
287{
288 struct redo_list *next;
289 int num;
290
291 while (!redo_list_head)
292 read_msg_fd();
293
294 num = redo_list_head->num;
295 next = redo_list_head->next;
296 free(redo_list_head);
297 redo_list_head = next;
298 if (!next)
299 redo_list_tail = NULL;
300
301 return num;
302}
303
304/**
305 * When we're the receiver and we have a local --files-from list of names
306 * that needs to be sent over the socket to the sender, we have to do two
307 * things at the same time: send the sender a list of what files we're
308 * processing and read the incoming file+info list from the sender. We do
309 * this by augmenting the read_timeout() function to copy this data. It
310 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
311 * ready, since it might be a pipe) and then blast it out f_out (when it
312 * is ready to receive more data).
313 */
314void io_set_filesfrom_fds(int f_in, int f_out)
315{
316 io_filesfrom_f_in = f_in;
317 io_filesfrom_f_out = f_out;
318 io_filesfrom_bp = io_filesfrom_buf;
319 io_filesfrom_lastchar = '\0';
320 io_filesfrom_buflen = 0;
321}
322
323/**
324 * It's almost always an error to get an EOF when we're trying to read
325 * from the network, because the protocol is self-terminating.
326 *
327 * However, there is one unfortunate cases where it is not, which is
328 * rsync <2.4.6 sending a list of modules on a server, since the list
329 * is terminated by closing the socket. So, for the section of the
330 * program where that is a problem (start_socket_client),
331 * kludge_around_eof is True and we just exit.
332 */
333static void whine_about_eof(void)
334{
335 if (kludge_around_eof)
336 exit_cleanup(0);
337 else {
338 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
339 "(%.0f bytes read so far)\n",
340 (double)stats.total_read);
341
342 exit_cleanup(RERR_STREAMIO);
343 }
344}
345
346
347static void die_from_readerr(int err)
348{
349 /* this prevents us trying to write errors on a dead socket */
350 io_multiplexing_close();
351
352 rsyserr(FERROR, err, "read error");
353 exit_cleanup(RERR_STREAMIO);
354}
355
356
357/**
358 * Read from a socket with I/O timeout. return the number of bytes
359 * read. If no bytes can be read then exit, never return a number <= 0.
360 *
361 * TODO: If the remote shell connection fails, then current versions
362 * actually report an "unexpected EOF" error here. Since it's a
363 * fairly common mistake to try to use rsh when ssh is required, we
364 * should trap that: if we fail to read any data at all, we should
365 * give a better explanation. We can tell whether the connection has
366 * started by looking e.g. at whether the remote version is known yet.
367 */
368static int read_timeout(int fd, char *buf, size_t len)
369{
370 int n, ret = 0;
371
372 io_flush(NORMAL_FLUSH);
373
374 while (ret == 0) {
375 /* until we manage to read *something* */
376 fd_set r_fds, w_fds;
377 struct timeval tv;
378 int fd_count = fd+1;
379 int count;
380
381 FD_ZERO(&r_fds);
382 FD_SET(fd, &r_fds);
383 if (msg_fd_in >= 0) {
384 FD_SET(msg_fd_in, &r_fds);
385 if (msg_fd_in >= fd_count)
386 fd_count = msg_fd_in+1;
387 }
388 if (io_filesfrom_f_out >= 0) {
389 int new_fd;
390 if (io_filesfrom_buflen == 0) {
391 if (io_filesfrom_f_in >= 0) {
392 FD_SET(io_filesfrom_f_in, &r_fds);
393 new_fd = io_filesfrom_f_in;
394 } else {
395 io_filesfrom_f_out = -1;
396 new_fd = -1;
397 }
398 } else {
399 FD_ZERO(&w_fds);
400 FD_SET(io_filesfrom_f_out, &w_fds);
401 new_fd = io_filesfrom_f_out;
402 }
403 if (new_fd >= fd_count)
404 fd_count = new_fd+1;
405 }
406
407 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
408 tv.tv_usec = 0;
409
410 errno = 0;
411
412 count = select(fd_count, &r_fds,
413 io_filesfrom_buflen? &w_fds : NULL,
414 NULL, &tv);
415
416 if (count == 0) {
417 msg_list_push(NORMAL_FLUSH);
418 check_timeout();
419 }
420
421 if (count <= 0) {
422 if (errno == EBADF) {
423 exit_cleanup(RERR_SOCKETIO);
424 }
425 continue;
426 }
427
428 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
429 read_msg_fd();
430
431 if (io_filesfrom_f_out >= 0) {
432 if (io_filesfrom_buflen) {
433 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
434 int l = write(io_filesfrom_f_out,
435 io_filesfrom_bp,
436 io_filesfrom_buflen);
437 if (l > 0) {
438 if (!(io_filesfrom_buflen -= l))
439 io_filesfrom_bp = io_filesfrom_buf;
440 else
441 io_filesfrom_bp += l;
442 } else {
443 /* XXX should we complain? */
444 io_filesfrom_f_out = -1;
445 }
446 }
447 } else if (io_filesfrom_f_in >= 0) {
448 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
449 int l = read(io_filesfrom_f_in,
450 io_filesfrom_buf,
451 sizeof io_filesfrom_buf);
452 if (l <= 0) {
453 /* Send end-of-file marker */
454 io_filesfrom_buf[0] = '\0';
455 io_filesfrom_buf[1] = '\0';
456 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
457 io_filesfrom_f_in = -1;
458 } else {
459 extern int eol_nulls;
460 if (!eol_nulls) {
461 char *s = io_filesfrom_buf + l;
462 /* Transform CR and/or LF into '\0' */
463 while (s-- > io_filesfrom_buf) {
464 if (*s == '\n' || *s == '\r')
465 *s = '\0';
466 }
467 }
468 if (!io_filesfrom_lastchar) {
469 /* Last buf ended with a '\0', so don't
470 * let this buf start with one. */
471 while (l && !*io_filesfrom_bp)
472 io_filesfrom_bp++, l--;
473 }
474 if (!l)
475 io_filesfrom_bp = io_filesfrom_buf;
476 else {
477 char *f = io_filesfrom_bp;
478 char *t = f;
479 char *eob = f + l;
480 /* Eliminate any multi-'\0' runs. */
481 while (f != eob) {
482 if (!(*t++ = *f++)) {
483 while (f != eob && !*f)
484 f++, l--;
485 }
486 }
487 io_filesfrom_lastchar = f[-1];
488 }
489 io_filesfrom_buflen = l;
490 }
491 }
492 }
493 }
494
495 if (!FD_ISSET(fd, &r_fds)) continue;
496
497 n = read(fd, buf, len);
498
499 if (n > 0) {
500 buf += n;
501 len -= n;
502 ret += n;
503 if (io_timeout)
504 last_io = time(NULL);
505 continue;
506 } else if (n == 0) {
507 whine_about_eof();
508 return -1; /* doesn't return */
509 } else if (n < 0) {
510 if (errno == EINTR || errno == EWOULDBLOCK
511 || errno == EAGAIN)
512 continue;
513 die_from_readerr(errno);
514 }
515 }
516
517 return ret;
518}
519
520/**
521 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
522 * characters long).
523 */
524int read_filesfrom_line(int fd, char *fname)
525{
526 char ch, *s, *eob = fname + MAXPATHLEN - 1;
527 int cnt;
528 extern int io_timeout;
529 extern int eol_nulls;
530 extern char *remote_filesfrom_file;
531 int reading_remotely = remote_filesfrom_file != NULL;
532 int nulls = eol_nulls || reading_remotely;
533
534 start:
535 s = fname;
536 while (1) {
537 cnt = read(fd, &ch, 1);
538 if (cnt < 0 && (errno == EWOULDBLOCK
539 || errno == EINTR || errno == EAGAIN)) {
540 struct timeval tv;
541 fd_set fds;
542 FD_ZERO(&fds);
543 FD_SET(fd, &fds);
544 tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
545 tv.tv_usec = 0;
546 if (!select(fd+1, &fds, NULL, NULL, &tv))
547 check_timeout();
548 continue;
549 }
550 if (cnt != 1)
551 break;
552 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
553 /* Skip empty lines if reading locally. */
554 if (!reading_remotely && s == fname)
555 continue;
556 break;
557 }
558 if (s < eob)
559 *s++ = ch;
560 }
561 *s = '\0';
562
563 /* Dump comments. */
564 if (*fname == '#' || *fname == ';')
565 goto start;
566
567 return s - fname;
568}
569
570
571/**
572 * Continue trying to read len bytes - don't return until len has been
573 * read.
574 **/
575static void read_loop(int fd, char *buf, size_t len)
576{
577 while (len) {
578 int n = read_timeout(fd, buf, len);
579
580 buf += n;
581 len -= n;
582 }
583}
584
585
586/**
587 * Read from the file descriptor handling multiplexing - return number
588 * of bytes read.
589 *
590 * Never returns <= 0.
591 */
592static int read_unbuffered(int fd, char *buf, size_t len)
593{
594 static size_t remaining;
595 int tag, ret = 0;
596 char line[1024];
597 static char *buffer;
598 static size_t bufferIdx = 0;
599 static size_t bufferSz;
600
601 if (fd != multiplex_in_fd)
602 return read_timeout(fd, buf, len);
603
604 if (!io_multiplexing_in && remaining == 0) {
605 if (!buffer) {
606 bufferSz = 2 * IO_BUFFER_SIZE;
607 buffer = new_array(char, bufferSz);
608 if (!buffer) out_of_memory("read_unbuffered");
609 }
610 remaining = read_timeout(fd, buffer, bufferSz);
611 bufferIdx = 0;
612 }
613
614 while (ret == 0) {
615 if (remaining) {
616 len = MIN(len, remaining);
617 memcpy(buf, buffer + bufferIdx, len);
618 bufferIdx += len;
619 remaining -= len;
620 ret = len;
621 break;
622 }
623
624 read_loop(fd, line, 4);
625 tag = IVAL(line, 0);
626
627 remaining = tag & 0xFFFFFF;
628 tag = (tag >> 24) - MPLEX_BASE;
629
630 switch (tag) {
631 case MSG_DATA:
632 if (!buffer || remaining > bufferSz) {
633 buffer = realloc_array(buffer, char, remaining);
634 if (!buffer) out_of_memory("read_unbuffered");
635 bufferSz = remaining;
636 }
637 read_loop(fd, buffer, remaining);
638 bufferIdx = 0;
639 break;
640 case MSG_INFO:
641 case MSG_ERROR:
642 if (remaining >= sizeof line) {
643 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
644 tag, (long)remaining);
645 exit_cleanup(RERR_STREAMIO);
646 }
647 read_loop(fd, line, remaining);
648 rwrite((enum logcode)tag, line, remaining);
649 remaining = 0;
650 break;
651 default:
652 rprintf(FERROR, "unexpected tag %d\n", tag);
653 exit_cleanup(RERR_STREAMIO);
654 }
655 }
656
657 if (remaining == 0)
658 io_flush(NORMAL_FLUSH);
659
660 return ret;
661}
662
663
664
665/**
666 * Do a buffered read from @p fd. Don't return until all @p n bytes
667 * have been read. If all @p n can't be read then exit with an
668 * error.
669 **/
670static void readfd(int fd, char *buffer, size_t N)
671{
672 int ret;
673 size_t total = 0;
674
675 while (total < N) {
676 ret = read_unbuffered(fd, buffer + total, N-total);
677 total += ret;
678 }
679
680 stats.total_read += total;
681}
682
683
684int32 read_int(int f)
685{
686 char b[4];
687 int32 ret;
688
689 readfd(f,b,4);
690 ret = IVAL(b,0);
691 if (ret == (int32)0xffffffff) return -1;
692 return ret;
693}
694
695int64 read_longint(int f)
696{
697 int64 ret;
698 char b[8];
699 ret = read_int(f);
700
701 if ((int32)ret != (int32)0xffffffff) {
702 return ret;
703 }
704
705#ifdef NO_INT64
706 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
707 exit_cleanup(RERR_UNSUPPORTED);
708#else
709 readfd(f,b,8);
710 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
711#endif
712
713 return ret;
714}
715
716void read_buf(int f,char *buf,size_t len)
717{
718 readfd(f,buf,len);
719}
720
721void read_sbuf(int f,char *buf,size_t len)
722{
723 read_buf(f,buf,len);
724 buf[len] = 0;
725}
726
727unsigned char read_byte(int f)
728{
729 unsigned char c;
730 read_buf(f, (char *)&c, 1);
731 return c;
732}
733
734
735/**
736 * Sleep after writing to limit I/O bandwidth usage.
737 *
738 * @todo Rather than sleeping after each write, it might be better to
739 * use some kind of averaging. The current algorithm seems to always
740 * use a bit less bandwidth than specified, because it doesn't make up
741 * for slow periods. But arguably this is a feature. In addition, we
742 * ought to take the time used to write the data into account.
743 *
744 * During some phases of big transfers (file FOO is uptodate) this is
745 * called with a small bytes_written every time. As the kernel has to
746 * round small waits up to guarantee that we actually wait at least the
747 * requested number of microseconds, this can become grossly inaccurate.
748 * We therefore keep track of the bytes we've written over time and only
749 * sleep when the accumulated delay is at least 1 tenth of a second.
750 **/
751static void sleep_for_bwlimit(int bytes_written)
752{
753 static struct timeval prior_tv;
754 static long total_written = 0;
755 struct timeval tv, start_tv;
756 long elapsed_usec, sleep_usec;
757
758#define ONE_SEC 1000000L /* # of microseconds in a second */
759
760 if (!bwlimit)
761 return;
762
763 total_written += bytes_written;
764
765 gettimeofday(&start_tv, NULL);
766 if (prior_tv.tv_sec) {
767 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
768 + (start_tv.tv_usec - prior_tv.tv_usec);
769 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
770 if (total_written < 0)
771 total_written = 0;
772 }
773
774 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
775 if (sleep_usec < ONE_SEC / 10) {
776 prior_tv = start_tv;
777 return;
778 }
779
780 tv.tv_sec = sleep_usec / ONE_SEC;
781 tv.tv_usec = sleep_usec % ONE_SEC;
782 select(0, NULL, NULL, NULL, &tv);
783
784 gettimeofday(&prior_tv, NULL);
785 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
786 + (prior_tv.tv_usec - start_tv.tv_usec);
787 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
788}
789
790
791/**
792 * Write len bytes to the file descriptor @p fd.
793 *
794 * This function underlies the multiplexing system. The body of the
795 * application never calls this function directly.
796 **/
797static void writefd_unbuffered(int fd,char *buf,size_t len)
798{
799 size_t total = 0;
800 fd_set w_fds, r_fds;
801 int fd_count, count;
802 struct timeval tv;
803
804 msg_list_push(NORMAL_FLUSH);
805
806 no_flush++;
807
808 while (total < len) {
809 FD_ZERO(&w_fds);
810 FD_SET(fd,&w_fds);
811 fd_count = fd;
812
813 if (msg_fd_in >= 0) {
814 FD_ZERO(&r_fds);
815 FD_SET(msg_fd_in,&r_fds);
816 if (msg_fd_in > fd_count)
817 fd_count = msg_fd_in;
818 }
819
820 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
821 tv.tv_usec = 0;
822
823 errno = 0;
824 count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
825 &w_fds, NULL, &tv);
826
827 if (count == 0) {
828 msg_list_push(NORMAL_FLUSH);
829 check_timeout();
830 }
831
832 if (count <= 0) {
833 if (errno == EBADF) {
834 exit_cleanup(RERR_SOCKETIO);
835 }
836 continue;
837 }
838
839 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
840 read_msg_fd();
841
842 if (FD_ISSET(fd, &w_fds)) {
843 int ret;
844 size_t n = len-total;
845 if (bwlimit && n > bwlimit_writemax)
846 n = bwlimit_writemax;
847 ret = write(fd,buf+total,n);
848
849 if (ret < 0) {
850 if (errno == EINTR)
851 continue;
852 if (errno == EWOULDBLOCK || errno == EAGAIN) {
853 msleep(1);
854 continue;
855 }
856 }
857
858 if (ret <= 0) {
859 /* Don't try to write errors back
860 * across the stream */
861 io_multiplexing_close();
862 rsyserr(FERROR, errno,
863 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
864 (long) len, io_write_phase);
865 exit_cleanup(RERR_STREAMIO);
866 }
867
868 sleep_for_bwlimit(ret);
869
870 total += ret;
871
872 if (io_timeout)
873 last_io = time(NULL);
874 }
875 }
876
877 no_flush--;
878}
879
880
881static char *io_buffer;
882static int io_buffer_count;
883
884void io_start_buffering_out(int fd)
885{
886 if (io_buffer) return;
887 multiplex_out_fd = fd;
888 io_buffer = new_array(char, IO_BUFFER_SIZE);
889 if (!io_buffer) out_of_memory("writefd");
890 io_buffer_count = 0;
891}
892
893void io_start_buffering_in(int fd)
894{
895 multiplex_in_fd = fd;
896}
897
898/**
899 * Write an message to a multiplexed stream. If this fails then rsync
900 * exits.
901 **/
902static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
903{
904 char buffer[4096];
905 size_t n = len;
906
907 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
908
909 if (n > (sizeof buffer - 4)) {
910 n = sizeof buffer - 4;
911 }
912
913 memcpy(&buffer[4], buf, n);
914 writefd_unbuffered(fd, buffer, n+4);
915
916 len -= n;
917 buf += n;
918
919 if (len) {
920 writefd_unbuffered(fd, buf, len);
921 }
922}
923
924
925void io_flush(int flush_it_all)
926{
927 int fd = multiplex_out_fd;
928
929 msg_list_push(flush_it_all);
930
931 if (!io_buffer_count || no_flush)
932 return;
933
934 if (io_multiplexing_out)
935 mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
936 else
937 writefd_unbuffered(fd, io_buffer, io_buffer_count);
938 io_buffer_count = 0;
939}
940
941
942void io_end_buffering(void)
943{
944 io_flush(NORMAL_FLUSH);
945 if (!io_multiplexing_out) {
946 free(io_buffer);
947 io_buffer = NULL;
948 }
949}
950
951static void writefd(int fd,char *buf,size_t len)
952{
953 stats.total_written += len;
954
955 msg_list_push(NORMAL_FLUSH);
956
957 if (!io_buffer || fd != multiplex_out_fd) {
958 writefd_unbuffered(fd, buf, len);
959 return;
960 }
961
962 while (len) {
963 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
964 if (n > 0) {
965 memcpy(io_buffer+io_buffer_count, buf, n);
966 buf += n;
967 len -= n;
968 io_buffer_count += n;
969 }
970
971 if (io_buffer_count == IO_BUFFER_SIZE)
972 io_flush(NORMAL_FLUSH);
973 }
974}
975
976
977void write_int(int f,int32 x)
978{
979 char b[4];
980 SIVAL(b,0,x);
981 writefd(f,b,4);
982}
983
984
985void write_int_named(int f, int32 x, const char *phase)
986{
987 io_write_phase = phase;
988 write_int(f, x);
989 io_write_phase = phase_unknown;
990}
991
992
993/*
994 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
995 * 64-bit types on this platform.
996 */
997void write_longint(int f, int64 x)
998{
999 char b[8];
1000
1001 if (x <= 0x7FFFFFFF) {
1002 write_int(f, (int)x);
1003 return;
1004 }
1005
1006#ifdef NO_INT64
1007 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1008 exit_cleanup(RERR_UNSUPPORTED);
1009#else
1010 write_int(f, (int32)0xFFFFFFFF);
1011 SIVAL(b,0,(x&0xFFFFFFFF));
1012 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1013
1014 writefd(f,b,8);
1015#endif
1016}
1017
1018void write_buf(int f,char *buf,size_t len)
1019{
1020 writefd(f,buf,len);
1021}
1022
1023/** Write a string to the connection */
1024static void write_sbuf(int f,char *buf)
1025{
1026 write_buf(f, buf, strlen(buf));
1027}
1028
1029
1030void write_byte(int f,unsigned char c)
1031{
1032 write_buf(f,(char *)&c,1);
1033}
1034
1035
1036
1037/**
1038 * Read a line of up to @p maxlen characters into @p buf (not counting
1039 * the trailing null). Strips the (required) trailing newline and all
1040 * carriage returns.
1041 *
1042 * @return 1 for success; 0 for I/O error or truncation.
1043 **/
1044int read_line(int f, char *buf, size_t maxlen)
1045{
1046 while (maxlen) {
1047 buf[0] = 0;
1048 read_buf(f, buf, 1);
1049 if (buf[0] == 0)
1050 return 0;
1051 if (buf[0] == '\n')
1052 break;
1053 if (buf[0] != '\r') {
1054 buf++;
1055 maxlen--;
1056 }
1057 }
1058 *buf = '\0';
1059 return maxlen > 0;
1060}
1061
1062
1063void io_printf(int fd, const char *format, ...)
1064{
1065 va_list ap;
1066 char buf[1024];
1067 int len;
1068
1069 va_start(ap, format);
1070 len = vsnprintf(buf, sizeof buf, format, ap);
1071 va_end(ap);
1072
1073 if (len < 0) exit_cleanup(RERR_STREAMIO);
1074
1075 write_sbuf(fd, buf);
1076}
1077
1078
1079/** Setup for multiplexing a MSG_* stream with the data stream. */
1080void io_start_multiplex_out(int fd)
1081{
1082 multiplex_out_fd = fd;
1083 io_flush(NORMAL_FLUSH);
1084 io_start_buffering_out(fd);
1085 io_multiplexing_out = 1;
1086}
1087
1088/** Setup for multiplexing a MSG_* stream with the data stream. */
1089void io_start_multiplex_in(int fd)
1090{
1091 multiplex_in_fd = fd;
1092 io_flush(NORMAL_FLUSH);
1093 io_multiplexing_in = 1;
1094}
1095
1096/** Write an message to the multiplexed data stream. */
1097int io_multiplex_write(enum msgcode code, char *buf, size_t len)
1098{
1099 if (!io_multiplexing_out) return 0;
1100
1101 io_flush(NORMAL_FLUSH);
1102 stats.total_written += (len+4);
1103 mplex_write(multiplex_out_fd, code, buf, len);
1104 return 1;
1105}
1106
1107/** Stop output multiplexing. */
1108void io_multiplexing_close(void)
1109{
1110 io_multiplexing_out = 0;
1111}
1112