1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 * Socket and pipe IO utilities used in rsync.
28 * rsync provides its own multiplexing system, which is used to send
29 * stderr and stdout over a single socket. We need this because
30 * stdout normally carries the binary data stream, and stderr all our
33 * For historical reasons this is off during the start of the
34 * connection, but it's switched on quite early using
35 * io_start_multiplex_out() and io_start_multiplex_in().
40 /* if no timeout is specified then use a 60 second select timeout */
41 #define SELECT_TIMEOUT 60
43 static int io_multiplexing_out;
44 static int io_multiplexing_in;
45 static int multiplex_in_fd;
46 static int multiplex_out_fd;
47 static time_t last_io;
52 extern int io_timeout;
53 extern struct stats stats;
56 /** Ignore EOF errors while reading a module listing if the remote
57 version is 24 or less. */
58 int kludge_around_eof = False;
61 static int io_error_fd = -1;
63 static void read_loop(int fd, char *buf, size_t len);
65 static void check_timeout(void)
67 extern int am_server, am_daemon;
72 if (!io_timeout) return;
81 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82 if (!am_server && !am_daemon) {
83 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
86 exit_cleanup(RERR_TIMEOUT);
90 /* set up the fd used to propagate errors */
91 void io_set_error_fd(int fd)
96 /* read some data from the error fd and pass it on to the logging code (rwrite) */
97 static void read_error_fd(void)
101 int fd = io_error_fd;
104 /* io_error_fd is temporarily disabled -- is this meant to
105 * prevent indefinite recursion? */
108 read_loop(fd, buf, 4);
111 len = tag & 0xFFFFFF;
117 if (n > (sizeof(buf)-1))
119 read_loop(fd, buf, n);
120 rwrite((enum logcode)tag, buf, n);
128 static void whine_about_eof (void)
131 It's almost always an error to get an EOF when we're trying
132 to read from the network, because the protocol is
135 However, there is one unfortunate case where it is not,
136 which is rsync <2.4.6 sending a list of modules on a
137 server, since the list is terminated by closing the socket.
138 So, for the section of the program where that is a problem
139 (start_socket_client), kludge_around_eof is True and we
143 if (kludge_around_eof)
147 "%s: connection unexpectedly closed "
148 "(%.0f bytes read so far)\n",
149 RSYNC_NAME, (double)stats.total_read);
151 exit_cleanup (RERR_STREAMIO);
156 static void die_from_readerr (int err)
158 /* this prevents us trying to write errors on a dead socket */
159 io_multiplexing_close();
161 rprintf(FERROR, "%s: read error: %s\n",
162 RSYNC_NAME, strerror (err));
163 exit_cleanup(RERR_STREAMIO);
168 * Read from a socket with IO timeout. return the number of bytes
169 * read. If no bytes can be read then exit, never return a number <= 0.
171 * TODO: If the remote shell connection fails, then current versions
172 * actually report an "unexpected EOF" error here. Since it's a
173 * fairly common mistake to try to use rsh when ssh is required, we
174 * should trap that: if we fail to read any data at all, we should
175 * give a better explanation. We can tell whether the connection has
176 * started by looking e.g. at whether the remote version is known yet.
178 static int read_timeout (int fd, char *buf, size_t len)
185 /* until we manage to read *something* */
193 if (io_error_fd != -1) {
194 FD_SET(io_error_fd, &fds);
195 if (io_error_fd > fd) fd_count = io_error_fd+1;
198 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
203 count = select(fd_count, &fds, NULL, NULL, &tv);
210 if (errno == EBADF) {
211 exit_cleanup(RERR_SOCKETIO);
216 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
220 if (!FD_ISSET(fd, &fds)) continue;
222 n = read(fd, buf, len);
229 last_io = time(NULL);
233 return -1; /* doesn't return */
234 } else if (n == -1) {
235 if (errno == EINTR || errno == EWOULDBLOCK ||
239 die_from_readerr (errno);
249 /*! Continue trying to read len bytes - don't return until len has
251 static void read_loop (int fd, char *buf, size_t len)
254 int n = read_timeout(fd, buf, len);
263 * Read from the file descriptor handling multiplexing - return number
266 * Never returns <= 0.
268 static int read_unbuffered(int fd, char *buf, size_t len)
270 static size_t remaining;
274 if (!io_multiplexing_in || fd != multiplex_in_fd)
275 return read_timeout(fd, buf, len);
279 len = MIN(len, remaining);
280 read_loop(fd, buf, len);
286 read_loop(fd, line, 4);
289 remaining = tag & 0xFFFFFF;
292 if (tag == MPLEX_BASE)
297 if (tag != FERROR && tag != FINFO) {
298 rprintf(FERROR, "unexpected tag %d\n", tag);
299 exit_cleanup(RERR_STREAMIO);
302 if (remaining > sizeof(line) - 1) {
303 rprintf(FERROR, "multiplexing overflow %d\n\n",
305 exit_cleanup(RERR_STREAMIO);
308 read_loop(fd, line, remaining);
311 rprintf((enum logcode) tag, "%s", line);
320 /* do a buffered read from fd. don't return until all N bytes
321 have been read. If all N can't be read then exit with an error */
322 static void readfd (int fd, char *buffer, size_t N)
330 ret = read_unbuffered (fd, buffer + total, N-total);
334 stats.total_read += total;
338 int32 read_int(int f)
345 if (ret == (int32)0xffffffff) return -1;
349 int64 read_longint(int f)
351 extern int remote_version;
356 if ((int32)ret != (int32)0xffffffff) {
361 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362 exit_cleanup(RERR_UNSUPPORTED);
364 if (remote_version >= 16) {
366 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
373 void read_buf(int f,char *buf,size_t len)
378 void read_sbuf(int f,char *buf,size_t len)
380 read_buf (f,buf,len);
384 unsigned char read_byte(int f)
387 read_buf (f, (char *)&c, 1);
391 /* Write len bytes to fd. This underlies the multiplexing system,
392 * which is always called by application code. */
393 static void writefd_unbuffered(int fd,char *buf,size_t len)
404 while (total < len) {
410 if (io_error_fd != -1) {
411 FD_SET(io_error_fd,&r_fds);
412 if (io_error_fd > fd_count)
413 fd_count = io_error_fd;
416 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
421 count = select(fd_count+1,
422 io_error_fd != -1?&r_fds:NULL,
431 if (errno == EBADF) {
432 exit_cleanup(RERR_SOCKETIO);
437 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
441 if (FD_ISSET(fd, &w_fds)) {
443 size_t n = len-total;
444 ret = write(fd,buf+total,n);
446 if (ret == -1 && errno == EINTR) {
451 (errno == EWOULDBLOCK || errno == EAGAIN)) {
458 "error writing %d unbuffered bytes"
459 " - exiting: %s\n", len,
461 exit_cleanup(RERR_STREAMIO);
464 /* Sleep after writing to limit I/O bandwidth */
468 tv.tv_usec = ret * 1000 / bwlimit;
469 while (tv.tv_usec > 1000000)
472 tv.tv_usec -= 1000000;
474 select(0, NULL, NULL, NULL, &tv);
480 last_io = time(NULL);
488 static char *io_buffer;
489 static int io_buffer_count;
491 void io_start_buffering(int fd)
493 if (io_buffer) return;
494 multiplex_out_fd = fd;
495 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
496 if (!io_buffer) out_of_memory("writefd");
500 /* write a message to a multiplexed stream. If this fails then rsync
502 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
507 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
509 if (n > (sizeof(buffer)-4)) {
510 n = sizeof(buffer)-4;
513 memcpy(&buffer[4], buf, n);
514 writefd_unbuffered(fd, buffer, n+4);
520 writefd_unbuffered(fd, buf, len);
527 int fd = multiplex_out_fd;
531 if (!io_buffer_count || no_flush) return;
533 if (io_multiplexing_out) {
534 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
536 writefd_unbuffered(fd, io_buffer, io_buffer_count);
542 void io_end_buffering(void)
545 if (!io_multiplexing_out) {
551 static void writefd(int fd,char *buf,size_t len)
553 stats.total_written += len;
557 if (!io_buffer || fd != multiplex_out_fd) {
558 writefd_unbuffered(fd, buf, len);
563 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
565 memcpy(io_buffer+io_buffer_count, buf, n);
568 io_buffer_count += n;
571 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
576 void write_int(int f,int32 x)
585 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
586 * 64-bit types on this platform.
588 void write_longint(int f, int64 x)
590 extern int remote_version;
593 if (remote_version < 16 || x <= 0x7FFFFFFF) {
594 write_int(f, (int)x);
598 write_int(f, (int32)0xFFFFFFFF);
599 SIVAL(b,0,(x&0xFFFFFFFF));
600 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
605 void write_buf(int f,char *buf,size_t len)
610 /* write a string to the connection */
611 static void write_sbuf(int f,char *buf)
613 write_buf(f, buf, strlen(buf));
617 void write_byte(int f,unsigned char c)
619 write_buf(f,(char *)&c,1);
624 int read_line(int f, char *buf, size_t maxlen)
629 if (buf[0] == 0) return 0;
630 if (buf[0] == '\n') {
634 if (buf[0] != '\r') {
648 void io_printf(int fd, const char *format, ...)
654 va_start(ap, format);
655 len = vsnprintf(buf, sizeof(buf), format, ap);
658 if (len < 0) exit_cleanup(RERR_STREAMIO);
664 /* set up for multiplexing an error stream with the data stream */
665 void io_start_multiplex_out(int fd)
667 multiplex_out_fd = fd;
669 io_start_buffering(fd);
670 io_multiplexing_out = 1;
673 /* set up for multiplexing an error stream with the data stream */
674 void io_start_multiplex_in(int fd)
676 multiplex_in_fd = fd;
678 io_multiplexing_in = 1;
681 /* write a message to the multiplexed error stream */
682 int io_multiplex_write(enum logcode code, char *buf, size_t len)
684 if (!io_multiplexing_out) return 0;
687 stats.total_written += (len+4);
688 mplex_write(multiplex_out_fd, code, buf, len);
692 /* stop output multiplexing */
693 void io_multiplexing_close(void)
695 io_multiplexing_out = 0;