2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 Utilities used in rsync
27 static int64 total_written;
28 static int64 total_read;
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static time_t last_io;
35 extern int sparse_files;
36 extern int io_timeout;
38 int64 write_total(void)
43 int64 read_total(void)
48 static int buffer_f_in = -1;
50 void setup_nonblocking(int f_in,int f_out)
52 set_blocking(f_out,0);
56 static void check_timeout(void)
60 if (!io_timeout) return;
69 if (last_io && io_timeout && (t-last_io)>io_timeout) {
70 rprintf(FERROR,"read timeout after %d second - exiting\n",
77 static char *read_buffer;
78 static char *read_buffer_p;
79 static int read_buffer_len;
80 static int read_buffer_size;
83 /* continue trying to read len bytes - don't return until len
85 static void read_loop(int fd, char *buf, int len)
88 int n = read(fd, buf, len);
94 rprintf(FERROR,"EOF in read_loop\n");
101 if (errno != EAGAIN && errno != EWOULDBLOCK) {
102 rprintf(FERROR,"io error: %s\n",
109 tv.tv_sec = io_timeout;
112 if (select(fd+1, &fds, NULL, NULL,
113 io_timeout?&tv:NULL) != 1) {
120 static int read_unbuffered(int fd, char *buf, int len)
122 static int remaining;
127 if (!io_multiplexing_in) return read(fd, buf, len);
131 len = MIN(len, remaining);
132 read_loop(fd, buf, len);
138 read_loop(fd, ibuf, 4);
140 remaining = tag & 0xFFFFFF;
143 if (tag == MPLEX_BASE) continue;
147 if (tag != FERROR && tag != FINFO) {
148 rprintf(FERROR,"unexpected tag %d\n", tag);
152 if (remaining > sizeof(line)-1) {
153 rprintf(FERROR,"multiplexing overflow %d\n\n",
158 read_loop(fd, line, remaining);
161 rprintf(tag,"%s", line);
169 /* This function was added to overcome a deadlock problem when using
170 * ssh. It looks like we can't allow our receive queue to get full or
171 * ssh will clag up. Uggh. */
172 static void read_check(int f)
178 if (read_buffer_len == 0) {
179 read_buffer_p = read_buffer;
182 if ((n=num_waiting(f)) <= 0)
185 /* things could deteriorate if we read in really small chunks */
186 if (n < 10) n = 1024;
188 if (n > MAX_READ_BUFFER/4)
189 n = MAX_READ_BUFFER/4;
191 if (read_buffer_p != read_buffer) {
192 memmove(read_buffer,read_buffer_p,read_buffer_len);
193 read_buffer_p = read_buffer;
196 if (n > (read_buffer_size - read_buffer_len)) {
197 read_buffer_size += n;
199 read_buffer = (char *)malloc(read_buffer_size);
201 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
202 if (!read_buffer) out_of_memory("read check");
203 read_buffer_p = read_buffer;
206 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
208 read_buffer_len += n;
212 static int readfd(int fd,char *buffer,int N)
218 if (read_buffer_len < N)
219 read_check(buffer_f_in);
222 if (read_buffer_len > 0 && buffer_f_in == fd) {
223 ret = MIN(read_buffer_len,N-total);
224 memcpy(buffer+total,read_buffer_p,ret);
225 read_buffer_p += ret;
226 read_buffer_len -= ret;
233 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
236 if (errno != EAGAIN && errno != EWOULDBLOCK)
240 tv.tv_sec = io_timeout;
243 if (select(fd+1, &fds, NULL, NULL,
244 io_timeout?&tv:NULL) != 1) {
255 last_io = time(NULL);
260 int32 read_int(int f)
264 if ((ret=readfd(f,b,4)) != 4) {
266 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
267 getpid(),4,ret==-1?strerror(errno):"EOF");
274 int64 read_longint(int f)
276 extern int remote_version;
281 if ((int32)ret != (int32)0xffffffff) return ret;
284 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
287 if (remote_version >= 16) {
288 if ((ret=readfd(f,b,8)) != 8) {
290 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
291 getpid(),8,ret==-1?strerror(errno):"EOF");
295 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
302 void read_buf(int f,char *buf,int len)
305 if ((ret=readfd(f,buf,len)) != len) {
307 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
308 getpid(),len,ret==-1?strerror(errno):"EOF");
314 void read_sbuf(int f,char *buf,int len)
320 unsigned char read_byte(int f)
323 read_buf(f,(char *)&c,1);
328 static char last_byte;
329 static int last_sparse;
331 int sparse_end(int f)
334 do_lseek(f,-1,SEEK_CUR);
335 return (write(f,&last_byte,1) == 1 ? 0 : -1);
342 static int write_sparse(int f,char *buf,int len)
347 for (l1=0;l1<len && buf[l1]==0;l1++) ;
348 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
350 last_byte = buf[len-1];
352 if (l1 == len || l2 > 0)
356 do_lseek(f,l1,SEEK_CUR);
361 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
362 if (ret == -1 || ret == 0) return ret;
367 do_lseek(f,l2,SEEK_CUR);
374 int write_file(int f,char *buf,int len)
379 return write(f,buf,len);
382 int len1 = MIN(len, SPARSE_WRITE_SIZE);
383 int r1 = write_sparse(f, buf, len1);
385 if (ret > 0) return ret;
396 static int writefd_unbuffered(int fd,char *buf,int len)
400 int fd_count, count, got_select=0;
403 while (total < len) {
404 int ret = write(fd,buf+total,len-total);
406 if (ret == 0) return total;
408 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
411 if (ret == -1 && got_select) {
412 /* hmmm, we got a write select on the fd and
413 then failed to write. Why doesn't that
414 mean that the fd is dead? It doesn't on
415 some systems it seems (eg. IRIX) */
427 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
428 read_check(buffer_f_in);
434 if (buffer_f_in != -1) {
435 FD_SET(buffer_f_in,&r_fds);
436 if (buffer_f_in > fd)
437 fd_count = buffer_f_in+1;
440 tv.tv_sec = BLOCKING_TIMEOUT;
442 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
445 if (count == -1 && errno != EINTR) {
447 rprintf(FERROR,"select error: %s\n", strerror(errno));
456 if (FD_ISSET(fd, &w_fds)) {
462 last_io = time(NULL);
468 static char *io_buffer;
469 static int io_buffer_count;
470 static int io_out_fd;
472 void io_start_buffering(int fd)
474 if (io_buffer) return;
476 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
477 if (!io_buffer) out_of_memory("writefd");
480 /* leave room for the multiplex header in case it's needed */
487 if (!io_buffer_count) return;
489 if (io_multiplexing_out) {
490 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
491 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
493 rprintf(FERROR,"write failed\n");
497 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
499 rprintf(FERROR,"write failed\n");
506 void io_end_buffering(int fd)
509 if (!io_multiplexing_out) {
515 static int writefd(int fd,char *buf,int len1)
519 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
522 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
524 memcpy(io_buffer+io_buffer_count, buf, n);
527 io_buffer_count += n;
530 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
537 void write_int(int f,int32 x)
542 if ((ret=writefd(f,b,4)) != 4) {
543 rprintf(FERROR,"write_int failed : %s\n",
544 ret==-1?strerror(errno):"EOF");
550 void write_longint(int f, int64 x)
552 extern int remote_version;
556 if (remote_version < 16 || x <= 0x7FFFFFFF) {
557 write_int(f, (int)x);
562 SIVAL(b,0,(x&0xFFFFFFFF));
563 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
565 if ((ret=writefd(f,b,8)) != 8) {
566 rprintf(FERROR,"write_longint failed : %s\n",
567 ret==-1?strerror(errno):"EOF");
573 void write_buf(int f,char *buf,int len)
576 if ((ret=writefd(f,buf,len)) != len) {
577 rprintf(FERROR,"write_buf failed : %s\n",
578 ret==-1?strerror(errno):"EOF");
581 total_written += len;
584 /* write a string to the connection */
585 void write_sbuf(int f,char *buf)
587 write_buf(f, buf, strlen(buf));
591 void write_byte(int f,unsigned char c)
593 write_buf(f,(char *)&c,1);
596 void write_flush(int f)
601 int read_line(int f, char *buf, int maxlen)
605 if (buf[0] == '\n') {
609 if (buf[0] != '\r') {
622 void io_printf(int fd, const char *format, ...)
628 va_start(ap, format);
629 len = vslprintf(buf, sizeof(buf)-1, format, ap);
632 if (len < 0) exit_cleanup(1);
638 /* setup for multiplexing an error stream with the data stream */
639 void io_start_multiplex_out(int fd)
641 io_start_buffering(fd);
642 io_multiplexing_out = 1;
645 /* setup for multiplexing an error stream with the data stream */
646 void io_start_multiplex_in(int fd)
648 if (read_buffer_len) {
649 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
653 io_multiplexing_in = 1;
656 /* write an message to the error stream */
657 int io_multiplex_write(int f, char *buf, int len)
659 if (!io_multiplexing_out) return 0;
663 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
664 memcpy(io_buffer, buf, len);
666 writefd_unbuffered(io_out_fd, io_buffer-4, len+4);
670 void io_close_input(int fd)