2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 Utilities used in rsync
27 static int64 total_written;
28 static int64 total_read;
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
37 extern int sparse_files;
38 extern int io_timeout;
40 int64 write_total(void)
45 int64 read_total(void)
50 static int buffer_f_in = -1;
52 void setup_nonblocking(int f_in,int f_out)
54 set_blocking(f_out,0);
58 static void check_timeout(void)
62 if (!io_timeout) return;
71 if (last_io && io_timeout && (t-last_io)>io_timeout) {
72 rprintf(FERROR,"read timeout after %d second - exiting\n",
79 static char *read_buffer;
80 static char *read_buffer_p;
81 static int read_buffer_len;
82 static int read_buffer_size;
85 /* continue trying to read len bytes - don't return until len
87 static void read_loop(int fd, char *buf, int len)
90 int n = read(fd, buf, len);
96 rprintf(FERROR,"EOF in read_loop\n");
103 if (errno != EAGAIN && errno != EWOULDBLOCK) {
104 rprintf(FERROR,"io error: %s\n",
111 tv.tv_sec = io_timeout;
114 if (select(fd+1, &fds, NULL, NULL,
115 io_timeout?&tv:NULL) != 1) {
122 static int read_unbuffered(int fd, char *buf, int len)
124 static int remaining;
129 if (!io_multiplexing_in || fd != multiplex_in_fd)
130 return read(fd, buf, len);
134 len = MIN(len, remaining);
135 read_loop(fd, buf, len);
141 read_loop(fd, ibuf, 4);
144 remaining = tag & 0xFFFFFF;
147 if (tag == MPLEX_BASE) continue;
151 if (tag != FERROR && tag != FINFO) {
152 rprintf(FERROR,"unexpected tag %d\n", tag);
156 if (remaining > sizeof(line)-1) {
157 rprintf(FERROR,"multiplexing overflow %d\n\n",
162 read_loop(fd, line, remaining);
165 rprintf(tag,"%s", line);
173 /* This function was added to overcome a deadlock problem when using
174 * ssh. It looks like we can't allow our receive queue to get full or
175 * ssh will clag up. Uggh. */
176 static void read_check(int f)
182 if (read_buffer_len == 0) {
183 read_buffer_p = read_buffer;
186 if ((n=num_waiting(f)) <= 0)
189 /* things could deteriorate if we read in really small chunks */
190 if (n < 10) n = 1024;
192 if (n > MAX_READ_BUFFER/4)
193 n = MAX_READ_BUFFER/4;
195 if (read_buffer_p != read_buffer) {
196 memmove(read_buffer,read_buffer_p,read_buffer_len);
197 read_buffer_p = read_buffer;
200 if (n > (read_buffer_size - read_buffer_len)) {
201 read_buffer_size += n;
203 read_buffer = (char *)malloc(read_buffer_size);
205 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
206 if (!read_buffer) out_of_memory("read check");
207 read_buffer_p = read_buffer;
210 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
212 read_buffer_len += n;
216 static int readfd(int fd,char *buffer,int N)
222 if (read_buffer_len < N)
223 read_check(buffer_f_in);
226 if (read_buffer_len > 0 && buffer_f_in == fd) {
227 ret = MIN(read_buffer_len,N-total);
228 memcpy(buffer+total,read_buffer_p,ret);
229 read_buffer_p += ret;
230 read_buffer_len -= ret;
237 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
240 if (errno != EAGAIN && errno != EWOULDBLOCK)
244 tv.tv_sec = io_timeout;
247 if (select(fd+1, &fds, NULL, NULL,
248 io_timeout?&tv:NULL) != 1) {
259 last_io = time(NULL);
264 int32 read_int(int f)
268 if ((ret=readfd(f,b,4)) != 4) {
270 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
271 getpid(),4,ret==-1?strerror(errno):"EOF");
278 int64 read_longint(int f)
280 extern int remote_version;
285 if ((int32)ret != (int32)0xffffffff) return ret;
288 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
291 if (remote_version >= 16) {
292 if ((ret=readfd(f,b,8)) != 8) {
294 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
295 getpid(),8,ret==-1?strerror(errno):"EOF");
299 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
306 void read_buf(int f,char *buf,int len)
309 if ((ret=readfd(f,buf,len)) != len) {
311 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
312 getpid(),len,ret==-1?strerror(errno):"EOF");
318 void read_sbuf(int f,char *buf,int len)
324 unsigned char read_byte(int f)
327 read_buf(f,(char *)&c,1);
332 static char last_byte;
333 static int last_sparse;
335 int sparse_end(int f)
338 do_lseek(f,-1,SEEK_CUR);
339 return (write(f,&last_byte,1) == 1 ? 0 : -1);
346 static int write_sparse(int f,char *buf,int len)
351 for (l1=0;l1<len && buf[l1]==0;l1++) ;
352 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
354 last_byte = buf[len-1];
356 if (l1 == len || l2 > 0)
360 do_lseek(f,l1,SEEK_CUR);
365 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
366 if (ret == -1 || ret == 0) return ret;
371 do_lseek(f,l2,SEEK_CUR);
378 int write_file(int f,char *buf,int len)
383 return write(f,buf,len);
386 int len1 = MIN(len, SPARSE_WRITE_SIZE);
387 int r1 = write_sparse(f, buf, len1);
389 if (ret > 0) return ret;
400 static int writefd_unbuffered(int fd,char *buf,int len)
404 int fd_count, count, got_select=0;
407 while (total < len) {
408 int ret = write(fd,buf+total,len-total);
410 if (ret == 0) return total;
412 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
415 if (ret == -1 && got_select) {
416 /* hmmm, we got a write select on the fd and
417 then failed to write. Why doesn't that
418 mean that the fd is dead? It doesn't on
419 some systems it seems (eg. IRIX) */
431 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
432 read_check(buffer_f_in);
438 if (buffer_f_in != -1) {
439 FD_SET(buffer_f_in,&r_fds);
440 if (buffer_f_in > fd)
441 fd_count = buffer_f_in+1;
444 tv.tv_sec = BLOCKING_TIMEOUT;
446 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
449 if (count == -1 && errno != EINTR) {
451 rprintf(FERROR,"select error: %s\n", strerror(errno));
460 if (FD_ISSET(fd, &w_fds)) {
466 last_io = time(NULL);
472 static char *io_buffer;
473 static int io_buffer_count;
475 void io_start_buffering(int fd)
477 if (io_buffer) return;
478 multiplex_out_fd = fd;
479 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
480 if (!io_buffer) out_of_memory("writefd");
483 /* leave room for the multiplex header in case it's needed */
489 int fd = multiplex_out_fd;
490 if (!io_buffer_count) return;
492 if (io_multiplexing_out) {
493 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
494 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
496 rprintf(FERROR,"write failed\n");
500 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
502 rprintf(FERROR,"write failed\n");
509 void io_end_buffering(int fd)
512 if (!io_multiplexing_out) {
518 static int writefd(int fd,char *buf,int len1)
522 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
525 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
527 memcpy(io_buffer+io_buffer_count, buf, n);
530 io_buffer_count += n;
533 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
540 void write_int(int f,int32 x)
545 if ((ret=writefd(f,b,4)) != 4) {
546 rprintf(FERROR,"write_int failed : %s\n",
547 ret==-1?strerror(errno):"EOF");
553 void write_longint(int f, int64 x)
555 extern int remote_version;
559 if (remote_version < 16 || x <= 0x7FFFFFFF) {
560 write_int(f, (int)x);
565 SIVAL(b,0,(x&0xFFFFFFFF));
566 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
568 if ((ret=writefd(f,b,8)) != 8) {
569 rprintf(FERROR,"write_longint failed : %s\n",
570 ret==-1?strerror(errno):"EOF");
576 void write_buf(int f,char *buf,int len)
579 if ((ret=writefd(f,buf,len)) != len) {
580 rprintf(FERROR,"write_buf failed : %s\n",
581 ret==-1?strerror(errno):"EOF");
584 total_written += len;
587 /* write a string to the connection */
588 void write_sbuf(int f,char *buf)
590 write_buf(f, buf, strlen(buf));
594 void write_byte(int f,unsigned char c)
596 write_buf(f,(char *)&c,1);
599 void write_flush(int f)
604 int read_line(int f, char *buf, int maxlen)
608 if (buf[0] == '\n') {
612 if (buf[0] != '\r') {
625 void io_printf(int fd, const char *format, ...)
631 va_start(ap, format);
632 len = vslprintf(buf, sizeof(buf)-1, format, ap);
635 if (len < 0) exit_cleanup(1);
641 /* setup for multiplexing an error stream with the data stream */
642 void io_start_multiplex_out(int fd)
644 multiplex_out_fd = fd;
646 io_start_buffering(fd);
647 io_multiplexing_out = 1;
650 /* setup for multiplexing an error stream with the data stream */
651 void io_start_multiplex_in(int fd)
653 multiplex_in_fd = fd;
655 if (read_buffer_len) {
656 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
660 io_multiplexing_in = 1;
663 /* write an message to the error stream */
664 int io_multiplex_write(int f, char *buf, int len)
666 if (!io_multiplexing_out) return 0;
670 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
671 memcpy(io_buffer, buf, len);
673 writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
677 void io_close_input(int fd)