added support for --include, --include-from and the +/- syntax
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 Utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
27static int64 total_written;
28static int64 total_read;
29
30static int io_multiplexing_out;
31static int io_multiplexing_in;
32static int multiplex_in_fd;
33static int multiplex_out_fd;
34static time_t last_io;
35
36extern int verbose;
37extern int sparse_files;
38extern int io_timeout;
39
40int64 write_total(void)
41{
42 return total_written;
43}
44
45int64 read_total(void)
46{
47 return total_read;
48}
49
/* Descriptor whose pending input is drained into the read buffer by
   read_check(); -1 means "no such descriptor". */
static int buffer_f_in = -1;

/* Prepare a descriptor pair for use by this module.
 *
 * f_out is handed to set_blocking() with a flag of 0 (presumably this
 * switches it to non-blocking mode - confirm against set_blocking()),
 * and f_in is remembered as the descriptor to drain opportunistically. */
void setup_nonblocking(int f_in,int f_out)
{
	set_blocking(f_out, 0);

	buffer_f_in = f_in;
}
57
58static void check_timeout(void)
59{
60 time_t t;
61
62 if (!io_timeout) return;
63
64 if (!last_io) {
65 last_io = time(NULL);
66 return;
67 }
68
69 t = time(NULL);
70
71 if (last_io && io_timeout && (t-last_io)>io_timeout) {
72 rprintf(FERROR,"read timeout after %d second - exiting\n",
73 (int)(t-last_io));
74 exit_cleanup(1);
75 }
76}
77
78
/* Read-ahead buffer filled by read_check() and consumed by readfd(). */
static char *read_buffer;	/* start of the allocation (NULL until first use) */
static int read_buffer_size;	/* bytes allocated at read_buffer */
static char *read_buffer_p;	/* first byte not yet handed to a reader */
static int read_buffer_len;	/* number of unread bytes starting at read_buffer_p */
83
84
85/* continue trying to read len bytes - don't return until len
86 has been read */
87static void read_loop(int fd, char *buf, int len)
88{
89 while (len) {
90 int n = read(fd, buf, len);
91 if (n > 0) {
92 buf += n;
93 len -= n;
94 }
95 if (n == 0) {
96 rprintf(FERROR,"EOF in read_loop\n");
97 exit_cleanup(1);
98 }
99 if (n == -1) {
100 fd_set fds;
101 struct timeval tv;
102
103 if (errno != EAGAIN && errno != EWOULDBLOCK) {
104 rprintf(FERROR,"io error: %s\n",
105 strerror(errno));
106 exit_cleanup(1);
107 }
108
109 FD_ZERO(&fds);
110 FD_SET(fd, &fds);
111 tv.tv_sec = io_timeout;
112 tv.tv_usec = 0;
113
114 if (select(fd+1, &fds, NULL, NULL,
115 io_timeout?&tv:NULL) != 1) {
116 check_timeout();
117 }
118 }
119 }
120}
121
122static int read_unbuffered(int fd, char *buf, int len)
123{
124 static int remaining;
125 char ibuf[4];
126 int tag, ret=0;
127 char line[1024];
128
129 if (!io_multiplexing_in || fd != multiplex_in_fd)
130 return read(fd, buf, len);
131
132 while (ret == 0) {
133 if (remaining) {
134 len = MIN(len, remaining);
135 read_loop(fd, buf, len);
136 remaining -= len;
137 ret = len;
138 continue;
139 }
140
141 read_loop(fd, ibuf, 4);
142 tag = IVAL(ibuf, 0);
143
144 remaining = tag & 0xFFFFFF;
145 tag = tag >> 24;
146
147 if (tag == MPLEX_BASE) continue;
148
149 tag -= MPLEX_BASE;
150
151 if (tag != FERROR && tag != FINFO) {
152 rprintf(FERROR,"unexpected tag %d\n", tag);
153 exit_cleanup(1);
154 }
155
156 if (remaining > sizeof(line)-1) {
157 rprintf(FERROR,"multiplexing overflow %d\n\n",
158 remaining);
159 exit_cleanup(1);
160 }
161
162 read_loop(fd, line, remaining);
163 line[remaining] = 0;
164
165 rprintf(tag,"%s", line);
166 remaining = 0;
167 }
168
169 return ret;
170}
171
172
173/* This function was added to overcome a deadlock problem when using
174 * ssh. It looks like we can't allow our receive queue to get full or
175 * ssh will clag up. Uggh. */
176static void read_check(int f)
177{
178 int n;
179
180 if (f == -1) return;
181
182 if (read_buffer_len == 0) {
183 read_buffer_p = read_buffer;
184 }
185
186 if ((n=num_waiting(f)) <= 0)
187 return;
188
189 /* things could deteriorate if we read in really small chunks */
190 if (n < 10) n = 1024;
191
192 if (n > MAX_READ_BUFFER/4)
193 n = MAX_READ_BUFFER/4;
194
195 if (read_buffer_p != read_buffer) {
196 memmove(read_buffer,read_buffer_p,read_buffer_len);
197 read_buffer_p = read_buffer;
198 }
199
200 if (n > (read_buffer_size - read_buffer_len)) {
201 read_buffer_size += n;
202 if (!read_buffer)
203 read_buffer = (char *)malloc(read_buffer_size);
204 else
205 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
206 if (!read_buffer) out_of_memory("read check");
207 read_buffer_p = read_buffer;
208 }
209
210 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
211 if (n > 0) {
212 read_buffer_len += n;
213 }
214}
215
216static int readfd(int fd,char *buffer,int N)
217{
218 int ret;
219 int total=0;
220 struct timeval tv;
221
222 if (read_buffer_len < N)
223 read_check(buffer_f_in);
224
225 while (total < N) {
226 if (read_buffer_len > 0 && buffer_f_in == fd) {
227 ret = MIN(read_buffer_len,N-total);
228 memcpy(buffer+total,read_buffer_p,ret);
229 read_buffer_p += ret;
230 read_buffer_len -= ret;
231 total += ret;
232 continue;
233 }
234
235 io_flush();
236
237 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
238 fd_set fds;
239
240 if (errno != EAGAIN && errno != EWOULDBLOCK)
241 return -1;
242 FD_ZERO(&fds);
243 FD_SET(fd, &fds);
244 tv.tv_sec = io_timeout;
245 tv.tv_usec = 0;
246
247 if (select(fd+1, &fds, NULL, NULL,
248 io_timeout?&tv:NULL) != 1) {
249 check_timeout();
250 }
251 }
252
253 if (ret <= 0)
254 return total;
255 total += ret;
256 }
257
258 if (io_timeout)
259 last_io = time(NULL);
260 return total;
261}
262
263
264int32 read_int(int f)
265{
266 int ret;
267 char b[4];
268 if ((ret=readfd(f,b,4)) != 4) {
269 if (verbose > 1)
270 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
271 getpid(),4,ret==-1?strerror(errno):"EOF");
272 exit_cleanup(1);
273 }
274 total_read += 4;
275 return IVAL(b,0);
276}
277
278int64 read_longint(int f)
279{
280 extern int remote_version;
281 int64 ret;
282 char b[8];
283 ret = read_int(f);
284
285 if ((int32)ret != (int32)0xffffffff) return ret;
286
287#ifdef NO_INT64
288 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
289 exit_cleanup(1);
290#else
291 if (remote_version >= 16) {
292 if ((ret=readfd(f,b,8)) != 8) {
293 if (verbose > 1)
294 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
295 getpid(),8,ret==-1?strerror(errno):"EOF");
296 exit_cleanup(1);
297 }
298 total_read += 8;
299 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
300 }
301#endif
302
303 return ret;
304}
305
306void read_buf(int f,char *buf,int len)
307{
308 int ret;
309 if ((ret=readfd(f,buf,len)) != len) {
310 if (verbose > 1)
311 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
312 getpid(),len,ret==-1?strerror(errno):"EOF");
313 exit_cleanup(1);
314 }
315 total_read += len;
316}
317
/* Read len bytes from f into buf and NUL-terminate the result.
 * buf must therefore have room for len+1 bytes. */
void read_sbuf(int f,char *buf,int len)
{
	read_buf(f, buf, len);
	*(buf + len) = 0;
}
323
/* Read and return a single byte from f. */
unsigned char read_byte(int f)
{
	unsigned char value;

	read_buf(f, (char *)&value, 1);
	return value;
}
330
331
/* State shared between write_sparse() and sparse_end(): the final byte of
   the most recent chunk, and whether that chunk ended with a seek over
   zeros and not a real write. */
static char last_byte;
static int last_sparse;

/* Finish a file that was written through write_sparse().
 *
 * If the last chunk ended with a seek, step back one byte and write the
 * final byte for real, so that the file ends in written data and not in
 * a trailing seek.
 *
 * The sparse flag is cleared before returning so that it cannot leak into
 * the next file; previously it was left set, and ending a later file that
 * received no data would still have written a stray byte into it.
 *
 * Returns 0 on success (or when nothing had to be done), -1 if the final
 * byte could not be written. */
int sparse_end(int f)
{
	if (!last_sparse)
		return 0;

	last_sparse = 0;

	do_lseek(f,-1,SEEK_CUR);
	return (write(f,&last_byte,1) == 1 ? 0 : -1);
}
344
345
346static int write_sparse(int f,char *buf,int len)
347{
348 int l1=0,l2=0;
349 int ret;
350
351 for (l1=0;l1<len && buf[l1]==0;l1++) ;
352 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
353
354 last_byte = buf[len-1];
355
356 if (l1 == len || l2 > 0)
357 last_sparse=1;
358
359 if (l1 > 0)
360 do_lseek(f,l1,SEEK_CUR);
361
362 if (l1 == len)
363 return len;
364
365 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
366 if (ret == -1 || ret == 0) return ret;
367 return (l1+ret);
368 }
369
370 if (l2 > 0)
371 do_lseek(f,l2,SEEK_CUR);
372
373 return len;
374}
375
376
377
378int write_file(int f,char *buf,int len)
379{
380 int ret = 0;
381
382 if (!sparse_files)
383 return write(f,buf,len);
384
385 while (len>0) {
386 int len1 = MIN(len, SPARSE_WRITE_SIZE);
387 int r1 = write_sparse(f, buf, len1);
388 if (r1 <= 0) {
389 if (ret > 0) return ret;
390 return r1;
391 }
392 len -= r1;
393 buf += r1;
394 ret += r1;
395 }
396 return ret;
397}
398
399
400static int writefd_unbuffered(int fd,char *buf,int len)
401{
402 int total = 0;
403 fd_set w_fds, r_fds;
404 int fd_count, count, got_select=0;
405 struct timeval tv;
406
407 while (total < len) {
408 int ret = write(fd,buf+total,len-total);
409
410 if (ret == 0) return total;
411
412 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
413 return -1;
414
415 if (ret == -1 && got_select) {
416 /* hmmm, we got a write select on the fd and
417 then failed to write. Why doesn't that
418 mean that the fd is dead? It doesn't on
419 some systems it seems (eg. IRIX) */
420 u_sleep(1000);
421 }
422
423 got_select = 0;
424
425
426 if (ret != -1) {
427 total += ret;
428 continue;
429 }
430
431 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
432 read_check(buffer_f_in);
433
434 fd_count = fd+1;
435 FD_ZERO(&w_fds);
436 FD_ZERO(&r_fds);
437 FD_SET(fd,&w_fds);
438 if (buffer_f_in != -1) {
439 FD_SET(buffer_f_in,&r_fds);
440 if (buffer_f_in > fd)
441 fd_count = buffer_f_in+1;
442 }
443
444 tv.tv_sec = BLOCKING_TIMEOUT;
445 tv.tv_usec = 0;
446 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
447 &w_fds,NULL,&tv);
448
449 if (count == -1 && errno != EINTR) {
450 if (verbose > 1)
451 rprintf(FERROR,"select error: %s\n", strerror(errno));
452 exit_cleanup(1);
453 }
454
455 if (count == 0) {
456 check_timeout();
457 continue;
458 }
459
460 if (FD_ISSET(fd, &w_fds)) {
461 got_select = 1;
462 }
463 }
464
465 if (io_timeout)
466 last_io = time(NULL);
467
468 return total;
469}
470
471
472static char *io_buffer;
473static int io_buffer_count;
474
475void io_start_buffering(int fd)
476{
477 if (io_buffer) return;
478 multiplex_out_fd = fd;
479 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
480 if (!io_buffer) out_of_memory("writefd");
481 io_buffer_count = 0;
482
483 /* leave room for the multiplex header in case it's needed */
484 io_buffer += 4;
485}
486
487void io_flush(void)
488{
489 int fd = multiplex_out_fd;
490 if (!io_buffer_count) return;
491
492 if (io_multiplexing_out) {
493 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
494 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
495 io_buffer_count+4) {
496 rprintf(FERROR,"write failed\n");
497 exit_cleanup(1);
498 }
499 } else {
500 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
501 io_buffer_count) {
502 rprintf(FERROR,"write failed\n");
503 exit_cleanup(1);
504 }
505 }
506 io_buffer_count = 0;
507}
508
509void io_end_buffering(int fd)
510{
511 io_flush();
512 if (!io_multiplexing_out) {
513 free(io_buffer-4);
514 io_buffer = NULL;
515 }
516}
517
518static int writefd(int fd,char *buf,int len1)
519{
520 int len = len1;
521
522 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
523
524 while (len) {
525 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
526 if (n > 0) {
527 memcpy(io_buffer+io_buffer_count, buf, n);
528 buf += n;
529 len -= n;
530 io_buffer_count += n;
531 }
532
533 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
534 }
535
536 return len1;
537}
538
539
540void write_int(int f,int32 x)
541{
542 int ret;
543 char b[4];
544 SIVAL(b,0,x);
545 if ((ret=writefd(f,b,4)) != 4) {
546 rprintf(FERROR,"write_int failed : %s\n",
547 ret==-1?strerror(errno):"EOF");
548 exit_cleanup(1);
549 }
550 total_written += 4;
551}
552
553void write_longint(int f, int64 x)
554{
555 extern int remote_version;
556 char b[8];
557 int ret;
558
559 if (remote_version < 16 || x <= 0x7FFFFFFF) {
560 write_int(f, (int)x);
561 return;
562 }
563
564 write_int(f, -1);
565 SIVAL(b,0,(x&0xFFFFFFFF));
566 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
567
568 if ((ret=writefd(f,b,8)) != 8) {
569 rprintf(FERROR,"write_longint failed : %s\n",
570 ret==-1?strerror(errno):"EOF");
571 exit_cleanup(1);
572 }
573 total_written += 8;
574}
575
576void write_buf(int f,char *buf,int len)
577{
578 int ret;
579 if ((ret=writefd(f,buf,len)) != len) {
580 rprintf(FERROR,"write_buf failed : %s\n",
581 ret==-1?strerror(errno):"EOF");
582 exit_cleanup(1);
583 }
584 total_written += len;
585}
586
/* Send the NUL-terminated string buf over the connection.  The
 * terminating NUL itself is not sent. */
void write_sbuf(int f,char *buf)
{
	int n = strlen(buf);

	write_buf(f, buf, n);
}
592
593
/* Send the single byte c. */
void write_byte(int f,unsigned char c)
{
	char *one = (char *)&c;

	write_buf(f, one, 1);
}
598
/* Intentionally empty: this function performs no work and ignores f. */
void write_flush(int f)
{
	(void)f;
}
602
603
/* Read a line of text from f into buf, one byte at a time.
 *
 * Carriage returns are discarded and the terminating newline is replaced
 * by a NUL.  At most maxlen characters are stored before the NUL, so buf
 * must have room for maxlen+1 bytes.
 *
 * Returns 1 when a complete line was read, 0 when maxlen characters were
 * stored without a newline being seen (buf is still NUL-terminated). */
int read_line(int f, char *buf, int maxlen)
{
	char *p = buf;
	int room = maxlen;

	while (room) {
		read_buf(f, p, 1);

		if (*p == '\n') {
			*p = 0;
			return 1;
		}

		if (*p == '\r')
			continue;	/* dropped: the next byte overwrites it */

		p++;
		room--;
	}

	*p = 0;
	return 0;
}
623
624
/* Format a message printf-style into a 1024-byte local buffer and send
 * it to fd with write_sbuf().  A negative result from vslprintf()
 * terminates the program. */
void io_printf(int fd, const char *format, ...)
{
	char text[1024];
	va_list args;
	int n;

	va_start(args, format);
	n = vslprintf(text, sizeof(text)-1, format, args);
	va_end(args);

	if (n < 0)
		exit_cleanup(1);

	write_sbuf(fd, text);
}
639
640
641/* setup for multiplexing an error stream with the data stream */
642void io_start_multiplex_out(int fd)
643{
644 multiplex_out_fd = fd;
645 io_flush();
646 io_start_buffering(fd);
647 io_multiplexing_out = 1;
648}
649
650/* setup for multiplexing an error stream with the data stream */
651void io_start_multiplex_in(int fd)
652{
653 multiplex_in_fd = fd;
654 io_flush();
655 if (read_buffer_len) {
656 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
657 exit_cleanup(1);
658 }
659
660 io_multiplexing_in = 1;
661}
662
663/* write an message to the error stream */
664int io_multiplex_write(int f, char *buf, int len)
665{
666 if (!io_multiplexing_out) return 0;
667
668 io_flush();
669
670 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
671 memcpy(io_buffer, buf, len);
672
673 writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
674 return 1;
675}
676
677void io_close_input(int fd)
678{
679 buffer_f_in = -1;
680}