1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 * Socket and pipe IO utilities used in rsync.
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
39 /* if no timeout is specified then use a 60 second select timeout */
40 #define SELECT_TIMEOUT 60
42 static int io_multiplexing_out;
43 static int io_multiplexing_in;
44 static int multiplex_in_fd;
45 static int multiplex_out_fd;
46 static time_t last_io;
51 extern int io_timeout;
52 extern struct stats stats;
55 /** Ignore EOF errors while reading a module listing if the remote
56 version is 24 or less. */
57 int kludge_around_eof = False;
60 static int io_error_fd = -1;
62 static void read_loop(int fd, char *buf, size_t len);
64 static void check_timeout(void)
66 extern int am_server, am_daemon;
71 if (!io_timeout) return;
80 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
81 if (!am_server && !am_daemon) {
82 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
85 exit_cleanup(RERR_TIMEOUT);
89 /* setup the fd used to propogate errors */
90 void io_set_error_fd(int fd)
95 /* read some data from the error fd and write it to the write log code */
96 static void read_error_fd(void)
100 int fd = io_error_fd;
103 /* io_error_fd is temporarily disabled -- is this meant to
104 * prevent indefinite recursion? */
107 read_loop(fd, buf, 4);
110 len = tag & 0xFFFFFF;
116 if (n > (sizeof(buf)-1))
118 read_loop(fd, buf, n);
119 rwrite((enum logcode)tag, buf, n);
127 static void whine_about_eof (void)
130 It's almost always an error to get an EOF when we're trying
131 to read from the network, because the protocol is
134 However, there is one unfortunate cases where it is not,
135 which is rsync <2.4.6 sending a list of modules on a
136 server, since the list is terminated by closing the socket.
137 So, for the section of the program where that is a problem
138 (start_socket_client), kludge_around_eof is True and we
142 if (kludge_around_eof)
146 "%s: connection unexpectedly closed "
147 "(%.0f bytes read so far)\n",
148 RSYNC_NAME, (double)stats.total_read);
150 exit_cleanup (RERR_STREAMIO);
155 static void die_from_readerr (int err)
157 /* this prevents us trying to write errors on a dead socket */
158 io_multiplexing_close();
160 rprintf(FERROR, "%s: read error: %s\n",
161 RSYNC_NAME, strerror (err));
162 exit_cleanup(RERR_STREAMIO);
167 * Read from a socket with IO timeout. return the number of bytes
168 * read. If no bytes can be read then exit, never return a number <= 0.
170 * TODO: If the remote shell connection fails, then current versions
171 * actually report an "unexpected EOF" error here. Since it's a
172 * fairly common mistake to try to use rsh when ssh is required, we
173 * should trap that: if we fail to read any data at all, we should
174 * give a better explanation. We can tell whether the connection has
175 * started by looking e.g. at whether the remote version is known yet.
177 static int read_timeout (int fd, char *buf, size_t len)
184 /* until we manage to read *something* */
192 if (io_error_fd != -1) {
193 FD_SET(io_error_fd, &fds);
194 if (io_error_fd > fd) fd_count = io_error_fd+1;
197 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
202 count = select(fd_count, &fds, NULL, NULL, &tv);
209 if (errno == EBADF) {
210 exit_cleanup(RERR_SOCKETIO);
215 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
219 if (!FD_ISSET(fd, &fds)) continue;
221 n = read(fd, buf, len);
228 last_io = time(NULL);
232 return -1; /* doesn't return */
233 } else if (n == -1) {
234 if (errno == EINTR || errno == EWOULDBLOCK ||
238 die_from_readerr (errno);
248 /*! Continue trying to read len bytes - don't return until len has
250 static void read_loop (int fd, char *buf, size_t len)
253 int n = read_timeout(fd, buf, len);
262 * Read from the file descriptor handling multiplexing - return number
265 * Never returns <= 0.
267 static int read_unbuffered(int fd, char *buf, size_t len)
269 static size_t remaining;
273 if (!io_multiplexing_in || fd != multiplex_in_fd)
274 return read_timeout(fd, buf, len);
278 len = MIN(len, remaining);
279 read_loop(fd, buf, len);
285 read_loop(fd, line, 4);
288 remaining = tag & 0xFFFFFF;
291 if (tag == MPLEX_BASE)
296 if (tag != FERROR && tag != FINFO) {
297 rprintf(FERROR, "unexpected tag %d\n", tag);
298 exit_cleanup(RERR_STREAMIO);
301 if (remaining > sizeof(line) - 1) {
302 rprintf(FERROR, "multiplexing overflow %d\n\n",
304 exit_cleanup(RERR_STREAMIO);
307 read_loop(fd, line, remaining);
310 rprintf((enum logcode) tag, "%s", line);
319 /* do a buffered read from fd. don't return until all N bytes
320 have been read. If all N can't be read then exit with an error */
321 static void readfd (int fd, char *buffer, size_t N)
329 ret = read_unbuffered (fd, buffer + total, N-total);
333 stats.total_read += total;
337 int32 read_int(int f)
344 if (ret == (int32)0xffffffff) return -1;
348 int64 read_longint(int f)
350 extern int remote_version;
355 if ((int32)ret != (int32)0xffffffff) {
360 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
361 exit_cleanup(RERR_UNSUPPORTED);
363 if (remote_version >= 16) {
365 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
372 void read_buf(int f,char *buf,size_t len)
377 void read_sbuf(int f,char *buf,size_t len)
379 read_buf (f,buf,len);
383 unsigned char read_byte(int f)
386 read_buf (f, (char *)&c, 1);
390 /* Write len bytes to fd. This underlies the multiplexing system,
391 * which is always called by application code. */
392 static void writefd_unbuffered(int fd,char *buf,size_t len)
403 while (total < len) {
409 if (io_error_fd != -1) {
410 FD_SET(io_error_fd,&r_fds);
411 if (io_error_fd > fd_count)
412 fd_count = io_error_fd;
415 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
420 count = select(fd_count+1,
421 io_error_fd != -1?&r_fds:NULL,
430 if (errno == EBADF) {
431 exit_cleanup(RERR_SOCKETIO);
436 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
440 if (FD_ISSET(fd, &w_fds)) {
442 size_t n = len-total;
443 ret = write(fd,buf+total,n);
445 if (ret == -1 && errno == EINTR) {
450 (errno == EWOULDBLOCK || errno == EAGAIN)) {
456 /* Don't try to write errors back
457 * across the stream */
458 io_multiplexing_close();
459 rprintf(FERROR, RSYNC_NAME
460 ": error writing %d unbuffered bytes"
461 " - exiting: %s\n", len,
463 exit_cleanup(RERR_STREAMIO);
466 /* Sleep after writing to limit I/O bandwidth */
470 tv.tv_usec = ret * 1000 / bwlimit;
471 while (tv.tv_usec > 1000000)
474 tv.tv_usec -= 1000000;
476 select(0, NULL, NULL, NULL, &tv);
482 last_io = time(NULL);
490 static char *io_buffer;
491 static int io_buffer_count;
493 void io_start_buffering(int fd)
495 if (io_buffer) return;
496 multiplex_out_fd = fd;
497 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
498 if (!io_buffer) out_of_memory("writefd");
502 /* write an message to a multiplexed stream. If this fails then rsync
504 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
509 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
511 if (n > (sizeof(buffer)-4)) {
512 n = sizeof(buffer)-4;
515 memcpy(&buffer[4], buf, n);
516 writefd_unbuffered(fd, buffer, n+4);
522 writefd_unbuffered(fd, buf, len);
529 int fd = multiplex_out_fd;
533 if (!io_buffer_count || no_flush) return;
535 if (io_multiplexing_out) {
536 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
538 writefd_unbuffered(fd, io_buffer, io_buffer_count);
544 void io_end_buffering(void)
547 if (!io_multiplexing_out) {
553 static void writefd(int fd,char *buf,size_t len)
555 stats.total_written += len;
559 if (!io_buffer || fd != multiplex_out_fd) {
560 writefd_unbuffered(fd, buf, len);
565 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
567 memcpy(io_buffer+io_buffer_count, buf, n);
570 io_buffer_count += n;
573 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
578 void write_int(int f,int32 x)
587 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
588 * 64-bit types on this platform.
590 void write_longint(int f, int64 x)
592 extern int remote_version;
595 if (remote_version < 16 || x <= 0x7FFFFFFF) {
596 write_int(f, (int)x);
600 write_int(f, (int32)0xFFFFFFFF);
601 SIVAL(b,0,(x&0xFFFFFFFF));
602 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
607 void write_buf(int f,char *buf,size_t len)
612 /* write a string to the connection */
613 static void write_sbuf(int f,char *buf)
615 write_buf(f, buf, strlen(buf));
619 void write_byte(int f,unsigned char c)
621 write_buf(f,(char *)&c,1);
627 * Read a line of up to @p maxlen characters into @p buf. Does not
628 * contain a trailing newline or carriage return.
630 * @return 1 for success; 0 for io error or truncation.
632 int read_line(int f, char *buf, size_t maxlen)
639 if (buf[0] == '\n') {
643 if (buf[0] != '\r') {
657 void io_printf(int fd, const char *format, ...)
663 va_start(ap, format);
664 len = vsnprintf(buf, sizeof(buf), format, ap);
667 if (len < 0) exit_cleanup(RERR_STREAMIO);
673 /* setup for multiplexing an error stream with the data stream */
674 void io_start_multiplex_out(int fd)
676 multiplex_out_fd = fd;
678 io_start_buffering(fd);
679 io_multiplexing_out = 1;
682 /* setup for multiplexing an error stream with the data stream */
683 void io_start_multiplex_in(int fd)
685 multiplex_in_fd = fd;
687 io_multiplexing_in = 1;
690 /* write an message to the multiplexed error stream */
691 int io_multiplex_write(enum logcode code, char *buf, size_t len)
693 if (!io_multiplexing_out) return 0;
696 stats.total_written += (len+4);
697 mplex_write(multiplex_out_fd, code, buf, len);
701 /* stop output multiplexing */
702 void io_multiplexing_close(void)
704 io_multiplexing_out = 0;