Doc.
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/* -*- c-file-style: "linux" -*-
2
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20*/
21
22/*
23 socket and pipe IO utilities used in rsync
24
25 tridge, June 1996
26 */
27#include "rsync.h"
28
29/* if no timeout is specified then use a 60 second select timeout */
30#define SELECT_TIMEOUT 60
31
32static int io_multiplexing_out;
33static int io_multiplexing_in;
34static int multiplex_in_fd;
35static int multiplex_out_fd;
36static time_t last_io;
37static int no_flush;
38
39extern int bwlimit;
40extern int verbose;
41extern int io_timeout;
42extern struct stats stats;
43
44
45/** Ignore EOF errors while reading a module listing if the remote
46 version is 24 or less. */
47int kludge_around_eof = False;
48
49
50static int io_error_fd = -1;
51
52static void read_loop(int fd, char *buf, size_t len);
53
54static void check_timeout(void)
55{
56 extern int am_server, am_daemon;
57 time_t t;
58
59 err_list_push();
60
61 if (!io_timeout) return;
62
63 if (!last_io) {
64 last_io = time(NULL);
65 return;
66 }
67
68 t = time(NULL);
69
70 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
71 if (!am_server && !am_daemon) {
72 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
73 (int)(t-last_io));
74 }
75 exit_cleanup(RERR_TIMEOUT);
76 }
77}
78
79/* setup the fd used to propogate errors */
80void io_set_error_fd(int fd)
81{
82 io_error_fd = fd;
83}
84
85/* read some data from the error fd and write it to the write log code */
86static void read_error_fd(void)
87{
88 char buf[200];
89 size_t n;
90 int fd = io_error_fd;
91 int tag, len;
92
93 /* io_error_fd is temporarily disabled -- is this meant to
94 * prevent indefinite recursion? */
95 io_error_fd = -1;
96
97 read_loop(fd, buf, 4);
98 tag = IVAL(buf, 0);
99
100 len = tag & 0xFFFFFF;
101 tag = tag >> 24;
102 tag -= MPLEX_BASE;
103
104 while (len) {
105 n = len;
106 if (n > (sizeof(buf)-1))
107 n = sizeof(buf)-1;
108 read_loop(fd, buf, n);
109 rwrite((enum logcode)tag, buf, n);
110 len -= n;
111 }
112
113 io_error_fd = fd;
114}
115
116
117static void whine_about_eof (void)
118{
119 /**
120 It's almost always an error to get an EOF when we're trying
121 to read from the network, because the protocol is
122 self-terminating.
123
124 However, there is one unfortunate cases where it is not,
125 which is rsync <2.4.6 sending a list of modules on a
126 server, since the list is terminated by closing the socket.
127 So, for the section of the program where that is a problem
128 (start_socket_client), kludge_around_eof is True and we
129 just exit.
130 */
131
132 if (kludge_around_eof)
133 exit_cleanup (0);
134 else {
135 rprintf (FERROR,
136 "%s: connection unexpectedly closed "
137 "(%.0f bytes read so far)\n",
138 RSYNC_NAME, (double)stats.total_read);
139
140 exit_cleanup (RERR_STREAMIO);
141 }
142}
143
144
145static void die_from_readerr (int err)
146{
147 /* this prevents us trying to write errors on a dead socket */
148 io_multiplexing_close();
149
150 rprintf(FERROR, "%s: read error: %s\n",
151 RSYNC_NAME, strerror (err));
152 exit_cleanup(RERR_STREAMIO);
153}
154
155
156/*!
157 * Read from a socket with IO timeout. return the number of bytes
158 * read. If no bytes can be read then exit, never return a number <= 0.
159 *
160 * TODO: If the remote shell connection fails, then current versions
161 * actually report an "unexpected EOF" error here. Since it's a
162 * fairly common mistake to try to use rsh when ssh is required, we
163 * should trap that: if we fail to read any data at all, we should
164 * give a better explanation. We can tell whether the connection has
165 * started by looking e.g. at whether the remote version is known yet.
166 */
167static int read_timeout (int fd, char *buf, size_t len)
168{
169 int n, ret=0;
170
171 io_flush();
172
173 while (ret == 0) {
174 /* until we manage to read *something* */
175 fd_set fds;
176 struct timeval tv;
177 int fd_count = fd+1;
178 int count;
179
180 FD_ZERO(&fds);
181 FD_SET(fd, &fds);
182 if (io_error_fd != -1) {
183 FD_SET(io_error_fd, &fds);
184 if (io_error_fd > fd) fd_count = io_error_fd+1;
185 }
186
187 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
188 tv.tv_usec = 0;
189
190 errno = 0;
191
192 count = select(fd_count, &fds, NULL, NULL, &tv);
193
194 if (count == 0) {
195 check_timeout();
196 }
197
198 if (count <= 0) {
199 if (errno == EBADF) {
200 exit_cleanup(RERR_SOCKETIO);
201 }
202 continue;
203 }
204
205 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
206 read_error_fd();
207 }
208
209 if (!FD_ISSET(fd, &fds)) continue;
210
211 n = read(fd, buf, len);
212
213 if (n > 0) {
214 buf += n;
215 len -= n;
216 ret += n;
217 if (io_timeout)
218 last_io = time(NULL);
219 continue;
220 } else if (n == 0) {
221 whine_about_eof ();
222 return -1; /* doesn't return */
223 } else if (n == -1) {
224 if (errno == EINTR || errno == EWOULDBLOCK ||
225 errno == EAGAIN)
226 continue;
227 else
228 die_from_readerr (errno);
229 }
230 }
231
232 return ret;
233}
234
235
236
237
238/*! Continue trying to read len bytes - don't return until len has
239 been read. */
240static void read_loop (int fd, char *buf, size_t len)
241{
242 while (len) {
243 int n = read_timeout(fd, buf, len);
244
245 buf += n;
246 len -= n;
247 }
248}
249
250
251/**
252 * Read from the file descriptor handling multiplexing - return number
253 * of bytes read.
254 *
255 * Never returns <= 0.
256 */
257static int read_unbuffered(int fd, char *buf, size_t len)
258{
259 static size_t remaining;
260 int tag, ret = 0;
261 char line[1024];
262
263 if (!io_multiplexing_in || fd != multiplex_in_fd)
264 return read_timeout(fd, buf, len);
265
266 while (ret == 0) {
267 if (remaining) {
268 len = MIN(len, remaining);
269 read_loop(fd, buf, len);
270 remaining -= len;
271 ret = len;
272 continue;
273 }
274
275 read_loop(fd, line, 4);
276 tag = IVAL(line, 0);
277
278 remaining = tag & 0xFFFFFF;
279 tag = tag >> 24;
280
281 if (tag == MPLEX_BASE)
282 continue;
283
284 tag -= MPLEX_BASE;
285
286 if (tag != FERROR && tag != FINFO) {
287 rprintf(FERROR, "unexpected tag %d\n", tag);
288 exit_cleanup(RERR_STREAMIO);
289 }
290
291 if (remaining > sizeof(line) - 1) {
292 rprintf(FERROR, "multiplexing overflow %d\n\n",
293 remaining);
294 exit_cleanup(RERR_STREAMIO);
295 }
296
297 read_loop(fd, line, remaining);
298 line[remaining] = 0;
299
300 rprintf((enum logcode) tag, "%s", line);
301 remaining = 0;
302 }
303
304 return ret;
305}
306
307
308
309/* do a buffered read from fd. don't return until all N bytes
310 have been read. If all N can't be read then exit with an error */
311static void readfd (int fd, char *buffer, size_t N)
312{
313 int ret;
314 size_t total=0;
315
316 while (total < N) {
317 io_flush();
318
319 ret = read_unbuffered (fd, buffer + total, N-total);
320 total += ret;
321 }
322
323 stats.total_read += total;
324}
325
326
327int32 read_int(int f)
328{
329 char b[4];
330 int32 ret;
331
332 readfd(f,b,4);
333 ret = IVAL(b,0);
334 if (ret == (int32)0xffffffff) return -1;
335 return ret;
336}
337
338int64 read_longint(int f)
339{
340 extern int remote_version;
341 int64 ret;
342 char b[8];
343 ret = read_int(f);
344
345 if ((int32)ret != (int32)0xffffffff) {
346 return ret;
347 }
348
349#ifdef NO_INT64
350 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
351 exit_cleanup(RERR_UNSUPPORTED);
352#else
353 if (remote_version >= 16) {
354 readfd(f,b,8);
355 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
356 }
357#endif
358
359 return ret;
360}
361
362void read_buf(int f,char *buf,size_t len)
363{
364 readfd(f,buf,len);
365}
366
367void read_sbuf(int f,char *buf,size_t len)
368{
369 read_buf (f,buf,len);
370 buf[len] = 0;
371}
372
373unsigned char read_byte(int f)
374{
375 unsigned char c;
376 read_buf (f, (char *)&c, 1);
377 return c;
378}
379
380/* write len bytes to fd */
381static void writefd_unbuffered(int fd,char *buf,size_t len)
382{
383 size_t total = 0;
384 fd_set w_fds, r_fds;
385 int fd_count, count;
386 struct timeval tv;
387
388 err_list_push();
389
390 no_flush++;
391
392 while (total < len) {
393 FD_ZERO(&w_fds);
394 FD_ZERO(&r_fds);
395 FD_SET(fd,&w_fds);
396 fd_count = fd;
397
398 if (io_error_fd != -1) {
399 FD_SET(io_error_fd,&r_fds);
400 if (io_error_fd > fd_count)
401 fd_count = io_error_fd;
402 }
403
404 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
405 tv.tv_usec = 0;
406
407 errno = 0;
408
409 count = select(fd_count+1,
410 io_error_fd != -1?&r_fds:NULL,
411 &w_fds,NULL,
412 &tv);
413
414 if (count == 0) {
415 check_timeout();
416 }
417
418 if (count <= 0) {
419 if (errno == EBADF) {
420 exit_cleanup(RERR_SOCKETIO);
421 }
422 continue;
423 }
424
425 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
426 read_error_fd();
427 }
428
429 if (FD_ISSET(fd, &w_fds)) {
430 int ret;
431 size_t n = len-total;
432 ret = write(fd,buf+total,n);
433
434 if (ret == -1 && errno == EINTR) {
435 continue;
436 }
437
438 if (ret == -1 &&
439 (errno == EWOULDBLOCK || errno == EAGAIN)) {
440 msleep(1);
441 continue;
442 }
443
444 if (ret <= 0) {
445 rprintf(FERROR,
446 "error writing %d unbuffered bytes"
447 " - exiting: %s\n", len,
448 strerror(errno));
449 exit_cleanup(RERR_STREAMIO);
450 }
451
452 /* Sleep after writing to limit I/O bandwidth */
453 if (bwlimit)
454 {
455 tv.tv_sec = 0;
456 tv.tv_usec = ret * 1000 / bwlimit;
457 while (tv.tv_usec > 1000000)
458 {
459 tv.tv_sec++;
460 tv.tv_usec -= 1000000;
461 }
462 select(0, NULL, NULL, NULL, &tv);
463 }
464
465 total += ret;
466
467 if (io_timeout)
468 last_io = time(NULL);
469 }
470 }
471
472 no_flush--;
473}
474
475
476static char *io_buffer;
477static int io_buffer_count;
478
479void io_start_buffering(int fd)
480{
481 if (io_buffer) return;
482 multiplex_out_fd = fd;
483 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
484 if (!io_buffer) out_of_memory("writefd");
485 io_buffer_count = 0;
486}
487
488/* write an message to a multiplexed stream. If this fails then rsync
489 exits */
490static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
491{
492 char buffer[4096];
493 size_t n = len;
494
495 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
496
497 if (n > (sizeof(buffer)-4)) {
498 n = sizeof(buffer)-4;
499 }
500
501 memcpy(&buffer[4], buf, n);
502 writefd_unbuffered(fd, buffer, n+4);
503
504 len -= n;
505 buf += n;
506
507 if (len) {
508 writefd_unbuffered(fd, buf, len);
509 }
510}
511
512
513void io_flush(void)
514{
515 int fd = multiplex_out_fd;
516
517 err_list_push();
518
519 if (!io_buffer_count || no_flush) return;
520
521 if (io_multiplexing_out) {
522 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
523 } else {
524 writefd_unbuffered(fd, io_buffer, io_buffer_count);
525 }
526 io_buffer_count = 0;
527}
528
529
530void io_end_buffering(void)
531{
532 io_flush();
533 if (!io_multiplexing_out) {
534 free(io_buffer);
535 io_buffer = NULL;
536 }
537}
538
539static void writefd(int fd,char *buf,size_t len)
540{
541 stats.total_written += len;
542
543 err_list_push();
544
545 if (!io_buffer || fd != multiplex_out_fd) {
546 writefd_unbuffered(fd, buf, len);
547 return;
548 }
549
550 while (len) {
551 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
552 if (n > 0) {
553 memcpy(io_buffer+io_buffer_count, buf, n);
554 buf += n;
555 len -= n;
556 io_buffer_count += n;
557 }
558
559 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
560 }
561}
562
563
564void write_int(int f,int32 x)
565{
566 char b[4];
567 SIVAL(b,0,x);
568 writefd(f,b,4);
569}
570
571
572/*
573 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
574 * 64-bit types on this platform.
575 */
576void write_longint(int f, int64 x)
577{
578 extern int remote_version;
579 char b[8];
580
581 if (remote_version < 16 || x <= 0x7FFFFFFF) {
582 write_int(f, (int)x);
583 return;
584 }
585
586 write_int(f, (int32)0xFFFFFFFF);
587 SIVAL(b,0,(x&0xFFFFFFFF));
588 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
589
590 writefd(f,b,8);
591}
592
593void write_buf(int f,char *buf,size_t len)
594{
595 writefd(f,buf,len);
596}
597
598/* write a string to the connection */
599static void write_sbuf(int f,char *buf)
600{
601 write_buf(f, buf, strlen(buf));
602}
603
604
605void write_byte(int f,unsigned char c)
606{
607 write_buf(f,(char *)&c,1);
608}
609
610
611
612int read_line(int f, char *buf, size_t maxlen)
613{
614 while (maxlen) {
615 buf[0] = 0;
616 read_buf(f, buf, 1);
617 if (buf[0] == 0) return 0;
618 if (buf[0] == '\n') {
619 buf[0] = 0;
620 break;
621 }
622 if (buf[0] != '\r') {
623 buf++;
624 maxlen--;
625 }
626 }
627 if (maxlen == 0) {
628 *buf = 0;
629 return 0;
630 }
631
632 return 1;
633}
634
635
636void io_printf(int fd, const char *format, ...)
637{
638 va_list ap;
639 char buf[1024];
640 int len;
641
642 va_start(ap, format);
643 len = vsnprintf(buf, sizeof(buf), format, ap);
644 va_end(ap);
645
646 if (len < 0) exit_cleanup(RERR_STREAMIO);
647
648 write_sbuf(fd, buf);
649}
650
651
652/* setup for multiplexing an error stream with the data stream */
653void io_start_multiplex_out(int fd)
654{
655 multiplex_out_fd = fd;
656 io_flush();
657 io_start_buffering(fd);
658 io_multiplexing_out = 1;
659}
660
661/* setup for multiplexing an error stream with the data stream */
662void io_start_multiplex_in(int fd)
663{
664 multiplex_in_fd = fd;
665 io_flush();
666 io_multiplexing_in = 1;
667}
668
669/* write an message to the multiplexed error stream */
670int io_multiplex_write(enum logcode code, char *buf, size_t len)
671{
672 if (!io_multiplexing_out) return 0;
673
674 io_flush();
675 stats.total_written += (len+4);
676 mplex_write(multiplex_out_fd, code, buf, len);
677 return 1;
678}
679
680/* stop output multiplexing */
681void io_multiplexing_close(void)
682{
683 io_multiplexing_out = 0;
684}
685