Notes on logging etc
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/* -*- c-file-style: "linux" -*-
2
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20*/
21
22/**
23 *
24 * @file io.c
25 *
26 * Socket and pipe IO utilities used in rsync.
27 *
28 * rsync provides its own multiplexing system, which is used to send
29 * stderr and stdout over a single socket. We need this because
30 * stdout normally carries the binary data stream, and stderr all our
31 * error messages.
32 *
33 * For historical reasons this is off during the start of the
34 * connection, but it's switched on quite early using
35 * io_start_multiplex_out() and io_start_multiplex_in().
36 **/
37
38#include "rsync.h"
39
40/* if no timeout is specified then use a 60 second select timeout */
41#define SELECT_TIMEOUT 60
42
43static int io_multiplexing_out;
44static int io_multiplexing_in;
45static int multiplex_in_fd;
46static int multiplex_out_fd;
47static time_t last_io;
48static int no_flush;
49
50extern int bwlimit;
51extern int verbose;
52extern int io_timeout;
53extern struct stats stats;
54
55
56/** Ignore EOF errors while reading a module listing if the remote
57 version is 24 or less. */
58int kludge_around_eof = False;
59
60
61static int io_error_fd = -1;
62
63static void read_loop(int fd, char *buf, size_t len);
64
65static void check_timeout(void)
66{
67 extern int am_server, am_daemon;
68 time_t t;
69
70 err_list_push();
71
72 if (!io_timeout) return;
73
74 if (!last_io) {
75 last_io = time(NULL);
76 return;
77 }
78
79 t = time(NULL);
80
81 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82 if (!am_server && !am_daemon) {
83 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
84 (int)(t-last_io));
85 }
86 exit_cleanup(RERR_TIMEOUT);
87 }
88}
89
90/* setup the fd used to propogate errors */
91void io_set_error_fd(int fd)
92{
93 io_error_fd = fd;
94}
95
96/* read some data from the error fd and write it to the write log code */
97static void read_error_fd(void)
98{
99 char buf[200];
100 size_t n;
101 int fd = io_error_fd;
102 int tag, len;
103
104 /* io_error_fd is temporarily disabled -- is this meant to
105 * prevent indefinite recursion? */
106 io_error_fd = -1;
107
108 read_loop(fd, buf, 4);
109 tag = IVAL(buf, 0);
110
111 len = tag & 0xFFFFFF;
112 tag = tag >> 24;
113 tag -= MPLEX_BASE;
114
115 while (len) {
116 n = len;
117 if (n > (sizeof(buf)-1))
118 n = sizeof(buf)-1;
119 read_loop(fd, buf, n);
120 rwrite((enum logcode)tag, buf, n);
121 len -= n;
122 }
123
124 io_error_fd = fd;
125}
126
127
128static void whine_about_eof (void)
129{
130 /**
131 It's almost always an error to get an EOF when we're trying
132 to read from the network, because the protocol is
133 self-terminating.
134
135 However, there is one unfortunate cases where it is not,
136 which is rsync <2.4.6 sending a list of modules on a
137 server, since the list is terminated by closing the socket.
138 So, for the section of the program where that is a problem
139 (start_socket_client), kludge_around_eof is True and we
140 just exit.
141 */
142
143 if (kludge_around_eof)
144 exit_cleanup (0);
145 else {
146 rprintf (FERROR,
147 "%s: connection unexpectedly closed "
148 "(%.0f bytes read so far)\n",
149 RSYNC_NAME, (double)stats.total_read);
150
151 exit_cleanup (RERR_STREAMIO);
152 }
153}
154
155
156static void die_from_readerr (int err)
157{
158 /* this prevents us trying to write errors on a dead socket */
159 io_multiplexing_close();
160
161 rprintf(FERROR, "%s: read error: %s\n",
162 RSYNC_NAME, strerror (err));
163 exit_cleanup(RERR_STREAMIO);
164}
165
166
167/*!
168 * Read from a socket with IO timeout. return the number of bytes
169 * read. If no bytes can be read then exit, never return a number <= 0.
170 *
171 * TODO: If the remote shell connection fails, then current versions
172 * actually report an "unexpected EOF" error here. Since it's a
173 * fairly common mistake to try to use rsh when ssh is required, we
174 * should trap that: if we fail to read any data at all, we should
175 * give a better explanation. We can tell whether the connection has
176 * started by looking e.g. at whether the remote version is known yet.
177 */
178static int read_timeout (int fd, char *buf, size_t len)
179{
180 int n, ret=0;
181
182 io_flush();
183
184 while (ret == 0) {
185 /* until we manage to read *something* */
186 fd_set fds;
187 struct timeval tv;
188 int fd_count = fd+1;
189 int count;
190
191 FD_ZERO(&fds);
192 FD_SET(fd, &fds);
193 if (io_error_fd != -1) {
194 FD_SET(io_error_fd, &fds);
195 if (io_error_fd > fd) fd_count = io_error_fd+1;
196 }
197
198 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
199 tv.tv_usec = 0;
200
201 errno = 0;
202
203 count = select(fd_count, &fds, NULL, NULL, &tv);
204
205 if (count == 0) {
206 check_timeout();
207 }
208
209 if (count <= 0) {
210 if (errno == EBADF) {
211 exit_cleanup(RERR_SOCKETIO);
212 }
213 continue;
214 }
215
216 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
217 read_error_fd();
218 }
219
220 if (!FD_ISSET(fd, &fds)) continue;
221
222 n = read(fd, buf, len);
223
224 if (n > 0) {
225 buf += n;
226 len -= n;
227 ret += n;
228 if (io_timeout)
229 last_io = time(NULL);
230 continue;
231 } else if (n == 0) {
232 whine_about_eof ();
233 return -1; /* doesn't return */
234 } else if (n == -1) {
235 if (errno == EINTR || errno == EWOULDBLOCK ||
236 errno == EAGAIN)
237 continue;
238 else
239 die_from_readerr (errno);
240 }
241 }
242
243 return ret;
244}
245
246
247
248
249/*! Continue trying to read len bytes - don't return until len has
250 been read. */
251static void read_loop (int fd, char *buf, size_t len)
252{
253 while (len) {
254 int n = read_timeout(fd, buf, len);
255
256 buf += n;
257 len -= n;
258 }
259}
260
261
262/**
263 * Read from the file descriptor handling multiplexing - return number
264 * of bytes read.
265 *
266 * Never returns <= 0.
267 */
268static int read_unbuffered(int fd, char *buf, size_t len)
269{
270 static size_t remaining;
271 int tag, ret = 0;
272 char line[1024];
273
274 if (!io_multiplexing_in || fd != multiplex_in_fd)
275 return read_timeout(fd, buf, len);
276
277 while (ret == 0) {
278 if (remaining) {
279 len = MIN(len, remaining);
280 read_loop(fd, buf, len);
281 remaining -= len;
282 ret = len;
283 continue;
284 }
285
286 read_loop(fd, line, 4);
287 tag = IVAL(line, 0);
288
289 remaining = tag & 0xFFFFFF;
290 tag = tag >> 24;
291
292 if (tag == MPLEX_BASE)
293 continue;
294
295 tag -= MPLEX_BASE;
296
297 if (tag != FERROR && tag != FINFO) {
298 rprintf(FERROR, "unexpected tag %d\n", tag);
299 exit_cleanup(RERR_STREAMIO);
300 }
301
302 if (remaining > sizeof(line) - 1) {
303 rprintf(FERROR, "multiplexing overflow %d\n\n",
304 remaining);
305 exit_cleanup(RERR_STREAMIO);
306 }
307
308 read_loop(fd, line, remaining);
309 line[remaining] = 0;
310
311 rprintf((enum logcode) tag, "%s", line);
312 remaining = 0;
313 }
314
315 return ret;
316}
317
318
319
320/* do a buffered read from fd. don't return until all N bytes
321 have been read. If all N can't be read then exit with an error */
322static void readfd (int fd, char *buffer, size_t N)
323{
324 int ret;
325 size_t total=0;
326
327 while (total < N) {
328 io_flush();
329
330 ret = read_unbuffered (fd, buffer + total, N-total);
331 total += ret;
332 }
333
334 stats.total_read += total;
335}
336
337
338int32 read_int(int f)
339{
340 char b[4];
341 int32 ret;
342
343 readfd(f,b,4);
344 ret = IVAL(b,0);
345 if (ret == (int32)0xffffffff) return -1;
346 return ret;
347}
348
349int64 read_longint(int f)
350{
351 extern int remote_version;
352 int64 ret;
353 char b[8];
354 ret = read_int(f);
355
356 if ((int32)ret != (int32)0xffffffff) {
357 return ret;
358 }
359
360#ifdef NO_INT64
361 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362 exit_cleanup(RERR_UNSUPPORTED);
363#else
364 if (remote_version >= 16) {
365 readfd(f,b,8);
366 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
367 }
368#endif
369
370 return ret;
371}
372
373void read_buf(int f,char *buf,size_t len)
374{
375 readfd(f,buf,len);
376}
377
378void read_sbuf(int f,char *buf,size_t len)
379{
380 read_buf (f,buf,len);
381 buf[len] = 0;
382}
383
384unsigned char read_byte(int f)
385{
386 unsigned char c;
387 read_buf (f, (char *)&c, 1);
388 return c;
389}
390
391/* Write len bytes to fd. This underlies the multiplexing system,
392 * which is always called by application code. */
393static void writefd_unbuffered(int fd,char *buf,size_t len)
394{
395 size_t total = 0;
396 fd_set w_fds, r_fds;
397 int fd_count, count;
398 struct timeval tv;
399
400 err_list_push();
401
402 no_flush++;
403
404 while (total < len) {
405 FD_ZERO(&w_fds);
406 FD_ZERO(&r_fds);
407 FD_SET(fd,&w_fds);
408 fd_count = fd;
409
410 if (io_error_fd != -1) {
411 FD_SET(io_error_fd,&r_fds);
412 if (io_error_fd > fd_count)
413 fd_count = io_error_fd;
414 }
415
416 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
417 tv.tv_usec = 0;
418
419 errno = 0;
420
421 count = select(fd_count+1,
422 io_error_fd != -1?&r_fds:NULL,
423 &w_fds,NULL,
424 &tv);
425
426 if (count == 0) {
427 check_timeout();
428 }
429
430 if (count <= 0) {
431 if (errno == EBADF) {
432 exit_cleanup(RERR_SOCKETIO);
433 }
434 continue;
435 }
436
437 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
438 read_error_fd();
439 }
440
441 if (FD_ISSET(fd, &w_fds)) {
442 int ret;
443 size_t n = len-total;
444 ret = write(fd,buf+total,n);
445
446 if (ret == -1 && errno == EINTR) {
447 continue;
448 }
449
450 if (ret == -1 &&
451 (errno == EWOULDBLOCK || errno == EAGAIN)) {
452 msleep(1);
453 continue;
454 }
455
456 if (ret <= 0) {
457 /* Don't try to write errors back
458 * across the stream */
459 io_multiplexing_close();
460 rprintf(FERROR, RSYNC_NAME
461 ": error writing %d unbuffered bytes"
462 " - exiting: %s\n", len,
463 strerror(errno));
464 exit_cleanup(RERR_STREAMIO);
465 }
466
467 /* Sleep after writing to limit I/O bandwidth */
468 if (bwlimit)
469 {
470 tv.tv_sec = 0;
471 tv.tv_usec = ret * 1000 / bwlimit;
472 while (tv.tv_usec > 1000000)
473 {
474 tv.tv_sec++;
475 tv.tv_usec -= 1000000;
476 }
477 select(0, NULL, NULL, NULL, &tv);
478 }
479
480 total += ret;
481
482 if (io_timeout)
483 last_io = time(NULL);
484 }
485 }
486
487 no_flush--;
488}
489
490
491static char *io_buffer;
492static int io_buffer_count;
493
494void io_start_buffering(int fd)
495{
496 if (io_buffer) return;
497 multiplex_out_fd = fd;
498 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
499 if (!io_buffer) out_of_memory("writefd");
500 io_buffer_count = 0;
501}
502
503/* write an message to a multiplexed stream. If this fails then rsync
504 exits */
505static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
506{
507 char buffer[4096];
508 size_t n = len;
509
510 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
511
512 if (n > (sizeof(buffer)-4)) {
513 n = sizeof(buffer)-4;
514 }
515
516 memcpy(&buffer[4], buf, n);
517 writefd_unbuffered(fd, buffer, n+4);
518
519 len -= n;
520 buf += n;
521
522 if (len) {
523 writefd_unbuffered(fd, buf, len);
524 }
525}
526
527
528void io_flush(void)
529{
530 int fd = multiplex_out_fd;
531
532 err_list_push();
533
534 if (!io_buffer_count || no_flush) return;
535
536 if (io_multiplexing_out) {
537 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
538 } else {
539 writefd_unbuffered(fd, io_buffer, io_buffer_count);
540 }
541 io_buffer_count = 0;
542}
543
544
545void io_end_buffering(void)
546{
547 io_flush();
548 if (!io_multiplexing_out) {
549 free(io_buffer);
550 io_buffer = NULL;
551 }
552}
553
554static void writefd(int fd,char *buf,size_t len)
555{
556 stats.total_written += len;
557
558 err_list_push();
559
560 if (!io_buffer || fd != multiplex_out_fd) {
561 writefd_unbuffered(fd, buf, len);
562 return;
563 }
564
565 while (len) {
566 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
567 if (n > 0) {
568 memcpy(io_buffer+io_buffer_count, buf, n);
569 buf += n;
570 len -= n;
571 io_buffer_count += n;
572 }
573
574 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
575 }
576}
577
578
579void write_int(int f,int32 x)
580{
581 char b[4];
582 SIVAL(b,0,x);
583 writefd(f,b,4);
584}
585
586
587/*
588 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
589 * 64-bit types on this platform.
590 */
591void write_longint(int f, int64 x)
592{
593 extern int remote_version;
594 char b[8];
595
596 if (remote_version < 16 || x <= 0x7FFFFFFF) {
597 write_int(f, (int)x);
598 return;
599 }
600
601 write_int(f, (int32)0xFFFFFFFF);
602 SIVAL(b,0,(x&0xFFFFFFFF));
603 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
604
605 writefd(f,b,8);
606}
607
608void write_buf(int f,char *buf,size_t len)
609{
610 writefd(f,buf,len);
611}
612
613/* write a string to the connection */
614static void write_sbuf(int f,char *buf)
615{
616 write_buf(f, buf, strlen(buf));
617}
618
619
620void write_byte(int f,unsigned char c)
621{
622 write_buf(f,(char *)&c,1);
623}
624
625
626
627int read_line(int f, char *buf, size_t maxlen)
628{
629 while (maxlen) {
630 buf[0] = 0;
631 read_buf(f, buf, 1);
632 if (buf[0] == 0) return 0;
633 if (buf[0] == '\n') {
634 buf[0] = 0;
635 break;
636 }
637 if (buf[0] != '\r') {
638 buf++;
639 maxlen--;
640 }
641 }
642 if (maxlen == 0) {
643 *buf = 0;
644 return 0;
645 }
646
647 return 1;
648}
649
650
651void io_printf(int fd, const char *format, ...)
652{
653 va_list ap;
654 char buf[1024];
655 int len;
656
657 va_start(ap, format);
658 len = vsnprintf(buf, sizeof(buf), format, ap);
659 va_end(ap);
660
661 if (len < 0) exit_cleanup(RERR_STREAMIO);
662
663 write_sbuf(fd, buf);
664}
665
666
667/* setup for multiplexing an error stream with the data stream */
668void io_start_multiplex_out(int fd)
669{
670 multiplex_out_fd = fd;
671 io_flush();
672 io_start_buffering(fd);
673 io_multiplexing_out = 1;
674}
675
676/* setup for multiplexing an error stream with the data stream */
677void io_start_multiplex_in(int fd)
678{
679 multiplex_in_fd = fd;
680 io_flush();
681 io_multiplexing_in = 1;
682}
683
684/* write an message to the multiplexed error stream */
685int io_multiplex_write(enum logcode code, char *buf, size_t len)
686{
687 if (!io_multiplexing_out) return 0;
688
689 io_flush();
690 stats.total_written += (len+4);
691 mplex_write(multiplex_out_fd, code, buf, len);
692 return 1;
693}
694
695/* stop output multiplexing */
696void io_multiplexing_close(void)
697{
698 io_multiplexing_out = 0;
699}
700