2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 socket and pipe IO utilities used in rsync
27 /* if no timeout is specified then use a 60 second select timeout */
/* Used as tv_sec for select() in both the read and the write path whenever
   io_timeout is zero. */
28 #define SELECT_TIMEOUT 60
/* Non-zero while output multiplexing is active: set by
   io_start_multiplex_out(), cleared by io_multiplexing_close(). */
32 static int io_multiplexing_out;
/* Non-zero once io_start_multiplex_in() has been called; read_unbuffered()
   only demultiplexes when this is set. */
33 static int io_multiplexing_in;
/* fd recorded by io_start_multiplex_in(); reads on any other fd go straight
   to read_timeout(). */
34 static int multiplex_in_fd;
/* fd recorded by io_start_buffering() / io_start_multiplex_out(); writefd()
   only buffers writes aimed at this fd, and multiplexed messages are sent
   to it. */
35 static int multiplex_out_fd;
/* time(NULL) of the most recent successful read or write; check_timeout()
   compares it against io_timeout. */
36 static time_t last_io;
/* Starts out as 1; its readers are not visible in this listing.
   NOTE(review): presumably selects whether an unexpected EOF is reported
   as an error - confirm against the full file. */
37 static int eof_error=1;
/* Defined elsewhere. Zero disables check_timeout() and makes select() fall
   back to SELECT_TIMEOUT. */
39 extern int io_timeout;
/* Defined elsewhere. This file updates stats.total_read and
   stats.total_written. */
40 extern struct stats stats;
/* -1 until set. NOTE(review): the comment on writefd_unbuffered() says data
   may be read from this fd to unclog the pipe; the code doing so is not
   visible here. */
42 static int buffer_f_in = -1;
/* -1 means no error fd is configured; otherwise it is added to the select()
   read set in both the read and the write path. */
43 static int io_error_fd = -1;
/* Forward declaration: read_error_fd() calls read_loop() before its
   definition appears. */
45 static void read_loop(int fd, char *buf, int len);
47 void setup_readbuffer(int f_in)
52 static void check_timeout(void)
54 extern int am_server, am_daemon;
57 if (!io_timeout) return;
66 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
67 if (!am_server && !am_daemon) {
68 rprintf(FERROR,"io timeout after %d second - exiting\n",
71 exit_cleanup(RERR_TIMEOUT);
75 /* set up the fd used to propagate errors */
76 void io_set_error_fd(int fd)
81 /* read some data from the error fd and write it to the write log code */
82 static void read_error_fd(void)
91 read_loop(fd, buf, 4);
100 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
101 read_loop(fd, buf, n);
102 rwrite((enum logcode)tag, buf, n);
112 /* read from a socket with IO timeout. return the number of
113 bytes read. If no bytes can be read then exit, never return
115 static int read_timeout(int fd, char *buf, int len)
128 if (io_error_fd != -1) {
129 FD_SET(io_error_fd, &fds);
130 if (io_error_fd > fd) fd_count = io_error_fd+1;
133 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
138 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
139 if (errno == EBADF) {
140 exit_cleanup(RERR_SOCKETIO);
146 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
150 if (!FD_ISSET(fd, &fds)) continue;
152 n = read(fd, buf, len);
159 last_io = time(NULL);
163 if (n == -1 && errno == EINTR) {
168 (errno == EWOULDBLOCK || errno == EAGAIN)) {
175 rprintf(FERROR,"unexpected EOF in read_timeout\n");
177 exit_cleanup(RERR_STREAMIO);
180 /* this prevents us trying to write errors on a dead socket */
181 io_multiplexing_close();
183 rprintf(FERROR,"read error: %s\n", strerror(errno));
184 exit_cleanup(RERR_STREAMIO);
190 /* continue trying to read len bytes - don't return until len
192 static void read_loop(int fd, char *buf, int len)
195 int n = read_timeout(fd, buf, len);
202 /* read from the file descriptor handling multiplexing -
203 return number of bytes read
205 static int read_unbuffered(int fd, char *buf, int len)
207 static int remaining;
211 if (!io_multiplexing_in || fd != multiplex_in_fd)
212 return read_timeout(fd, buf, len);
216 len = MIN(len, remaining);
217 read_loop(fd, buf, len);
223 read_loop(fd, line, 4);
226 remaining = tag & 0xFFFFFF;
229 if (tag == MPLEX_BASE) continue;
233 if (tag != FERROR && tag != FINFO) {
234 rprintf(FERROR,"unexpected tag %d\n", tag);
235 exit_cleanup(RERR_STREAMIO);
238 if (remaining > sizeof(line)-1) {
239 rprintf(FERROR,"multiplexing overflow %d\n\n",
241 exit_cleanup(RERR_STREAMIO);
244 read_loop(fd, line, remaining);
247 rprintf((enum logcode)tag,"%s", line);
255 /* do a buffered read from fd. don't return until all N bytes
256 have been read. If all N can't be read then exit with an error */
257 static void readfd(int fd,char *buffer,int N)
265 ret = read_unbuffered(fd,buffer + total,N-total);
269 stats.total_read += total;
273 int32 read_int(int f)
280 if (ret == (int32)0xffffffff) return -1;
284 int64 read_longint(int f)
286 extern int remote_version;
291 if ((int32)ret != (int32)0xffffffff) {
296 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
297 exit_cleanup(RERR_UNSUPPORTED);
299 if (remote_version >= 16) {
301 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
308 void read_buf(int f,char *buf,int len)
313 void read_sbuf(int f,char *buf,int len)
319 unsigned char read_byte(int f)
322 read_buf(f,(char *)&c,1);
328 /* write len bytes to fd, possibly reading from buffer_f_in if set
329 in order to unclog the pipe. don't return until all len
330 bytes have been written */
331 static void writefd_unbuffered(int fd,char *buf,int len)
340 while (total < len) {
346 if (io_error_fd != -1) {
347 FD_SET(io_error_fd,&r_fds);
348 if (io_error_fd > fd_count)
349 fd_count = io_error_fd;
352 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
357 count = select(fd_count+1,
358 io_error_fd != -1?&r_fds:NULL,
363 if (errno == EBADF) {
364 exit_cleanup(RERR_SOCKETIO);
370 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
374 if (FD_ISSET(fd, &w_fds)) {
375 int ret, n = len-total;
377 ret = write(fd,buf+total,n);
379 if (ret == -1 && errno == EINTR) {
384 (errno == EWOULDBLOCK || errno == EAGAIN)) {
389 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
390 exit_cleanup(RERR_STREAMIO);
393 /* Sleep after writing to limit I/O bandwidth */
397 tv.tv_usec = ret * 1000 / bwlimit;
398 while (tv.tv_usec > 1000000)
401 tv.tv_usec -= 1000000;
403 select(0, NULL, NULL, NULL, &tv);
409 last_io = time(NULL);
/* Output buffer of IO_BUFFER_SIZE bytes, allocated by io_start_buffering().
   While it is NULL, writefd() writes unbuffered. */
417 static char *io_buffer;
/* Number of bytes currently held in io_buffer; writefd() calls io_flush()
   once it reaches IO_BUFFER_SIZE. */
418 static int io_buffer_count;
420 void io_start_buffering(int fd)
422 if (io_buffer) return;
423 multiplex_out_fd = fd;
424 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
425 if (!io_buffer) out_of_memory("writefd");
429 /* write a message to a multiplexed stream. If this fails then rsync
431 static void mplex_write(int fd, enum logcode code, char *buf, int len)
436 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
438 if (n > (sizeof(buffer)-4)) {
439 n = sizeof(buffer)-4;
442 memcpy(&buffer[4], buf, n);
443 writefd_unbuffered(fd, buffer, n+4);
449 writefd_unbuffered(fd, buf, len);
456 int fd = multiplex_out_fd;
457 if (!io_buffer_count || no_flush) return;
459 if (io_multiplexing_out) {
460 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
462 writefd_unbuffered(fd, io_buffer, io_buffer_count);
467 void io_end_buffering(int fd)
470 if (!io_multiplexing_out) {
476 static void writefd(int fd,char *buf,int len)
478 stats.total_written += len;
480 if (!io_buffer || fd != multiplex_out_fd) {
481 writefd_unbuffered(fd, buf, len);
486 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
488 memcpy(io_buffer+io_buffer_count, buf, n);
491 io_buffer_count += n;
494 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
499 void write_int(int f,int32 x)
506 void write_longint(int f, int64 x)
508 extern int remote_version;
511 if (remote_version < 16 || x <= 0x7FFFFFFF) {
512 write_int(f, (int)x);
516 write_int(f, (int32)0xFFFFFFFF);
517 SIVAL(b,0,(x&0xFFFFFFFF));
518 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
523 void write_buf(int f,char *buf,int len)
528 /* write a string to the connection */
529 static void write_sbuf(int f,char *buf)
531 write_buf(f, buf, strlen(buf));
535 void write_byte(int f,unsigned char c)
537 write_buf(f,(char *)&c,1);
540 int read_line(int f, char *buf, int maxlen)
547 if (buf[0] == 0) return 0;
548 if (buf[0] == '\n') {
552 if (buf[0] != '\r') {
568 void io_printf(int fd, const char *format, ...)
574 va_start(ap, format);
575 len = vslprintf(buf, sizeof(buf), format, ap);
578 if (len < 0) exit_cleanup(RERR_STREAMIO);
584 /* setup for multiplexing an error stream with the data stream */
585 void io_start_multiplex_out(int fd)
587 multiplex_out_fd = fd;
589 io_start_buffering(fd);
590 io_multiplexing_out = 1;
593 /* set up for reading an error stream multiplexed with the incoming data stream */
594 void io_start_multiplex_in(int fd)
596 multiplex_in_fd = fd;
598 io_multiplexing_in = 1;
601 /* write a message to the multiplexed error stream */
602 int io_multiplex_write(enum logcode code, char *buf, int len)
604 if (!io_multiplexing_out) return 0;
607 stats.total_written += (len+4);
608 mplex_write(multiplex_out_fd, code, buf, len);
612 /* write a message to the special error fd */
613 int io_error_write(int f, enum logcode code, char *buf, int len)
615 if (f == -1) return 0;
616 mplex_write(f, code, buf, len);
620 /* stop output multiplexing */
621 void io_multiplexing_close(void)
623 io_multiplexing_out = 0;
626 void io_close_input(int fd)