1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 * Socket and pipe IO utilities used in rsync.
28 * rsync provides its own multiplexing system, which is used to send
29 * stderr and stdout over a single socket. We need this because
30 * stdout normally carries the binary data stream, and stderr all our
33 * For historical reasons this is off during the start of the
34 * connection, but it's switched on quite early using
35 * io_start_multiplex_out() and io_start_multiplex_in().
40 /* if no timeout is specified then use a 60 second select timeout */
41 #define SELECT_TIMEOUT 60
43 static int io_multiplexing_out; /* nonzero while output multiplexing is on: set by io_start_multiplex_out(), cleared by io_multiplexing_close() */
44 static int io_multiplexing_in; /* set to 1 by io_start_multiplex_in(); read_unbuffered() falls back to a plain read_timeout() while this is zero */
45 static int multiplex_in_fd; /* fd recorded by io_start_multiplex_in(); read_unbuffered() compares its fd argument against this */
46 static int multiplex_out_fd; /* fd recorded by io_start_buffering() and io_start_multiplex_out(); destination used by io_multiplex_write() */
47 static time_t last_io; /* time(NULL) stamp refreshed in the read and write paths; check_timeout() compares it with io_timeout */
52 extern int io_timeout;
53 extern struct stats stats;
56 /** Ignore EOF errors while reading a module listing if the remote
57 version is 24 or less. */
58 int kludge_around_eof = False;
61 static int io_error_fd = -1;
63 static void read_loop(int fd, char *buf, size_t len);
65 static void check_timeout(void)
67 extern int am_server, am_daemon;
72 if (!io_timeout) return;
81 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82 if (!am_server && !am_daemon) {
83 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
86 exit_cleanup(RERR_TIMEOUT);
90 /* setup the fd used to propagate errors */
91 void io_set_error_fd(int fd)
96 /* read some data from the error fd and write it to the write log code */
97 static void read_error_fd(void)
101 int fd = io_error_fd;
104 /* io_error_fd is temporarily disabled -- is this meant to
105 * prevent indefinite recursion? */
108 read_loop(fd, buf, 4);
111 len = tag & 0xFFFFFF;
117 if (n > (sizeof(buf)-1))
119 read_loop(fd, buf, n);
120 rwrite((enum logcode)tag, buf, n);
128 static void whine_about_eof (void)
131 It's almost always an error to get an EOF when we're trying
132 to read from the network, because the protocol is
135 However, there is one unfortunate case where it is not,
136 which is rsync <2.4.6 sending a list of modules on a
137 server, since the list is terminated by closing the socket.
138 So, for the section of the program where that is a problem
139 (start_socket_client), kludge_around_eof is True and we
143 if (kludge_around_eof)
147 "%s: connection unexpectedly closed "
148 "(%.0f bytes read so far)\n",
149 RSYNC_NAME, (double)stats.total_read);
151 exit_cleanup (RERR_STREAMIO);
156 static void die_from_readerr (int err)
158 /* this prevents us trying to write errors on a dead socket */
159 io_multiplexing_close();
161 rprintf(FERROR, "%s: read error: %s\n",
162 RSYNC_NAME, strerror (err));
163 exit_cleanup(RERR_STREAMIO);
168 * Read from a socket with IO timeout. return the number of bytes
169 * read. If no bytes can be read then exit, never return a number <= 0.
171 * TODO: If the remote shell connection fails, then current versions
172 * actually report an "unexpected EOF" error here. Since it's a
173 * fairly common mistake to try to use rsh when ssh is required, we
174 * should trap that: if we fail to read any data at all, we should
175 * give a better explanation. We can tell whether the connection has
176 * started by looking e.g. at whether the remote version is known yet.
178 static int read_timeout (int fd, char *buf, size_t len)
185 /* until we manage to read *something* */
193 if (io_error_fd != -1) {
194 FD_SET(io_error_fd, &fds);
195 if (io_error_fd > fd) fd_count = io_error_fd+1;
198 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
203 count = select(fd_count, &fds, NULL, NULL, &tv);
210 if (errno == EBADF) {
211 exit_cleanup(RERR_SOCKETIO);
216 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
220 if (!FD_ISSET(fd, &fds)) continue;
222 n = read(fd, buf, len);
229 last_io = time(NULL);
233 return -1; /* doesn't return */
234 } else if (n == -1) {
235 if (errno == EINTR || errno == EWOULDBLOCK ||
239 die_from_readerr (errno);
249 /*! Continue trying to read len bytes - don't return until len has
251 static void read_loop (int fd, char *buf, size_t len)
254 int n = read_timeout(fd, buf, len);
263 * Read from the file descriptor handling multiplexing - return number
266 * Never returns <= 0.
268 static int read_unbuffered(int fd, char *buf, size_t len)
270 static size_t remaining;
274 if (!io_multiplexing_in || fd != multiplex_in_fd)
275 return read_timeout(fd, buf, len);
279 len = MIN(len, remaining);
280 read_loop(fd, buf, len);
286 read_loop(fd, line, 4);
289 remaining = tag & 0xFFFFFF;
292 if (tag == MPLEX_BASE)
297 if (tag != FERROR && tag != FINFO) {
298 rprintf(FERROR, "unexpected tag %d\n", tag);
299 exit_cleanup(RERR_STREAMIO);
302 if (remaining > sizeof(line) - 1) {
303 rprintf(FERROR, "multiplexing overflow %d\n\n",
305 exit_cleanup(RERR_STREAMIO);
308 read_loop(fd, line, remaining);
311 rprintf((enum logcode) tag, "%s", line);
320 /* do a buffered read from fd. don't return until all N bytes
321 have been read. If all N can't be read then exit with an error */
322 static void readfd (int fd, char *buffer, size_t N)
330 ret = read_unbuffered (fd, buffer + total, N-total);
334 stats.total_read += total;
338 int32 read_int(int f)
345 if (ret == (int32)0xffffffff) return -1;
349 int64 read_longint(int f)
351 extern int remote_version;
356 if ((int32)ret != (int32)0xffffffff) {
361 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362 exit_cleanup(RERR_UNSUPPORTED);
364 if (remote_version >= 16) {
366 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
373 void read_buf(int f,char *buf,size_t len)
378 void read_sbuf(int f,char *buf,size_t len)
380 read_buf (f,buf,len);
384 unsigned char read_byte(int f)
387 read_buf (f, (char *)&c, 1);
391 /* Write len bytes to fd. This underlies the multiplexing system,
392 * which is always called by application code. */
393 static void writefd_unbuffered(int fd,char *buf,size_t len)
404 while (total < len) {
410 if (io_error_fd != -1) {
411 FD_SET(io_error_fd,&r_fds);
412 if (io_error_fd > fd_count)
413 fd_count = io_error_fd;
416 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
421 count = select(fd_count+1,
422 io_error_fd != -1?&r_fds:NULL,
431 if (errno == EBADF) {
432 exit_cleanup(RERR_SOCKETIO);
437 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
441 if (FD_ISSET(fd, &w_fds)) {
443 size_t n = len-total;
444 ret = write(fd,buf+total,n);
446 if (ret == -1 && errno == EINTR) {
451 (errno == EWOULDBLOCK || errno == EAGAIN)) {
457 /* Don't try to write errors back
458 * across the stream */
459 io_multiplexing_close();
460 rprintf(FERROR, RSYNC_NAME
461 ": error writing %d unbuffered bytes"
462 " - exiting: %s\n", len,
464 exit_cleanup(RERR_STREAMIO);
467 /* Sleep after writing to limit I/O bandwidth */
471 tv.tv_usec = ret * 1000 / bwlimit;
472 while (tv.tv_usec > 1000000)
475 tv.tv_usec -= 1000000;
477 select(0, NULL, NULL, NULL, &tv);
483 last_io = time(NULL);
491 static char *io_buffer; /* output buffer of IO_BUFFER_SIZE bytes allocated by io_start_buffering(); while NULL, writefd() writes unbuffered */
492 static int io_buffer_count; /* number of bytes currently queued in io_buffer; writefd() flushes once it reaches IO_BUFFER_SIZE */
494 void io_start_buffering(int fd)
496 if (io_buffer) return;
497 multiplex_out_fd = fd;
498 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
499 if (!io_buffer) out_of_memory("writefd");
503 /* write a message to a multiplexed stream. If this fails then rsync
505 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
510 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
512 if (n > (sizeof(buffer)-4)) {
513 n = sizeof(buffer)-4;
516 memcpy(&buffer[4], buf, n);
517 writefd_unbuffered(fd, buffer, n+4);
523 writefd_unbuffered(fd, buf, len);
530 int fd = multiplex_out_fd;
534 if (!io_buffer_count || no_flush) return;
536 if (io_multiplexing_out) {
537 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
539 writefd_unbuffered(fd, io_buffer, io_buffer_count);
545 void io_end_buffering(void)
548 if (!io_multiplexing_out) {
554 static void writefd(int fd,char *buf,size_t len)
556 stats.total_written += len;
560 if (!io_buffer || fd != multiplex_out_fd) {
561 writefd_unbuffered(fd, buf, len);
566 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
568 memcpy(io_buffer+io_buffer_count, buf, n);
571 io_buffer_count += n;
574 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
579 void write_int(int f,int32 x)
588 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
589 * 64-bit types on this platform.
591 void write_longint(int f, int64 x)
593 extern int remote_version;
596 if (remote_version < 16 || x <= 0x7FFFFFFF) {
597 write_int(f, (int)x);
601 write_int(f, (int32)0xFFFFFFFF);
602 SIVAL(b,0,(x&0xFFFFFFFF));
603 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
608 void write_buf(int f,char *buf,size_t len)
613 /* write a string to the connection */
614 static void write_sbuf(int f,char *buf)
616 write_buf(f, buf, strlen(buf));
620 void write_byte(int f,unsigned char c)
622 write_buf(f,(char *)&c,1);
627 int read_line(int f, char *buf, size_t maxlen)
632 if (buf[0] == 0) return 0;
633 if (buf[0] == '\n') {
637 if (buf[0] != '\r') {
651 void io_printf(int fd, const char *format, ...)
657 va_start(ap, format);
658 len = vsnprintf(buf, sizeof(buf), format, ap);
661 if (len < 0) exit_cleanup(RERR_STREAMIO);
667 /* setup for multiplexing an error stream with the data stream */
668 void io_start_multiplex_out(int fd)
670 multiplex_out_fd = fd;
672 io_start_buffering(fd);
673 io_multiplexing_out = 1;
676 /* setup for reading a multiplexed error stream mixed into the data stream */
677 void io_start_multiplex_in(int fd)
679 multiplex_in_fd = fd;
681 io_multiplexing_in = 1;
684 /* write a message to the multiplexed error stream */
685 int io_multiplex_write(enum logcode code, char *buf, size_t len)
687 if (!io_multiplexing_out) return 0;
690 stats.total_written += (len+4);
691 mplex_write(multiplex_out_fd, code, buf, len);
695 /* stop output multiplexing */
696 void io_multiplexing_close(void)
698 io_multiplexing_out = 0;