1 /* -*- c-file-style: "linux" -*-
3 Copyright (C) 1996-2001 by Andrew Tridgell
4 Copyright (C) Paul Mackerras 1996
5 Copyright (C) 2001 by Martin Pool <mbp@samba.org>
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 socket and pipe IO utilities used in rsync
29 /* if no timeout is specified then use a 60 second select timeout */
30 #define SELECT_TIMEOUT 60
/* Multiplexing flags: set to 1 by io_start_multiplex_out() and
 * io_start_multiplex_in() respectively; the output flag is reset to 0
 * by io_multiplexing_close(). */
32 static int io_multiplexing_out;
33 static int io_multiplexing_in;
/* Descriptors the multiplexed streams are bound to. multiplex_in_fd is
 * compared against the fd passed to read_unbuffered(); multiplex_out_fd
 * is compared in writefd() and is the target of io_multiplex_write(). */
34 static int multiplex_in_fd;
35 static int multiplex_out_fd;
/* Updated to time(NULL) in read_timeout() and writefd_unbuffered();
 * check_timeout() compares it against io_timeout. */
36 static time_t last_io;
41 extern int io_timeout;
42 extern struct stats stats;
45 /** Ignore EOF errors while reading a module listing if the remote
46 version is 24 or less. */
47 int kludge_around_eof = False;
/* Descriptor that error data is read from, or -1 when none is in use:
 * read_timeout() and writefd_unbuffered() only add it to their select()
 * read sets when it is not -1. */
50 static int io_error_fd = -1;
52 static void read_loop(int fd, char *buf, size_t len);
54 static void check_timeout(void)
56 extern int am_server, am_daemon;
61 if (!io_timeout) return;
70 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
71 if (!am_server && !am_daemon) {
72 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
75 exit_cleanup(RERR_TIMEOUT);
79 /* setup the fd used to propagate errors */
80 void io_set_error_fd(int fd)
85 /* read some data from the error fd and pass it to the logging code (rwrite) */
86 static void read_error_fd(void)
93 /* io_error_fd is temporarily disabled -- is this meant to
94 * prevent indefinite recursion? */
97 read_loop(fd, buf, 4);
100 len = tag & 0xFFFFFF;
106 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
107 read_loop(fd, buf, n);
108 rwrite((enum logcode)tag, buf, n);
116 static void whine_about_eof (void)
119 It's almost always an error to get an EOF when we're trying
120 to read from the network, because the protocol is
123 However, there is one unfortunate case where it is not,
124 which is rsync <2.4.6 sending a list of modules on a
125 server, since the list is terminated by closing the socket.
126 So, for the section of the program where that is a problem
127 (start_socket_client), kludge_around_eof is True and we
131 if (kludge_around_eof)
135 "%s: connection unexpectedly closed "
136 "(%.0f bytes read so far)\n",
137 RSYNC_NAME, (double)stats.total_read);
139 exit_cleanup (RERR_STREAMIO);
144 static void die_from_readerr (int err)
146 /* this prevents us trying to write errors on a dead socket */
147 io_multiplexing_close();
149 rprintf(FERROR, "%s: read error: %s\n",
150 RSYNC_NAME, strerror (err));
151 exit_cleanup(RERR_STREAMIO);
156 * Read from a socket with IO timeout. return the number of bytes
157 * read. If no bytes can be read then exit, never return a number <= 0.
159 * TODO: If the remote shell connection fails, then current versions
160 * actually report an "unexpected EOF" error here. Since it's a
161 * fairly common mistake to try to use rsh when ssh is required, we
162 * should trap that: if we fail to read any data at all, we should
163 * give a better explanation. We can tell whether the connection has
164 * started by looking e.g. at whether the remote version is known yet.
166 static int read_timeout (int fd, char *buf, size_t len)
173 /* until we manage to read *something* */
181 if (io_error_fd != -1) {
182 FD_SET(io_error_fd, &fds);
183 if (io_error_fd > fd) fd_count = io_error_fd+1;
186 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
191 count = select(fd_count, &fds, NULL, NULL, &tv);
198 if (errno == EBADF) {
199 exit_cleanup(RERR_SOCKETIO);
204 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
208 if (!FD_ISSET(fd, &fds)) continue;
210 n = read(fd, buf, len);
217 last_io = time(NULL);
221 return -1; /* doesn't return */
222 } else if (n == -1) {
223 if (errno == EINTR || errno == EWOULDBLOCK ||
227 die_from_readerr (errno);
237 /*! Continue trying to read len bytes - don't return until len has
239 static void read_loop (int fd, char *buf, size_t len)
242 int n = read_timeout(fd, buf, len);
251 * Read from the file descriptor handling multiplexing - return number
254 * Never returns <= 0.
256 static int read_unbuffered(int fd, char *buf, size_t len)
258 static int remaining;
262 if (!io_multiplexing_in || fd != multiplex_in_fd)
263 return read_timeout(fd, buf, len);
267 len = MIN(len, remaining);
268 read_loop(fd, buf, len);
274 read_loop(fd, line, 4);
277 remaining = tag & 0xFFFFFF;
280 if (tag == MPLEX_BASE)
285 if (tag != FERROR && tag != FINFO) {
286 rprintf(FERROR, "unexpected tag %d\n", tag);
287 exit_cleanup(RERR_STREAMIO);
290 if (remaining > sizeof(line) - 1) {
291 rprintf(FERROR, "multiplexing overflow %d\n\n",
293 exit_cleanup(RERR_STREAMIO);
296 read_loop(fd, line, remaining);
299 rprintf((enum logcode) tag, "%s", line);
308 /* do a buffered read from fd. don't return until all N bytes
309 have been read. If all N can't be read then exit with an error */
310 static void readfd (int fd, char *buffer, size_t N)
318 ret = read_unbuffered (fd, buffer + total, N-total);
322 stats.total_read += total;
326 int32 read_int(int f)
333 if (ret == (int32)0xffffffff) return -1;
337 int64 read_longint(int f)
339 extern int remote_version;
344 if ((int32)ret != (int32)0xffffffff) {
349 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
350 exit_cleanup(RERR_UNSUPPORTED);
352 if (remote_version >= 16) {
354 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
361 void read_buf(int f,char *buf,size_t len)
366 void read_sbuf(int f,char *buf,size_t len)
368 read_buf (f,buf,len);
372 unsigned char read_byte(int f)
375 read_buf (f, (char *)&c, 1);
379 /* write len bytes to fd */
380 static void writefd_unbuffered(int fd,char *buf,size_t len)
391 while (total < len) {
397 if (io_error_fd != -1) {
398 FD_SET(io_error_fd,&r_fds);
399 if (io_error_fd > fd_count)
400 fd_count = io_error_fd;
403 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
408 count = select(fd_count+1,
409 io_error_fd != -1?&r_fds:NULL,
418 if (errno == EBADF) {
419 exit_cleanup(RERR_SOCKETIO);
424 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
428 if (FD_ISSET(fd, &w_fds)) {
429 int ret, n = len-total;
430 ret = write(fd,buf+total,n);
432 if (ret == -1 && errno == EINTR) {
437 (errno == EWOULDBLOCK || errno == EAGAIN)) {
444 "error writing %d unbuffered bytes"
445 " - exiting: %s\n", len,
447 exit_cleanup(RERR_STREAMIO);
450 /* Sleep after writing to limit I/O bandwidth */
454 tv.tv_usec = ret * 1000 / bwlimit;
455 while (tv.tv_usec > 1000000)
458 tv.tv_usec -= 1000000;
460 select(0, NULL, NULL, NULL, &tv);
466 last_io = time(NULL);
/* Output buffer of IO_BUFFER_SIZE bytes, allocated by
 * io_start_buffering(); NULL until then. */
474 static char *io_buffer;
/* Number of bytes currently held in io_buffer; writefd() flushes once
 * it reaches IO_BUFFER_SIZE. */
475 static int io_buffer_count;
477 void io_start_buffering(int fd)
479 if (io_buffer) return;
480 multiplex_out_fd = fd;
481 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
482 if (!io_buffer) out_of_memory("writefd");
486 /* write a message to a multiplexed stream. If this fails then rsync
488 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
493 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
495 if (n > (sizeof(buffer)-4)) {
496 n = sizeof(buffer)-4;
499 memcpy(&buffer[4], buf, n);
500 writefd_unbuffered(fd, buffer, n+4);
506 writefd_unbuffered(fd, buf, len);
513 int fd = multiplex_out_fd;
517 if (!io_buffer_count || no_flush) return;
519 if (io_multiplexing_out) {
520 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
522 writefd_unbuffered(fd, io_buffer, io_buffer_count);
528 /* XXX: fd is ignored, which seems a little strange. */
529 void io_end_buffering(int fd)
532 if (!io_multiplexing_out) {
538 static void writefd(int fd,char *buf,size_t len)
540 stats.total_written += len;
544 if (!io_buffer || fd != multiplex_out_fd) {
545 writefd_unbuffered(fd, buf, len);
550 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
552 memcpy(io_buffer+io_buffer_count, buf, n);
555 io_buffer_count += n;
558 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
563 void write_int(int f,int32 x)
572 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
573 * 64-bit types on this platform.
575 void write_longint(int f, int64 x)
577 extern int remote_version;
580 if (remote_version < 16 || x <= 0x7FFFFFFF) {
581 write_int(f, (int)x);
585 write_int(f, (int32)0xFFFFFFFF);
586 SIVAL(b,0,(x&0xFFFFFFFF));
587 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
592 void write_buf(int f,char *buf,size_t len)
597 /* write a string to the connection */
598 static void write_sbuf(int f,char *buf)
600 write_buf(f, buf, strlen(buf));
604 void write_byte(int f,unsigned char c)
606 write_buf(f,(char *)&c,1);
611 int read_line(int f, char *buf, size_t maxlen)
616 if (buf[0] == 0) return 0;
617 if (buf[0] == '\n') {
621 if (buf[0] != '\r') {
635 void io_printf(int fd, const char *format, ...)
641 va_start(ap, format);
642 len = vsnprintf(buf, sizeof(buf), format, ap);
645 if (len < 0) exit_cleanup(RERR_STREAMIO);
651 /* setup for multiplexing an error stream with the data stream */
652 void io_start_multiplex_out(int fd)
654 multiplex_out_fd = fd;
656 io_start_buffering(fd);
657 io_multiplexing_out = 1;
660 /* setup for demultiplexing an error stream from the incoming data stream */
661 void io_start_multiplex_in(int fd)
663 multiplex_in_fd = fd;
665 io_multiplexing_in = 1;
668 /* write a message to the multiplexed error stream */
669 int io_multiplex_write(enum logcode code, char *buf, size_t len)
671 if (!io_multiplexing_out) return 0;
674 stats.total_written += (len+4);
675 mplex_write(multiplex_out_fd, code, buf, len);
679 /* stop output multiplexing */
680 void io_multiplexing_close(void)
682 io_multiplexing_out = 0;