rwrite: Doc.
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/* -*- c-file-style: "linux" -*-
2
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20*/
21
22/**
23 *
24 * @file io.c
25 *
26 * Socket and pipe IO utilities used in rsync.
27 *
28 * rsync provides its own multiplexing system, which is used to send
29 * stderr and stdout over a single socket. We need this because
30 * stdout normally carries the binary data stream, and stderr all our
31 * error messages.
32 *
33 * For historical reasons this is off during the start of the
34 * connection, but it's switched on quite early using
35 * io_start_multiplex_out() and io_start_multiplex_in().
36 **/
37
38#include "rsync.h"
39
40/* if no timeout is specified then use a 60 second select timeout */
41#define SELECT_TIMEOUT 60
42
43static int io_multiplexing_out;
44static int io_multiplexing_in;
45static int multiplex_in_fd;
46static int multiplex_out_fd;
47static time_t last_io;
48static int no_flush;
49
50extern int bwlimit;
51extern int verbose;
52extern int io_timeout;
53extern struct stats stats;
54
55
56/** Ignore EOF errors while reading a module listing if the remote
57 version is 24 or less. */
58int kludge_around_eof = False;
59
60
61static int io_error_fd = -1;
62
63static void read_loop(int fd, char *buf, size_t len);
64
65static void check_timeout(void)
66{
67 extern int am_server, am_daemon;
68 time_t t;
69
70 err_list_push();
71
72 if (!io_timeout) return;
73
74 if (!last_io) {
75 last_io = time(NULL);
76 return;
77 }
78
79 t = time(NULL);
80
81 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82 if (!am_server && !am_daemon) {
83 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
84 (int)(t-last_io));
85 }
86 exit_cleanup(RERR_TIMEOUT);
87 }
88}
89
90/* setup the fd used to propogate errors */
91void io_set_error_fd(int fd)
92{
93 io_error_fd = fd;
94}
95
96/* read some data from the error fd and write it to the write log code */
97static void read_error_fd(void)
98{
99 char buf[200];
100 size_t n;
101 int fd = io_error_fd;
102 int tag, len;
103
104 /* io_error_fd is temporarily disabled -- is this meant to
105 * prevent indefinite recursion? */
106 io_error_fd = -1;
107
108 read_loop(fd, buf, 4);
109 tag = IVAL(buf, 0);
110
111 len = tag & 0xFFFFFF;
112 tag = tag >> 24;
113 tag -= MPLEX_BASE;
114
115 while (len) {
116 n = len;
117 if (n > (sizeof(buf)-1))
118 n = sizeof(buf)-1;
119 read_loop(fd, buf, n);
120 rwrite((enum logcode)tag, buf, n);
121 len -= n;
122 }
123
124 io_error_fd = fd;
125}
126
127
128static void whine_about_eof (void)
129{
130 /**
131 It's almost always an error to get an EOF when we're trying
132 to read from the network, because the protocol is
133 self-terminating.
134
135 However, there is one unfortunate cases where it is not,
136 which is rsync <2.4.6 sending a list of modules on a
137 server, since the list is terminated by closing the socket.
138 So, for the section of the program where that is a problem
139 (start_socket_client), kludge_around_eof is True and we
140 just exit.
141 */
142
143 if (kludge_around_eof)
144 exit_cleanup (0);
145 else {
146 rprintf (FERROR,
147 "%s: connection unexpectedly closed "
148 "(%.0f bytes read so far)\n",
149 RSYNC_NAME, (double)stats.total_read);
150
151 exit_cleanup (RERR_STREAMIO);
152 }
153}
154
155
156static void die_from_readerr (int err)
157{
158 /* this prevents us trying to write errors on a dead socket */
159 io_multiplexing_close();
160
161 rprintf(FERROR, "%s: read error: %s\n",
162 RSYNC_NAME, strerror (err));
163 exit_cleanup(RERR_STREAMIO);
164}
165
166
167/*!
168 * Read from a socket with IO timeout. return the number of bytes
169 * read. If no bytes can be read then exit, never return a number <= 0.
170 *
171 * TODO: If the remote shell connection fails, then current versions
172 * actually report an "unexpected EOF" error here. Since it's a
173 * fairly common mistake to try to use rsh when ssh is required, we
174 * should trap that: if we fail to read any data at all, we should
175 * give a better explanation. We can tell whether the connection has
176 * started by looking e.g. at whether the remote version is known yet.
177 */
178static int read_timeout (int fd, char *buf, size_t len)
179{
180 int n, ret=0;
181
182 io_flush();
183
184 while (ret == 0) {
185 /* until we manage to read *something* */
186 fd_set fds;
187 struct timeval tv;
188 int fd_count = fd+1;
189 int count;
190
191 FD_ZERO(&fds);
192 FD_SET(fd, &fds);
193 if (io_error_fd != -1) {
194 FD_SET(io_error_fd, &fds);
195 if (io_error_fd > fd) fd_count = io_error_fd+1;
196 }
197
198 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
199 tv.tv_usec = 0;
200
201 errno = 0;
202
203 count = select(fd_count, &fds, NULL, NULL, &tv);
204
205 if (count == 0) {
206 check_timeout();
207 }
208
209 if (count <= 0) {
210 if (errno == EBADF) {
211 exit_cleanup(RERR_SOCKETIO);
212 }
213 continue;
214 }
215
216 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
217 read_error_fd();
218 }
219
220 if (!FD_ISSET(fd, &fds)) continue;
221
222 n = read(fd, buf, len);
223
224 if (n > 0) {
225 buf += n;
226 len -= n;
227 ret += n;
228 if (io_timeout)
229 last_io = time(NULL);
230 continue;
231 } else if (n == 0) {
232 whine_about_eof ();
233 return -1; /* doesn't return */
234 } else if (n == -1) {
235 if (errno == EINTR || errno == EWOULDBLOCK ||
236 errno == EAGAIN)
237 continue;
238 else
239 die_from_readerr (errno);
240 }
241 }
242
243 return ret;
244}
245
246
247
248
/*!
 * Read exactly @p len bytes from @p fd into @p buf, calling
 * read_timeout() as many times as it takes.  Does not return until
 * everything has been read.
 */
static void read_loop (int fd, char *buf, size_t len)
{
	int got;

	for (; len != 0; buf += got, len -= got)
		got = read_timeout(fd, buf, len);
}
260
261
262/**
263 * Read from the file descriptor handling multiplexing - return number
264 * of bytes read.
265 *
266 * Never returns <= 0.
267 */
268static int read_unbuffered(int fd, char *buf, size_t len)
269{
270 static size_t remaining;
271 int tag, ret = 0;
272 char line[1024];
273
274 if (!io_multiplexing_in || fd != multiplex_in_fd)
275 return read_timeout(fd, buf, len);
276
277 while (ret == 0) {
278 if (remaining) {
279 len = MIN(len, remaining);
280 read_loop(fd, buf, len);
281 remaining -= len;
282 ret = len;
283 continue;
284 }
285
286 read_loop(fd, line, 4);
287 tag = IVAL(line, 0);
288
289 remaining = tag & 0xFFFFFF;
290 tag = tag >> 24;
291
292 if (tag == MPLEX_BASE)
293 continue;
294
295 tag -= MPLEX_BASE;
296
297 if (tag != FERROR && tag != FINFO) {
298 rprintf(FERROR, "unexpected tag %d\n", tag);
299 exit_cleanup(RERR_STREAMIO);
300 }
301
302 if (remaining > sizeof(line) - 1) {
303 rprintf(FERROR, "multiplexing overflow %d\n\n",
304 remaining);
305 exit_cleanup(RERR_STREAMIO);
306 }
307
308 read_loop(fd, line, remaining);
309 line[remaining] = 0;
310
311 rprintf((enum logcode) tag, "%s", line);
312 remaining = 0;
313 }
314
315 return ret;
316}
317
318
319
320/* do a buffered read from fd. don't return until all N bytes
321 have been read. If all N can't be read then exit with an error */
322static void readfd (int fd, char *buffer, size_t N)
323{
324 int ret;
325 size_t total=0;
326
327 while (total < N) {
328 io_flush();
329
330 ret = read_unbuffered (fd, buffer + total, N-total);
331 total += ret;
332 }
333
334 stats.total_read += total;
335}
336
337
338int32 read_int(int f)
339{
340 char b[4];
341 int32 ret;
342
343 readfd(f,b,4);
344 ret = IVAL(b,0);
345 if (ret == (int32)0xffffffff) return -1;
346 return ret;
347}
348
349int64 read_longint(int f)
350{
351 extern int remote_version;
352 int64 ret;
353 char b[8];
354 ret = read_int(f);
355
356 if ((int32)ret != (int32)0xffffffff) {
357 return ret;
358 }
359
360#ifdef NO_INT64
361 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362 exit_cleanup(RERR_UNSUPPORTED);
363#else
364 if (remote_version >= 16) {
365 readfd(f,b,8);
366 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
367 }
368#endif
369
370 return ret;
371}
372
/**
 * Read exactly @p len bytes from @p f into @p buf (see readfd()).
 */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
377
/**
 * Read exactly @p len bytes from @p f into @p buf and NUL-terminate
 * the result.  The terminator is stored at buf[len], so @p buf must
 * have room for len + 1 bytes.
 */
void read_sbuf(int f, char *buf, size_t len)
{
	read_buf(f, buf, len);
	buf[len] = '\0';
}
383
/**
 * Read a single byte from @p f and return it.
 */
unsigned char read_byte(int f)
{
	unsigned char ch;

	read_buf(f, (char *)&ch, 1);
	return ch;
}
390
391/* Write len bytes to fd. This underlies the multiplexing system,
392 * which is always called by application code. */
393static void writefd_unbuffered(int fd,char *buf,size_t len)
394{
395 size_t total = 0;
396 fd_set w_fds, r_fds;
397 int fd_count, count;
398 struct timeval tv;
399
400 err_list_push();
401
402 no_flush++;
403
404 while (total < len) {
405 FD_ZERO(&w_fds);
406 FD_ZERO(&r_fds);
407 FD_SET(fd,&w_fds);
408 fd_count = fd;
409
410 if (io_error_fd != -1) {
411 FD_SET(io_error_fd,&r_fds);
412 if (io_error_fd > fd_count)
413 fd_count = io_error_fd;
414 }
415
416 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
417 tv.tv_usec = 0;
418
419 errno = 0;
420
421 count = select(fd_count+1,
422 io_error_fd != -1?&r_fds:NULL,
423 &w_fds,NULL,
424 &tv);
425
426 if (count == 0) {
427 check_timeout();
428 }
429
430 if (count <= 0) {
431 if (errno == EBADF) {
432 exit_cleanup(RERR_SOCKETIO);
433 }
434 continue;
435 }
436
437 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
438 read_error_fd();
439 }
440
441 if (FD_ISSET(fd, &w_fds)) {
442 int ret;
443 size_t n = len-total;
444 ret = write(fd,buf+total,n);
445
446 if (ret == -1 && errno == EINTR) {
447 continue;
448 }
449
450 if (ret == -1 &&
451 (errno == EWOULDBLOCK || errno == EAGAIN)) {
452 msleep(1);
453 continue;
454 }
455
456 if (ret <= 0) {
457 rprintf(FERROR,
458 "error writing %d unbuffered bytes"
459 " - exiting: %s\n", len,
460 strerror(errno));
461 exit_cleanup(RERR_STREAMIO);
462 }
463
464 /* Sleep after writing to limit I/O bandwidth */
465 if (bwlimit)
466 {
467 tv.tv_sec = 0;
468 tv.tv_usec = ret * 1000 / bwlimit;
469 while (tv.tv_usec > 1000000)
470 {
471 tv.tv_sec++;
472 tv.tv_usec -= 1000000;
473 }
474 select(0, NULL, NULL, NULL, &tv);
475 }
476
477 total += ret;
478
479 if (io_timeout)
480 last_io = time(NULL);
481 }
482 }
483
484 no_flush--;
485}
486
487
488static char *io_buffer;
489static int io_buffer_count;
490
491void io_start_buffering(int fd)
492{
493 if (io_buffer) return;
494 multiplex_out_fd = fd;
495 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
496 if (!io_buffer) out_of_memory("writefd");
497 io_buffer_count = 0;
498}
499
500/* write an message to a multiplexed stream. If this fails then rsync
501 exits */
502static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
503{
504 char buffer[4096];
505 size_t n = len;
506
507 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
508
509 if (n > (sizeof(buffer)-4)) {
510 n = sizeof(buffer)-4;
511 }
512
513 memcpy(&buffer[4], buf, n);
514 writefd_unbuffered(fd, buffer, n+4);
515
516 len -= n;
517 buf += n;
518
519 if (len) {
520 writefd_unbuffered(fd, buf, len);
521 }
522}
523
524
525void io_flush(void)
526{
527 int fd = multiplex_out_fd;
528
529 err_list_push();
530
531 if (!io_buffer_count || no_flush) return;
532
533 if (io_multiplexing_out) {
534 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
535 } else {
536 writefd_unbuffered(fd, io_buffer, io_buffer_count);
537 }
538 io_buffer_count = 0;
539}
540
541
542void io_end_buffering(void)
543{
544 io_flush();
545 if (!io_multiplexing_out) {
546 free(io_buffer);
547 io_buffer = NULL;
548 }
549}
550
551static void writefd(int fd,char *buf,size_t len)
552{
553 stats.total_written += len;
554
555 err_list_push();
556
557 if (!io_buffer || fd != multiplex_out_fd) {
558 writefd_unbuffered(fd, buf, len);
559 return;
560 }
561
562 while (len) {
563 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
564 if (n > 0) {
565 memcpy(io_buffer+io_buffer_count, buf, n);
566 buf += n;
567 len -= n;
568 io_buffer_count += n;
569 }
570
571 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
572 }
573}
574
575
576void write_int(int f,int32 x)
577{
578 char b[4];
579 SIVAL(b,0,x);
580 writefd(f,b,4);
581}
582
583
584/*
585 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
586 * 64-bit types on this platform.
587 */
588void write_longint(int f, int64 x)
589{
590 extern int remote_version;
591 char b[8];
592
593 if (remote_version < 16 || x <= 0x7FFFFFFF) {
594 write_int(f, (int)x);
595 return;
596 }
597
598 write_int(f, (int32)0xFFFFFFFF);
599 SIVAL(b,0,(x&0xFFFFFFFF));
600 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
601
602 writefd(f,b,8);
603}
604
/**
 * Write @p len bytes from @p buf to @p f (see writefd()).
 */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
609
/**
 * Write the NUL-terminated string @p buf to the connection.  The
 * terminating NUL itself is not sent.
 */
static void write_sbuf(int f, char *buf)
{
	size_t length = strlen(buf);

	write_buf(f, buf, length);
}
615
616
/**
 * Write the single byte @p c to @p f.
 */
void write_byte(int f, unsigned char c)
{
	write_buf(f, (char *)&c, 1);
}
621
622
623
/**
 * Read a newline-terminated line from @p f into @p buf, one byte at a
 * time.
 *
 * Carriage returns are dropped and the newline is replaced by a NUL.
 * Returns 1 once a complete line has been stored.  Returns 0 if a NUL
 * byte is received, or if @p maxlen characters were stored without
 * seeing a newline.  In the latter case the terminator is written at
 * buf[maxlen], so @p buf must have room for maxlen + 1 bytes.
 */
int read_line(int f, char *buf, size_t maxlen)
{
	while (maxlen != 0) {
		*buf = '\0';
		read_buf(f, buf, 1);

		switch (*buf) {
		case '\0':
			return 0;
		case '\n':
			*buf = '\0';
			return 1;
		case '\r':
			/* Not kept: the next read overwrites it. */
			break;
		default:
			buf++;
			maxlen--;
			break;
		}
	}

	/* Ran out of room before the end of the line. */
	*buf = '\0';
	return 0;
}
646
647
648void io_printf(int fd, const char *format, ...)
649{
650 va_list ap;
651 char buf[1024];
652 int len;
653
654 va_start(ap, format);
655 len = vsnprintf(buf, sizeof(buf), format, ap);
656 va_end(ap);
657
658 if (len < 0) exit_cleanup(RERR_STREAMIO);
659
660 write_sbuf(fd, buf);
661}
662
663
664/* setup for multiplexing an error stream with the data stream */
665void io_start_multiplex_out(int fd)
666{
667 multiplex_out_fd = fd;
668 io_flush();
669 io_start_buffering(fd);
670 io_multiplexing_out = 1;
671}
672
673/* setup for multiplexing an error stream with the data stream */
674void io_start_multiplex_in(int fd)
675{
676 multiplex_in_fd = fd;
677 io_flush();
678 io_multiplexing_in = 1;
679}
680
681/* write an message to the multiplexed error stream */
682int io_multiplex_write(enum logcode code, char *buf, size_t len)
683{
684 if (!io_multiplexing_out) return 0;
685
686 io_flush();
687 stats.total_written += (len+4);
688 mplex_write(multiplex_out_fd, code, buf, len);
689 return 1;
690}
691
692/* stop output multiplexing */
693void io_multiplexing_close(void)
694{
695 io_multiplexing_out = 0;
696}
697