OK, we can now get phase messages if we fail in send_file_entry
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/* -*- c-file-style: "linux" -*-
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22/**
23 * @file io.c
24 *
25 * Socket and pipe IO utilities used in rsync.
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
36
37#include "rsync.h"
38
39/** If no timeout is specified then use a 60 second select timeout */
40#define SELECT_TIMEOUT 60
41
42static int io_multiplexing_out;
43static int io_multiplexing_in;
44static int multiplex_in_fd;
45static int multiplex_out_fd;
46static time_t last_io;
47static int no_flush;
48
49extern int bwlimit;
50extern int verbose;
51extern int io_timeout;
52extern struct stats stats;
53
54
55/**
56 * The connection might be dropped at some point; perhaps because the
57 * remote instance crashed. Just giving the offset on the stream is
58 * not very helpful. So instead we try to make io_write_phase and
59 * io_read_phase point to something useful.
60 *
61 * @todo Perhaps we want some simple stack functionality, but there's
62 * no need to overdo it.
63 **/
64const char *io_write_phase = "unknown";
65const char *io_read_phase = "unknown";
66
67
68/** Ignore EOF errors while reading a module listing if the remote
69 version is 24 or less. */
70int kludge_around_eof = False;
71
72
73static int io_error_fd = -1;
74
75static void read_loop(int fd, char *buf, size_t len);
76
77static void check_timeout(void)
78{
79 extern int am_server, am_daemon;
80 time_t t;
81
82 err_list_push();
83
84 if (!io_timeout) return;
85
86 if (!last_io) {
87 last_io = time(NULL);
88 return;
89 }
90
91 t = time(NULL);
92
93 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
94 if (!am_server && !am_daemon) {
95 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
96 (int)(t-last_io));
97 }
98 exit_cleanup(RERR_TIMEOUT);
99 }
100}
101
102/** Setup the fd used to propogate errors */
103void io_set_error_fd(int fd)
104{
105 io_error_fd = fd;
106}
107
108/** Read some data from the error fd and write it to the write log code */
109static void read_error_fd(void)
110{
111 char buf[200];
112 size_t n;
113 int fd = io_error_fd;
114 int tag, len;
115
116 /* io_error_fd is temporarily disabled -- is this meant to
117 * prevent indefinite recursion? */
118 io_error_fd = -1;
119
120 read_loop(fd, buf, 4);
121 tag = IVAL(buf, 0);
122
123 len = tag & 0xFFFFFF;
124 tag = tag >> 24;
125 tag -= MPLEX_BASE;
126
127 while (len) {
128 n = len;
129 if (n > (sizeof(buf)-1))
130 n = sizeof(buf)-1;
131 read_loop(fd, buf, n);
132 rwrite((enum logcode)tag, buf, n);
133 len -= n;
134 }
135
136 io_error_fd = fd;
137}
138
139
140/**
141 * It's almost always an error to get an EOF when we're trying to read
142 * from the network, because the protocol is self-terminating.
143 *
144 * However, there is one unfortunate cases where it is not, which is
145 * rsync <2.4.6 sending a list of modules on a server, since the list
146 * is terminated by closing the socket. So, for the section of the
147 * program where that is a problem (start_socket_client),
148 * kludge_around_eof is True and we just exit.
149 */
150static void whine_about_eof (void)
151{
152 if (kludge_around_eof)
153 exit_cleanup (0);
154 else {
155 rprintf (FERROR,
156 "%s: connection unexpectedly closed "
157 "(%.0f bytes read so far)\n",
158 RSYNC_NAME, (double)stats.total_read);
159
160 exit_cleanup (RERR_STREAMIO);
161 }
162}
163
164
165static void die_from_readerr (int err)
166{
167 /* this prevents us trying to write errors on a dead socket */
168 io_multiplexing_close();
169
170 rprintf(FERROR, "%s: read error: %s\n",
171 RSYNC_NAME, strerror (err));
172 exit_cleanup(RERR_STREAMIO);
173}
174
175
176/**
177 * Read from a socket with IO timeout. return the number of bytes
178 * read. If no bytes can be read then exit, never return a number <= 0.
179 *
180 * TODO: If the remote shell connection fails, then current versions
181 * actually report an "unexpected EOF" error here. Since it's a
182 * fairly common mistake to try to use rsh when ssh is required, we
183 * should trap that: if we fail to read any data at all, we should
184 * give a better explanation. We can tell whether the connection has
185 * started by looking e.g. at whether the remote version is known yet.
186 */
187static int read_timeout (int fd, char *buf, size_t len)
188{
189 int n, ret=0;
190
191 io_flush();
192
193 while (ret == 0) {
194 /* until we manage to read *something* */
195 fd_set fds;
196 struct timeval tv;
197 int fd_count = fd+1;
198 int count;
199
200 FD_ZERO(&fds);
201 FD_SET(fd, &fds);
202 if (io_error_fd != -1) {
203 FD_SET(io_error_fd, &fds);
204 if (io_error_fd > fd) fd_count = io_error_fd+1;
205 }
206
207 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
208 tv.tv_usec = 0;
209
210 errno = 0;
211
212 count = select(fd_count, &fds, NULL, NULL, &tv);
213
214 if (count == 0) {
215 check_timeout();
216 }
217
218 if (count <= 0) {
219 if (errno == EBADF) {
220 exit_cleanup(RERR_SOCKETIO);
221 }
222 continue;
223 }
224
225 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
226 read_error_fd();
227 }
228
229 if (!FD_ISSET(fd, &fds)) continue;
230
231 n = read(fd, buf, len);
232
233 if (n > 0) {
234 buf += n;
235 len -= n;
236 ret += n;
237 if (io_timeout)
238 last_io = time(NULL);
239 continue;
240 } else if (n == 0) {
241 whine_about_eof ();
242 return -1; /* doesn't return */
243 } else if (n == -1) {
244 if (errno == EINTR || errno == EWOULDBLOCK ||
245 errno == EAGAIN)
246 continue;
247 else
248 die_from_readerr (errno);
249 }
250 }
251
252 return ret;
253}
254
255
256
257
/**
 * Keep calling read_timeout() until exactly @p len bytes have been
 * stored in @p buf.  Does not return before that.
 **/
static void read_loop (int fd, char *buf, size_t len)
{
	int got;

	for (; len; buf += got, len -= got)
		got = read_timeout(fd, buf, len);
}
271
272
273/**
274 * Read from the file descriptor handling multiplexing - return number
275 * of bytes read.
276 *
277 * Never returns <= 0.
278 */
279static int read_unbuffered(int fd, char *buf, size_t len)
280{
281 static size_t remaining;
282 int tag, ret = 0;
283 char line[1024];
284
285 if (!io_multiplexing_in || fd != multiplex_in_fd)
286 return read_timeout(fd, buf, len);
287
288 while (ret == 0) {
289 if (remaining) {
290 len = MIN(len, remaining);
291 read_loop(fd, buf, len);
292 remaining -= len;
293 ret = len;
294 continue;
295 }
296
297 read_loop(fd, line, 4);
298 tag = IVAL(line, 0);
299
300 remaining = tag & 0xFFFFFF;
301 tag = tag >> 24;
302
303 if (tag == MPLEX_BASE)
304 continue;
305
306 tag -= MPLEX_BASE;
307
308 if (tag != FERROR && tag != FINFO) {
309 rprintf(FERROR, "unexpected tag %d\n", tag);
310 exit_cleanup(RERR_STREAMIO);
311 }
312
313 if (remaining > sizeof(line) - 1) {
314 rprintf(FERROR, "multiplexing overflow %d\n\n",
315 remaining);
316 exit_cleanup(RERR_STREAMIO);
317 }
318
319 read_loop(fd, line, remaining);
320 line[remaining] = 0;
321
322 rprintf((enum logcode) tag, "%s", line);
323 remaining = 0;
324 }
325
326 return ret;
327}
328
329
330
331/**
332 * Do a buffered read from @p fd. Don't return until all @p n bytes
333 * have been read. If all @p n can't be read then exit with an
334 * error.
335 **/
336static void readfd (int fd, char *buffer, size_t N)
337{
338 int ret;
339 size_t total=0;
340
341 while (total < N) {
342 io_flush();
343
344 ret = read_unbuffered (fd, buffer + total, N-total);
345 total += ret;
346 }
347
348 stats.total_read += total;
349}
350
351
352int32 read_int(int f)
353{
354 char b[4];
355 int32 ret;
356
357 readfd(f,b,4);
358 ret = IVAL(b,0);
359 if (ret == (int32)0xffffffff) return -1;
360 return ret;
361}
362
363int64 read_longint(int f)
364{
365 extern int remote_version;
366 int64 ret;
367 char b[8];
368 ret = read_int(f);
369
370 if ((int32)ret != (int32)0xffffffff) {
371 return ret;
372 }
373
374#ifdef NO_INT64
375 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
376 exit_cleanup(RERR_UNSUPPORTED);
377#else
378 if (remote_version >= 16) {
379 readfd(f,b,8);
380 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
381 }
382#endif
383
384 return ret;
385}
386
/** Read exactly @p len bytes from @p f into @p buf (see readfd()). */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
391
/**
 * Read exactly @p len bytes from @p f into @p buf and NUL-terminate
 * the result.  @p buf must have room for @p len + 1 bytes.
 **/
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	buf[len] = '\0';
}
397
/** Read a single byte from @p f and return it. */
unsigned char read_byte(int f)
{
	unsigned char ch;

	readfd(f, (char *)&ch, 1);
	return ch;
}
404
405
406/**
407 * Sleep after writing to limit I/O bandwidth usage.
408 *
409 * @todo Rather than sleeping after each write, it might be better to
410 * use some kind of averaging. The current algorithm seems to always
411 * use a bit less bandwidth than specified, because it doesn't make up
412 * for slow periods. But arguably this is a feature. In addition, we
413 * ought to take the time used to write the data into account.
414 **/
415static void sleep_for_bwlimit(int bytes_written)
416{
417 struct timeval tv;
418
419 if (!bwlimit)
420 return;
421
422 assert(bytes_written > 0);
423 assert(bwlimit > 0);
424
425 tv.tv_usec = bytes_written * 1000 / bwlimit;
426 tv.tv_sec = tv.tv_usec / 1000000;
427 tv.tv_usec = tv.tv_usec % 1000000;
428
429 select(0, NULL, NULL, NULL, &tv);
430}
431
432
433/**
434 * Write len bytes to the file descriptor @p fd.
435 *
436 * This function underlies the multiplexing system. The body of the
437 * application never calls this function directly.
438 **/
439static void writefd_unbuffered(int fd,char *buf,size_t len)
440{
441 size_t total = 0;
442 fd_set w_fds, r_fds;
443 int fd_count, count;
444 struct timeval tv;
445
446 err_list_push();
447
448 no_flush++;
449
450 while (total < len) {
451 FD_ZERO(&w_fds);
452 FD_ZERO(&r_fds);
453 FD_SET(fd,&w_fds);
454 fd_count = fd;
455
456 if (io_error_fd != -1) {
457 FD_SET(io_error_fd,&r_fds);
458 if (io_error_fd > fd_count)
459 fd_count = io_error_fd;
460 }
461
462 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
463 tv.tv_usec = 0;
464
465 errno = 0;
466
467 count = select(fd_count+1,
468 io_error_fd != -1?&r_fds:NULL,
469 &w_fds,NULL,
470 &tv);
471
472 if (count == 0) {
473 check_timeout();
474 }
475
476 if (count <= 0) {
477 if (errno == EBADF) {
478 exit_cleanup(RERR_SOCKETIO);
479 }
480 continue;
481 }
482
483 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
484 read_error_fd();
485 }
486
487 if (FD_ISSET(fd, &w_fds)) {
488 int ret;
489 size_t n = len-total;
490 ret = write(fd,buf+total,n);
491
492 if (ret == -1 && errno == EINTR) {
493 continue;
494 }
495
496 if (ret == -1 &&
497 (errno == EWOULDBLOCK || errno == EAGAIN)) {
498 msleep(1);
499 continue;
500 }
501
502 if (ret <= 0) {
503 /* Don't try to write errors back
504 * across the stream */
505 io_multiplexing_close();
506 rprintf(FERROR, RSYNC_NAME
507 ": writefd_unbuffered failed to write %ld bytes: phase \"%s\": %s\n",
508 (long) len, io_write_phase,
509 strerror(errno));
510 exit_cleanup(RERR_STREAMIO);
511 }
512
513 sleep_for_bwlimit(ret);
514
515 total += ret;
516
517 if (io_timeout)
518 last_io = time(NULL);
519 }
520 }
521
522 no_flush--;
523}
524
525
526static char *io_buffer;
527static int io_buffer_count;
528
529void io_start_buffering(int fd)
530{
531 if (io_buffer) return;
532 multiplex_out_fd = fd;
533 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
534 if (!io_buffer) out_of_memory("writefd");
535 io_buffer_count = 0;
536}
537
538/**
539 * Write an message to a multiplexed stream. If this fails then rsync
540 * exits.
541 **/
542static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
543{
544 char buffer[4096];
545 size_t n = len;
546
547 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
548
549 if (n > (sizeof(buffer)-4)) {
550 n = sizeof(buffer)-4;
551 }
552
553 memcpy(&buffer[4], buf, n);
554 writefd_unbuffered(fd, buffer, n+4);
555
556 len -= n;
557 buf += n;
558
559 if (len) {
560 writefd_unbuffered(fd, buf, len);
561 }
562}
563
564
565void io_flush(void)
566{
567 int fd = multiplex_out_fd;
568
569 err_list_push();
570
571 if (!io_buffer_count || no_flush) return;
572
573 if (io_multiplexing_out) {
574 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
575 } else {
576 writefd_unbuffered(fd, io_buffer, io_buffer_count);
577 }
578 io_buffer_count = 0;
579}
580
581
582void io_end_buffering(void)
583{
584 io_flush();
585 if (!io_multiplexing_out) {
586 free(io_buffer);
587 io_buffer = NULL;
588 }
589}
590
591static void writefd(int fd,char *buf,size_t len)
592{
593 stats.total_written += len;
594
595 err_list_push();
596
597 if (!io_buffer || fd != multiplex_out_fd) {
598 writefd_unbuffered(fd, buf, len);
599 return;
600 }
601
602 while (len) {
603 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
604 if (n > 0) {
605 memcpy(io_buffer+io_buffer_count, buf, n);
606 buf += n;
607 len -= n;
608 io_buffer_count += n;
609 }
610
611 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
612 }
613}
614
615
616void write_int(int f,int32 x)
617{
618 char b[4];
619 SIVAL(b,0,x);
620 writefd(f,b,4);
621}
622
623
624/*
625 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
626 * 64-bit types on this platform.
627 */
628void write_longint(int f, int64 x)
629{
630 extern int remote_version;
631 char b[8];
632
633 if (remote_version < 16 || x <= 0x7FFFFFFF) {
634 write_int(f, (int)x);
635 return;
636 }
637
638 write_int(f, (int32)0xFFFFFFFF);
639 SIVAL(b,0,(x&0xFFFFFFFF));
640 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
641
642 writefd(f,b,8);
643}
644
/** Write @p len bytes from @p buf to @p f (see writefd()). */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
649
/**
 * Write the NUL-terminated string @p buf to the connection.  The
 * terminating NUL itself is not sent.
 **/
static void write_sbuf(int f, char *buf)
{
	size_t n = strlen(buf);

	write_buf(f, buf, n);
}
655
656
/** Write the single byte @p c to @p f. */
void write_byte(int f, unsigned char c)
{
	write_buf(f, (char *)&c, 1);
}
661
662
663
/**
 * Read a line of up to @p maxlen characters into @p buf, one byte at
 * a time.  The stored line does not contain the trailing newline, and
 * carriage returns are dropped.
 *
 * @p buf must have room for @p maxlen + 1 bytes: when the limit is
 * reached a terminating NUL is stored after the last character.
 *
 * @return 1 for success; 0 if a NUL byte was read or if @p maxlen
 * characters were stored without seeing a newline (truncation).
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	while (maxlen) {
		*buf = '\0';
		read_buf(f, buf, 1);

		switch (*buf) {
		case '\0':
			return 0;
		case '\n':
			/* end of line: terminate the string in place */
			*buf = '\0';
			return 1;
		case '\r':
			/* ignored; the next byte overwrites it */
			break;
		default:
			buf++;
			maxlen--;
			break;
		}
	}

	/* ran out of space before a newline turned up */
	*buf = '\0';
	return 0;
}
693
694
695void io_printf(int fd, const char *format, ...)
696{
697 va_list ap;
698 char buf[1024];
699 int len;
700
701 va_start(ap, format);
702 len = vsnprintf(buf, sizeof(buf), format, ap);
703 va_end(ap);
704
705 if (len < 0) exit_cleanup(RERR_STREAMIO);
706
707 write_sbuf(fd, buf);
708}
709
710
711/** Setup for multiplexing an error stream with the data stream */
712void io_start_multiplex_out(int fd)
713{
714 multiplex_out_fd = fd;
715 io_flush();
716 io_start_buffering(fd);
717 io_multiplexing_out = 1;
718}
719
720/** Setup for multiplexing an error stream with the data stream */
721void io_start_multiplex_in(int fd)
722{
723 multiplex_in_fd = fd;
724 io_flush();
725 io_multiplexing_in = 1;
726}
727
728/** Write an message to the multiplexed error stream */
729int io_multiplex_write(enum logcode code, char *buf, size_t len)
730{
731 if (!io_multiplexing_out) return 0;
732
733 io_flush();
734 stats.total_written += (len+4);
735 mplex_write(multiplex_out_fd, code, buf, len);
736 return 1;
737}
738
739/** Stop output multiplexing */
740void io_multiplexing_close(void)
741{
742 io_multiplexing_out = 0;
743}
744