Doc
[rsync/rsync.git] / io.c
CommitLineData
7a24c346
MP
1/* -*- c-file-style: "linux" -*-
2
ce6c7c63 3 Copyright (C) 1996-2001 by Andrew Tridgell
720b47f2 4 Copyright (C) Paul Mackerras 1996
7a55d06e 5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
720b47f2
AT
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20*/
21
87ee2481
MP
22/**
23 *
24 * @file io.c
25 *
26 * Socket and pipe IO utilities used in rsync.
27 *
28 * rsync provides its own multiplexing system, which is used to send
29 * stderr and stdout over a single socket. We need this because
30 * stdout normally carries the binary data stream, and stderr all our
31 * error messages.
32 *
33 * For historical reasons this is off during the start of the
34 * connection, but it's switched on quite early using
35 * io_start_multiplex_out() and io_start_multiplex_in().
36 **/
720b47f2 37
720b47f2
AT
38#include "rsync.h"
39
8cd9fd4e
AT
40/* if no timeout is specified then use a 60 second select timeout */
41#define SELECT_TIMEOUT 60
42
8d9dc9f9
AT
43static int io_multiplexing_out;
44static int io_multiplexing_in;
679e7657
AT
45static int multiplex_in_fd;
46static int multiplex_out_fd;
8d9dc9f9 47static time_t last_io;
7a55d06e
MP
48static int no_flush;
49
50extern int bwlimit;
720b47f2 51extern int verbose;
6ba9279f 52extern int io_timeout;
a800434a 53extern struct stats stats;
720b47f2 54
7a55d06e
MP
55
56/** Ignore EOF errors while reading a module listing if the remote
57 version is 24 or less. */
58int kludge_around_eof = False;
59
60
554e0a8d 61static int io_error_fd = -1;
720b47f2 62
9dd891bb 63static void read_loop(int fd, char *buf, size_t len);
ff41a59f 64
8d9dc9f9
AT
65static void check_timeout(void)
66{
0adb99b9 67 extern int am_server, am_daemon;
8d9dc9f9 68 time_t t;
90ba34e2
AT
69
70 err_list_push();
8d9dc9f9
AT
71
72 if (!io_timeout) return;
73
74 if (!last_io) {
75 last_io = time(NULL);
76 return;
77 }
78
79 t = time(NULL);
80
86ffe37f 81 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
0adb99b9 82 if (!am_server && !am_daemon) {
ce6c7c63 83 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
0adb99b9
AT
84 (int)(t-last_io));
85 }
65417579 86 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
87 }
88}
89
554e0a8d
AT
90/* setup the fd used to propogate errors */
91void io_set_error_fd(int fd)
92{
93 io_error_fd = fd;
94}
95
ff41a59f 96/* read some data from the error fd and write it to the write log code */
554e0a8d
AT
97static void read_error_fd(void)
98{
99 char buf[200];
06ce139f 100 size_t n;
554e0a8d 101 int fd = io_error_fd;
ff41a59f
AT
102 int tag, len;
103
8886f8d0
MP
104 /* io_error_fd is temporarily disabled -- is this meant to
105 * prevent indefinite recursion? */
554e0a8d
AT
106 io_error_fd = -1;
107
ff41a59f
AT
108 read_loop(fd, buf, 4);
109 tag = IVAL(buf, 0);
110
111 len = tag & 0xFFFFFF;
112 tag = tag >> 24;
113 tag -= MPLEX_BASE;
114
115 while (len) {
116 n = len;
06ce139f
MP
117 if (n > (sizeof(buf)-1))
118 n = sizeof(buf)-1;
ff41a59f
AT
119 read_loop(fd, buf, n);
120 rwrite((enum logcode)tag, buf, n);
121 len -= n;
554e0a8d
AT
122 }
123
124 io_error_fd = fd;
125}
126
720b47f2 127
7a55d06e
MP
128static void whine_about_eof (void)
129{
130 /**
131 It's almost always an error to get an EOF when we're trying
132 to read from the network, because the protocol is
133 self-terminating.
134
135 However, there is one unfortunate cases where it is not,
136 which is rsync <2.4.6 sending a list of modules on a
137 server, since the list is terminated by closing the socket.
138 So, for the section of the program where that is a problem
139 (start_socket_client), kludge_around_eof is True and we
140 just exit.
141 */
142
143 if (kludge_around_eof)
144 exit_cleanup (0);
145 else {
146 rprintf (FERROR,
147 "%s: connection unexpectedly closed "
148 "(%.0f bytes read so far)\n",
149 RSYNC_NAME, (double)stats.total_read);
150
151 exit_cleanup (RERR_STREAMIO);
152 }
153}
720b47f2 154
7a55d06e
MP
155
156static void die_from_readerr (int err)
157{
158 /* this prevents us trying to write errors on a dead socket */
159 io_multiplexing_close();
160
161 rprintf(FERROR, "%s: read error: %s\n",
162 RSYNC_NAME, strerror (err));
163 exit_cleanup(RERR_STREAMIO);
164}
165
166
167/*!
c3563c46
MP
168 * Read from a socket with IO timeout. return the number of bytes
169 * read. If no bytes can be read then exit, never return a number <= 0.
170 *
8886f8d0
MP
171 * TODO: If the remote shell connection fails, then current versions
172 * actually report an "unexpected EOF" error here. Since it's a
173 * fairly common mistake to try to use rsh when ssh is required, we
174 * should trap that: if we fail to read any data at all, we should
175 * give a better explanation. We can tell whether the connection has
176 * started by looking e.g. at whether the remote version is known yet.
c3563c46 177 */
9dd891bb 178static int read_timeout (int fd, char *buf, size_t len)
8d9dc9f9 179{
4c36ddbe
AT
180 int n, ret=0;
181
ea2111d1
AT
182 io_flush();
183
4c36ddbe 184 while (ret == 0) {
7a55d06e 185 /* until we manage to read *something* */
4c36ddbe
AT
186 fd_set fds;
187 struct timeval tv;
554e0a8d 188 int fd_count = fd+1;
a57873b7 189 int count;
4c36ddbe
AT
190
191 FD_ZERO(&fds);
192 FD_SET(fd, &fds);
554e0a8d
AT
193 if (io_error_fd != -1) {
194 FD_SET(io_error_fd, &fds);
195 if (io_error_fd > fd) fd_count = io_error_fd+1;
196 }
197
8cd9fd4e 198 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
4c36ddbe
AT
199 tv.tv_usec = 0;
200
554e0a8d
AT
201 errno = 0;
202
a57873b7
AT
203 count = select(fd_count, &fds, NULL, NULL, &tv);
204
205 if (count == 0) {
206 check_timeout();
207 }
208
209 if (count <= 0) {
554e0a8d
AT
210 if (errno == EBADF) {
211 exit_cleanup(RERR_SOCKETIO);
212 }
4c36ddbe
AT
213 continue;
214 }
215
554e0a8d
AT
216 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
217 read_error_fd();
218 }
219
220 if (!FD_ISSET(fd, &fds)) continue;
221
4c36ddbe
AT
222 n = read(fd, buf, len);
223
8d9dc9f9
AT
224 if (n > 0) {
225 buf += n;
226 len -= n;
4c36ddbe
AT
227 ret += n;
228 if (io_timeout)
229 last_io = time(NULL);
230 continue;
7a55d06e
MP
231 } else if (n == 0) {
232 whine_about_eof ();
233 return -1; /* doesn't return */
234 } else if (n == -1) {
235 if (errno == EINTR || errno == EWOULDBLOCK ||
236 errno == EAGAIN)
237 continue;
238 else
239 die_from_readerr (errno);
8d9dc9f9 240 }
4c36ddbe 241 }
8d9dc9f9 242
4c36ddbe
AT
243 return ret;
244}
8d9dc9f9 245
7a55d06e
MP
246
247
248
/*! Keep calling read_timeout() until exactly len bytes have been
    stored in buf; does not return before then. */
static void read_loop(int fd, char *buf, size_t len)
{
	int n;

	for (; len != 0; buf += n, len -= n)
		n = read_timeout(fd, buf, len);
}
260
7a55d06e
MP
261
262/**
263 * Read from the file descriptor handling multiplexing - return number
264 * of bytes read.
265 *
266 * Never returns <= 0.
267 */
9dd891bb 268static int read_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 269{
6fe25398 270 static size_t remaining;
909ce14f 271 int tag, ret = 0;
8d9dc9f9
AT
272 char line[1024];
273
7a55d06e 274 if (!io_multiplexing_in || fd != multiplex_in_fd)
4c36ddbe 275 return read_timeout(fd, buf, len);
8d9dc9f9
AT
276
277 while (ret == 0) {
278 if (remaining) {
279 len = MIN(len, remaining);
280 read_loop(fd, buf, len);
281 remaining -= len;
282 ret = len;
283 continue;
284 }
285
909ce14f 286 read_loop(fd, line, 4);
ff41a59f 287 tag = IVAL(line, 0);
679e7657 288
8d9dc9f9
AT
289 remaining = tag & 0xFFFFFF;
290 tag = tag >> 24;
291
909ce14f
MP
292 if (tag == MPLEX_BASE)
293 continue;
8d9dc9f9
AT
294
295 tag -= MPLEX_BASE;
296
297 if (tag != FERROR && tag != FINFO) {
909ce14f 298 rprintf(FERROR, "unexpected tag %d\n", tag);
65417579 299 exit_cleanup(RERR_STREAMIO);
8d9dc9f9
AT
300 }
301
909ce14f
MP
302 if (remaining > sizeof(line) - 1) {
303 rprintf(FERROR, "multiplexing overflow %d\n\n",
8d9dc9f9 304 remaining);
65417579 305 exit_cleanup(RERR_STREAMIO);
8d9dc9f9
AT
306 }
307
308 read_loop(fd, line, remaining);
309 line[remaining] = 0;
310
909ce14f 311 rprintf((enum logcode) tag, "%s", line);
8d9dc9f9
AT
312 remaining = 0;
313 }
314
315 return ret;
316}
317
318
909ce14f 319
4c36ddbe
AT
320/* do a buffered read from fd. don't return until all N bytes
321 have been read. If all N can't be read then exit with an error */
9dd891bb 322static void readfd (int fd, char *buffer, size_t N)
720b47f2 323{
6ba9279f 324 int ret;
06ce139f 325 size_t total=0;
6ba9279f 326
6ba9279f 327 while (total < N) {
8d9dc9f9
AT
328 io_flush();
329
7a55d06e 330 ret = read_unbuffered (fd, buffer + total, N-total);
6ba9279f 331 total += ret;
7f28dbee 332 }
1b7c47cb
AT
333
334 stats.total_read += total;
720b47f2
AT
335}
336
337
b7922338 338int32 read_int(int f)
720b47f2 339{
4c36ddbe 340 char b[4];
d730b113
AT
341 int32 ret;
342
4c36ddbe 343 readfd(f,b,4);
d730b113
AT
344 ret = IVAL(b,0);
345 if (ret == (int32)0xffffffff) return -1;
346 return ret;
720b47f2
AT
347}
348
71c46176 349int64 read_longint(int f)
3a6a366f
AT
350{
351 extern int remote_version;
71c46176 352 int64 ret;
3a6a366f
AT
353 char b[8];
354 ret = read_int(f);
71c46176 355
8de330a3
AT
356 if ((int32)ret != (int32)0xffffffff) {
357 return ret;
358 }
71c46176 359
3bee6733 360#ifdef NO_INT64
9486289c 361 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
65417579 362 exit_cleanup(RERR_UNSUPPORTED);
71c46176
AT
363#else
364 if (remote_version >= 16) {
4c36ddbe 365 readfd(f,b,8);
71c46176 366 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
3a6a366f 367 }
71c46176
AT
368#endif
369
3a6a366f
AT
370 return ret;
371}
372
/* Read exactly len bytes from f into buf; a public wrapper around
   readfd(). */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
377
/* Read len bytes from f into buf and NUL-terminate the result.  The
   terminator is stored at buf[len], so buf must have room for len+1
   bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	read_buf(f, buf, len);
	buf[len] = '\0';
}
383
182dca5c
AT
/* Read a single byte from f and return it. */
unsigned char read_byte(int f)
{
	unsigned char byte;

	read_buf(f, (char *)&byte, 1);

	return byte;
}
720b47f2 390
87ee2481
MP
391/* Write len bytes to fd. This underlies the multiplexing system,
392 * which is always called by application code. */
9dd891bb 393static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 394{
06ce139f 395 size_t total = 0;
8d9dc9f9 396 fd_set w_fds, r_fds;
4c36ddbe 397 int fd_count, count;
8d9dc9f9 398 struct timeval tv;
720b47f2 399
90ba34e2
AT
400 err_list_push();
401
e44f9a12
AT
402 no_flush++;
403
4c36ddbe 404 while (total < len) {
8d9dc9f9
AT
405 FD_ZERO(&w_fds);
406 FD_ZERO(&r_fds);
407 FD_SET(fd,&w_fds);
554e0a8d 408 fd_count = fd;
4c36ddbe 409
554e0a8d
AT
410 if (io_error_fd != -1) {
411 FD_SET(io_error_fd,&r_fds);
412 if (io_error_fd > fd_count)
413 fd_count = io_error_fd;
8d9dc9f9
AT
414 }
415
8cd9fd4e 416 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
8d9dc9f9 417 tv.tv_usec = 0;
4c36ddbe 418
554e0a8d
AT
419 errno = 0;
420
421 count = select(fd_count+1,
08f15335 422 io_error_fd != -1?&r_fds:NULL,
4c36ddbe 423 &w_fds,NULL,
8cd9fd4e 424 &tv);
4c36ddbe 425
a57873b7
AT
426 if (count == 0) {
427 check_timeout();
428 }
429
4c36ddbe 430 if (count <= 0) {
554e0a8d
AT
431 if (errno == EBADF) {
432 exit_cleanup(RERR_SOCKETIO);
433 }
8d9dc9f9
AT
434 continue;
435 }
4c36ddbe 436
554e0a8d
AT
437 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
438 read_error_fd();
439 }
440
8d9dc9f9 441 if (FD_ISSET(fd, &w_fds)) {
06ce139f
MP
442 int ret;
443 size_t n = len-total;
f0359dd0 444 ret = write(fd,buf+total,n);
4c36ddbe
AT
445
446 if (ret == -1 && errno == EINTR) {
447 continue;
448 }
449
f0359dd0
AT
450 if (ret == -1 &&
451 (errno == EWOULDBLOCK || errno == EAGAIN)) {
e92ee128 452 msleep(1);
f0359dd0
AT
453 continue;
454 }
455
4c36ddbe 456 if (ret <= 0) {
befbfe61
MP
457 /* Don't try to write errors back
458 * across the stream */
459 io_multiplexing_close();
ce6c7c63
MP
460 rprintf(FERROR,
461 "error writing %d unbuffered bytes"
462 " - exiting: %s\n", len,
463 strerror(errno));
65417579 464 exit_cleanup(RERR_STREAMIO);
4c36ddbe
AT
465 }
466
ef5d23eb
DD
467 /* Sleep after writing to limit I/O bandwidth */
468 if (bwlimit)
469 {
470 tv.tv_sec = 0;
471 tv.tv_usec = ret * 1000 / bwlimit;
472 while (tv.tv_usec > 1000000)
473 {
474 tv.tv_sec++;
475 tv.tv_usec -= 1000000;
476 }
477 select(0, NULL, NULL, NULL, &tv);
478 }
479
4c36ddbe 480 total += ret;
a800434a 481
4c36ddbe
AT
482 if (io_timeout)
483 last_io = time(NULL);
8d9dc9f9 484 }
4c36ddbe 485 }
e44f9a12
AT
486
487 no_flush--;
720b47f2
AT
488}
489
8d9dc9f9 490
d6dead6b
AT
491static char *io_buffer;
492static int io_buffer_count;
493
494void io_start_buffering(int fd)
495{
8d9dc9f9 496 if (io_buffer) return;
679e7657 497 multiplex_out_fd = fd;
ff41a59f 498 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
d6dead6b
AT
499 if (!io_buffer) out_of_memory("writefd");
500 io_buffer_count = 0;
ff41a59f
AT
501}
502
503/* write an message to a multiplexed stream. If this fails then rsync
504 exits */
9dd891bb 505static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
ff41a59f
AT
506{
507 char buffer[4096];
06ce139f 508 size_t n = len;
8d9dc9f9 509
ff41a59f
AT
510 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
511
6d7b6081
AT
512 if (n > (sizeof(buffer)-4)) {
513 n = sizeof(buffer)-4;
ff41a59f
AT
514 }
515
516 memcpy(&buffer[4], buf, n);
517 writefd_unbuffered(fd, buffer, n+4);
518
519 len -= n;
520 buf += n;
521
6d7b6081
AT
522 if (len) {
523 writefd_unbuffered(fd, buf, len);
524 }
d6dead6b
AT
525}
526
ff41a59f 527
8d9dc9f9 528void io_flush(void)
d6dead6b 529{
679e7657 530 int fd = multiplex_out_fd;
90ba34e2
AT
531
532 err_list_push();
533
e44f9a12 534 if (!io_buffer_count || no_flush) return;
8d9dc9f9
AT
535
536 if (io_multiplexing_out) {
0f3203c3 537 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
8d9dc9f9 538 } else {
4c36ddbe 539 writefd_unbuffered(fd, io_buffer, io_buffer_count);
d6dead6b 540 }
8d9dc9f9
AT
541 io_buffer_count = 0;
542}
543
0ba48136 544
7b5c3eb0 545void io_end_buffering(void)
8d9dc9f9
AT
546{
547 io_flush();
548 if (!io_multiplexing_out) {
ff41a59f 549 free(io_buffer);
8d9dc9f9
AT
550 io_buffer = NULL;
551 }
d6dead6b
AT
552}
553
9dd891bb 554static void writefd(int fd,char *buf,size_t len)
d6dead6b 555{
1b7c47cb
AT
556 stats.total_written += len;
557
90ba34e2
AT
558 err_list_push();
559
554e0a8d 560 if (!io_buffer || fd != multiplex_out_fd) {
4c36ddbe
AT
561 writefd_unbuffered(fd, buf, len);
562 return;
563 }
d6dead6b
AT
564
565 while (len) {
7b5c3eb0 566 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
d6dead6b
AT
567 if (n > 0) {
568 memcpy(io_buffer+io_buffer_count, buf, n);
569 buf += n;
570 len -= n;
571 io_buffer_count += n;
572 }
573
8d9dc9f9 574 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
d6dead6b 575 }
d6dead6b 576}
720b47f2
AT
577
578
b7922338 579void write_int(int f,int32 x)
720b47f2 580{
8d9dc9f9
AT
581 char b[4];
582 SIVAL(b,0,x);
4c36ddbe 583 writefd(f,b,4);
720b47f2
AT
584}
585
7a24c346
MP
586
587/*
588 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
589 * 64-bit types on this platform.
590 */
71c46176 591void write_longint(int f, int64 x)
3a6a366f
AT
592{
593 extern int remote_version;
594 char b[8];
3a6a366f
AT
595
596 if (remote_version < 16 || x <= 0x7FFFFFFF) {
597 write_int(f, (int)x);
598 return;
599 }
600
8de330a3 601 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
602 SIVAL(b,0,(x&0xFFFFFFFF));
603 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
604
4c36ddbe 605 writefd(f,b,8);
3a6a366f
AT
606}
607
/* Write len bytes from buf to f; a public wrapper around writefd(). */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
612
/* Write a NUL-terminated string to the connection; the terminator
   itself is not sent. */
static void write_sbuf(int f, char *buf)
{
	size_t length = strlen(buf);

	write_buf(f, buf, length);
}
618
720b47f2 619
182dca5c
AT
/* Write the single byte c to f. */
void write_byte(int f, unsigned char c)
{
	char *one = (char *)&c;

	write_buf(f, one, 1);
}
624
7a55d06e
MP
625
626
/**
 * Read one line from f into buf, a byte at a time.
 *
 * Carriage returns are dropped.  The line ends at '\n', which is
 * replaced by a NUL.  At most maxlen characters are stored; the
 * terminator written after them may land at buf[maxlen], so buf needs
 * room for maxlen+1 bytes.
 *
 * @return 1 if a complete line was read; 0 if a NUL byte was read or
 * maxlen characters were stored without finding a newline.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *p = buf;
	size_t room = maxlen;

	while (room != 0) {
		*p = '\0';
		read_buf(f, p, 1);

		switch (*p) {
		case '\0':
			return 0;
		case '\n':
			*p = '\0';
			return 1;
		case '\r':
			/* not stored: the next byte overwrites it */
			break;
		default:
			p++;
			room--;
			break;
		}
	}

	/* ran out of space before the end of the line */
	*p = '\0';
	return 0;
}
649
650
651void io_printf(int fd, const char *format, ...)
652{
653 va_list ap;
654 char buf[1024];
655 int len;
656
657 va_start(ap, format);
8950ac03 658 len = vsnprintf(buf, sizeof(buf), format, ap);
f0fca04e
AT
659 va_end(ap);
660
65417579 661 if (len < 0) exit_cleanup(RERR_STREAMIO);
f0fca04e
AT
662
663 write_sbuf(fd, buf);
664}
8d9dc9f9
AT
665
666
667/* setup for multiplexing an error stream with the data stream */
668void io_start_multiplex_out(int fd)
669{
679e7657
AT
670 multiplex_out_fd = fd;
671 io_flush();
8d9dc9f9
AT
672 io_start_buffering(fd);
673 io_multiplexing_out = 1;
674}
675
676/* setup for multiplexing an error stream with the data stream */
677void io_start_multiplex_in(int fd)
678{
679e7657
AT
679 multiplex_in_fd = fd;
680 io_flush();
8d9dc9f9
AT
681 io_multiplexing_in = 1;
682}
683
554e0a8d 684/* write an message to the multiplexed error stream */
9dd891bb 685int io_multiplex_write(enum logcode code, char *buf, size_t len)
8d9dc9f9
AT
686{
687 if (!io_multiplexing_out) return 0;
688
689 io_flush();
1b7c47cb 690 stats.total_written += (len+4);
ff41a59f 691 mplex_write(multiplex_out_fd, code, buf, len);
8d9dc9f9
AT
692 return 1;
693}
694
554e0a8d
AT
695/* stop output multiplexing */
696void io_multiplexing_close(void)
697{
698 io_multiplexing_out = 0;
699}
700