preparing for release of 2.0.9
[rsync/rsync.git] / io.c
CommitLineData
720b47f2
AT
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 Utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
71c46176
AT
27static int64 total_written;
28static int64 total_read;
720b47f2 29
8d9dc9f9
AT
30static int io_multiplexing_out;
31static int io_multiplexing_in;
679e7657
AT
32static int multiplex_in_fd;
33static int multiplex_out_fd;
8d9dc9f9
AT
34static time_t last_io;
35
720b47f2 36extern int verbose;
dc5ddbcc 37extern int sparse_files;
6ba9279f 38extern int io_timeout;
720b47f2 39
71c46176 40int64 write_total(void)
720b47f2 41{
8d9dc9f9 42 return total_written;
720b47f2
AT
43}
44
71c46176 45int64 read_total(void)
720b47f2 46{
8d9dc9f9 47 return total_read;
720b47f2
AT
48}
49
50static int buffer_f_in = -1;
51
52void setup_nonblocking(int f_in,int f_out)
53{
22d6234e
AT
54 set_blocking(f_out,0);
55 buffer_f_in = f_in;
720b47f2
AT
56}
57
8d9dc9f9
AT
58static void check_timeout(void)
59{
60 time_t t;
61
62 if (!io_timeout) return;
63
64 if (!last_io) {
65 last_io = time(NULL);
66 return;
67 }
68
69 t = time(NULL);
70
71 if (last_io && io_timeout && (t-last_io)>io_timeout) {
72 rprintf(FERROR,"read timeout after %d second - exiting\n",
73 (int)(t-last_io));
74 exit_cleanup(1);
75 }
76}
77
720b47f2 78
3a6a366f
AT
79static char *read_buffer;
80static char *read_buffer_p;
81static int read_buffer_len;
82static int read_buffer_size;
720b47f2
AT
83
84
8d9dc9f9
AT
85/* continue trying to read len bytes - don't return until len
86 has been read */
87static void read_loop(int fd, char *buf, int len)
88{
89 while (len) {
90 int n = read(fd, buf, len);
91 if (n > 0) {
92 buf += n;
93 len -= n;
94 }
95 if (n == 0) {
96 rprintf(FERROR,"EOF in read_loop\n");
97 exit_cleanup(1);
98 }
99 if (n == -1) {
100 fd_set fds;
101 struct timeval tv;
102
103 if (errno != EAGAIN && errno != EWOULDBLOCK) {
104 rprintf(FERROR,"io error: %s\n",
105 strerror(errno));
106 exit_cleanup(1);
107 }
108
109 FD_ZERO(&fds);
110 FD_SET(fd, &fds);
111 tv.tv_sec = io_timeout;
112 tv.tv_usec = 0;
113
114 if (select(fd+1, &fds, NULL, NULL,
115 io_timeout?&tv:NULL) != 1) {
116 check_timeout();
117 }
118 }
119 }
120}
121
122static int read_unbuffered(int fd, char *buf, int len)
123{
124 static int remaining;
125 char ibuf[4];
126 int tag, ret=0;
127 char line[1024];
128
679e7657
AT
129 if (!io_multiplexing_in || fd != multiplex_in_fd)
130 return read(fd, buf, len);
8d9dc9f9
AT
131
132 while (ret == 0) {
133 if (remaining) {
134 len = MIN(len, remaining);
135 read_loop(fd, buf, len);
136 remaining -= len;
137 ret = len;
138 continue;
139 }
140
141 read_loop(fd, ibuf, 4);
142 tag = IVAL(ibuf, 0);
679e7657 143
8d9dc9f9
AT
144 remaining = tag & 0xFFFFFF;
145 tag = tag >> 24;
146
147 if (tag == MPLEX_BASE) continue;
148
149 tag -= MPLEX_BASE;
150
151 if (tag != FERROR && tag != FINFO) {
152 rprintf(FERROR,"unexpected tag %d\n", tag);
153 exit_cleanup(1);
154 }
155
156 if (remaining > sizeof(line)-1) {
157 rprintf(FERROR,"multiplexing overflow %d\n\n",
158 remaining);
159 exit_cleanup(1);
160 }
161
162 read_loop(fd, line, remaining);
163 line[remaining] = 0;
164
165 rprintf(tag,"%s", line);
166 remaining = 0;
167 }
168
169 return ret;
170}
171
172
720b47f2
AT
173/* This function was added to overcome a deadlock problem when using
174 * ssh. It looks like we can't allow our receive queue to get full or
175 * ssh will clag up. Uggh. */
176static void read_check(int f)
177{
5dd7e031 178 int n;
720b47f2 179
5dd7e031 180 if (f == -1) return;
05c629f7 181
5dd7e031
AT
182 if (read_buffer_len == 0) {
183 read_buffer_p = read_buffer;
184 }
720b47f2 185
5dd7e031
AT
186 if ((n=num_waiting(f)) <= 0)
187 return;
720b47f2 188
5dd7e031
AT
189 /* things could deteriorate if we read in really small chunks */
190 if (n < 10) n = 1024;
05c629f7 191
5dd7e031
AT
192 if (n > MAX_READ_BUFFER/4)
193 n = MAX_READ_BUFFER/4;
720b47f2 194
5dd7e031
AT
195 if (read_buffer_p != read_buffer) {
196 memmove(read_buffer,read_buffer_p,read_buffer_len);
197 read_buffer_p = read_buffer;
198 }
720b47f2 199
5dd7e031
AT
200 if (n > (read_buffer_size - read_buffer_len)) {
201 read_buffer_size += n;
202 if (!read_buffer)
203 read_buffer = (char *)malloc(read_buffer_size);
204 else
205 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
206 if (!read_buffer) out_of_memory("read check");
207 read_buffer_p = read_buffer;
208 }
209
8d9dc9f9 210 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
5dd7e031
AT
211 if (n > 0) {
212 read_buffer_len += n;
213 }
720b47f2
AT
214}
215
720b47f2
AT
216static int readfd(int fd,char *buffer,int N)
217{
6ba9279f
AT
218 int ret;
219 int total=0;
220 struct timeval tv;
221
222 if (read_buffer_len < N)
223 read_check(buffer_f_in);
224
225 while (total < N) {
226 if (read_buffer_len > 0 && buffer_f_in == fd) {
227 ret = MIN(read_buffer_len,N-total);
228 memcpy(buffer+total,read_buffer_p,ret);
229 read_buffer_p += ret;
230 read_buffer_len -= ret;
a070c37b 231 total += ret;
6ba9279f
AT
232 continue;
233 }
234
8d9dc9f9
AT
235 io_flush();
236
237 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
6ba9279f
AT
238 fd_set fds;
239
240 if (errno != EAGAIN && errno != EWOULDBLOCK)
241 return -1;
242 FD_ZERO(&fds);
243 FD_SET(fd, &fds);
244 tv.tv_sec = io_timeout;
245 tv.tv_usec = 0;
246
344fb127
AT
247 if (select(fd+1, &fds, NULL, NULL,
248 io_timeout?&tv:NULL) != 1) {
6ba9279f
AT
249 check_timeout();
250 }
251 }
252
253 if (ret <= 0)
254 return total;
255 total += ret;
7f28dbee 256 }
720b47f2 257
6ba9279f
AT
258 if (io_timeout)
259 last_io = time(NULL);
720b47f2 260 return total;
720b47f2
AT
261}
262
263
b7922338 264int32 read_int(int f)
720b47f2 265{
4fe159a8 266 int ret;
720b47f2 267 char b[4];
4fe159a8 268 if ((ret=readfd(f,b,4)) != 4) {
720b47f2 269 if (verbose > 1)
8d9dc9f9 270 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
9e31c482 271 getpid(),4,ret==-1?strerror(errno):"EOF");
34ccb63e 272 exit_cleanup(1);
720b47f2
AT
273 }
274 total_read += 4;
275 return IVAL(b,0);
276}
277
71c46176 278int64 read_longint(int f)
3a6a366f
AT
279{
280 extern int remote_version;
71c46176 281 int64 ret;
3a6a366f
AT
282 char b[8];
283 ret = read_int(f);
71c46176 284
b7922338 285 if ((int32)ret != (int32)0xffffffff) return ret;
71c46176 286
3bee6733 287#ifdef NO_INT64
9486289c 288 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
71c46176
AT
289 exit_cleanup(1);
290#else
291 if (remote_version >= 16) {
3a6a366f
AT
292 if ((ret=readfd(f,b,8)) != 8) {
293 if (verbose > 1)
8d9dc9f9 294 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
3a6a366f
AT
295 getpid(),8,ret==-1?strerror(errno):"EOF");
296 exit_cleanup(1);
297 }
298 total_read += 8;
71c46176 299 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
3a6a366f 300 }
71c46176
AT
301#endif
302
3a6a366f
AT
303 return ret;
304}
305
720b47f2
AT
306void read_buf(int f,char *buf,int len)
307{
4fe159a8
AT
308 int ret;
309 if ((ret=readfd(f,buf,len)) != len) {
720b47f2 310 if (verbose > 1)
8d9dc9f9 311 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
9e31c482 312 getpid(),len,ret==-1?strerror(errno):"EOF");
34ccb63e 313 exit_cleanup(1);
720b47f2
AT
314 }
315 total_read += len;
316}
317
575f2fca
AT
318void read_sbuf(int f,char *buf,int len)
319{
320 read_buf(f,buf,len);
321 buf[len] = 0;
322}
323
182dca5c
AT
324unsigned char read_byte(int f)
325{
d89322c4
AT
326 unsigned char c;
327 read_buf(f,(char *)&c,1);
328 return c;
182dca5c 329}
720b47f2 330
7bec6a5c 331
3a6a366f
AT
332static char last_byte;
333static int last_sparse;
7bec6a5c
AT
334
335int sparse_end(int f)
336{
d867229b 337 if (last_sparse) {
73233f0f 338 do_lseek(f,-1,SEEK_CUR);
d867229b
AT
339 return (write(f,&last_byte,1) == 1 ? 0 : -1);
340 }
341 last_sparse = 0;
342 return 0;
7bec6a5c
AT
343}
344
d867229b
AT
345
346static int write_sparse(int f,char *buf,int len)
7bec6a5c 347{
d867229b
AT
348 int l1=0,l2=0;
349 int ret;
7bec6a5c 350
d867229b
AT
351 for (l1=0;l1<len && buf[l1]==0;l1++) ;
352 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
7bec6a5c 353
d867229b 354 last_byte = buf[len-1];
7bec6a5c 355
d867229b
AT
356 if (l1 == len || l2 > 0)
357 last_sparse=1;
7bec6a5c 358
d867229b 359 if (l1 > 0)
73233f0f 360 do_lseek(f,l1,SEEK_CUR);
dc5ddbcc 361
d867229b
AT
362 if (l1 == len)
363 return len;
dc5ddbcc 364
d867229b
AT
365 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
366 if (ret == -1 || ret == 0) return ret;
367 return (l1+ret);
368 }
7bec6a5c 369
d867229b 370 if (l2 > 0)
73233f0f 371 do_lseek(f,l2,SEEK_CUR);
d867229b
AT
372
373 return len;
374}
dc5ddbcc 375
7bec6a5c 376
d867229b
AT
377
378int write_file(int f,char *buf,int len)
379{
380 int ret = 0;
381
382 if (!sparse_files)
383 return write(f,buf,len);
384
385 while (len>0) {
386 int len1 = MIN(len, SPARSE_WRITE_SIZE);
387 int r1 = write_sparse(f, buf, len1);
388 if (r1 <= 0) {
389 if (ret > 0) return ret;
390 return r1;
391 }
392 len -= r1;
393 buf += r1;
394 ret += r1;
395 }
396 return ret;
7bec6a5c
AT
397}
398
720b47f2 399
d6dead6b 400static int writefd_unbuffered(int fd,char *buf,int len)
720b47f2 401{
8d9dc9f9
AT
402 int total = 0;
403 fd_set w_fds, r_fds;
404 int fd_count, count, got_select=0;
405 struct timeval tv;
720b47f2 406
8d9dc9f9
AT
407 while (total < len) {
408 int ret = write(fd,buf+total,len-total);
720b47f2 409
8d9dc9f9 410 if (ret == 0) return total;
720b47f2 411
8d9dc9f9
AT
412 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
413 return -1;
720b47f2 414
8d9dc9f9
AT
415 if (ret == -1 && got_select) {
416 /* hmmm, we got a write select on the fd and
417 then failed to write. Why doesn't that
418 mean that the fd is dead? It doesn't on
419 some systems it seems (eg. IRIX) */
420 u_sleep(1000);
421 }
720b47f2 422
8d9dc9f9 423 got_select = 0;
720b47f2 424
6ba9279f 425
8d9dc9f9
AT
426 if (ret != -1) {
427 total += ret;
428 continue;
429 }
430
431 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
432 read_check(buffer_f_in);
433
434 fd_count = fd+1;
435 FD_ZERO(&w_fds);
436 FD_ZERO(&r_fds);
437 FD_SET(fd,&w_fds);
438 if (buffer_f_in != -1) {
439 FD_SET(buffer_f_in,&r_fds);
440 if (buffer_f_in > fd)
441 fd_count = buffer_f_in+1;
442 }
443
444 tv.tv_sec = BLOCKING_TIMEOUT;
445 tv.tv_usec = 0;
446 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
447 &w_fds,NULL,&tv);
448
449 if (count == -1 && errno != EINTR) {
450 if (verbose > 1)
451 rprintf(FERROR,"select error: %s\n", strerror(errno));
452 exit_cleanup(1);
453 }
454
455 if (count == 0) {
456 check_timeout();
457 continue;
458 }
459
460 if (FD_ISSET(fd, &w_fds)) {
461 got_select = 1;
462 }
463 }
464
465 if (io_timeout)
466 last_io = time(NULL);
467
468 return total;
720b47f2
AT
469}
470
8d9dc9f9 471
d6dead6b
AT
472static char *io_buffer;
473static int io_buffer_count;
474
475void io_start_buffering(int fd)
476{
8d9dc9f9 477 if (io_buffer) return;
679e7657 478 multiplex_out_fd = fd;
8d9dc9f9 479 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
d6dead6b
AT
480 if (!io_buffer) out_of_memory("writefd");
481 io_buffer_count = 0;
8d9dc9f9
AT
482
483 /* leave room for the multiplex header in case it's needed */
484 io_buffer += 4;
d6dead6b
AT
485}
486
8d9dc9f9 487void io_flush(void)
d6dead6b 488{
679e7657 489 int fd = multiplex_out_fd;
8d9dc9f9
AT
490 if (!io_buffer_count) return;
491
492 if (io_multiplexing_out) {
493 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
494 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
495 io_buffer_count+4) {
496 rprintf(FERROR,"write failed\n");
497 exit_cleanup(1);
498 }
499 } else {
500 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
d6dead6b
AT
501 io_buffer_count) {
502 rprintf(FERROR,"write failed\n");
503 exit_cleanup(1);
504 }
d6dead6b 505 }
8d9dc9f9
AT
506 io_buffer_count = 0;
507}
508
509void io_end_buffering(int fd)
510{
511 io_flush();
512 if (!io_multiplexing_out) {
513 free(io_buffer-4);
514 io_buffer = NULL;
515 }
d6dead6b
AT
516}
517
518static int writefd(int fd,char *buf,int len1)
519{
520 int len = len1;
521
522 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
523
524 while (len) {
525 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
526 if (n > 0) {
527 memcpy(io_buffer+io_buffer_count, buf, n);
528 buf += n;
529 len -= n;
530 io_buffer_count += n;
531 }
532
8d9dc9f9 533 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
d6dead6b
AT
534 }
535
536 return len1;
537}
720b47f2
AT
538
539
b7922338 540void write_int(int f,int32 x)
720b47f2 541{
8d9dc9f9
AT
542 int ret;
543 char b[4];
544 SIVAL(b,0,x);
545 if ((ret=writefd(f,b,4)) != 4) {
546 rprintf(FERROR,"write_int failed : %s\n",
547 ret==-1?strerror(errno):"EOF");
548 exit_cleanup(1);
549 }
550 total_written += 4;
720b47f2
AT
551}
552
71c46176 553void write_longint(int f, int64 x)
3a6a366f
AT
554{
555 extern int remote_version;
556 char b[8];
557 int ret;
558
559 if (remote_version < 16 || x <= 0x7FFFFFFF) {
560 write_int(f, (int)x);
561 return;
562 }
563
564 write_int(f, -1);
565 SIVAL(b,0,(x&0xFFFFFFFF));
566 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
567
568 if ((ret=writefd(f,b,8)) != 8) {
9486289c 569 rprintf(FERROR,"write_longint failed : %s\n",
3a6a366f
AT
570 ret==-1?strerror(errno):"EOF");
571 exit_cleanup(1);
572 }
573 total_written += 8;
574}
575
720b47f2
AT
576void write_buf(int f,char *buf,int len)
577{
8d9dc9f9
AT
578 int ret;
579 if ((ret=writefd(f,buf,len)) != len) {
580 rprintf(FERROR,"write_buf failed : %s\n",
581 ret==-1?strerror(errno):"EOF");
582 exit_cleanup(1);
583 }
584 total_written += len;
720b47f2
AT
585}
586
f0fca04e
AT
587/* write a string to the connection */
588void write_sbuf(int f,char *buf)
589{
590 write_buf(f, buf, strlen(buf));
591}
592
720b47f2 593
182dca5c
AT
594void write_byte(int f,unsigned char c)
595{
f0fca04e 596 write_buf(f,(char *)&c,1);
182dca5c
AT
597}
598
720b47f2
AT
599void write_flush(int f)
600{
601}
602
603
f0fca04e
AT
604int read_line(int f, char *buf, int maxlen)
605{
606 while (maxlen) {
607 read_buf(f, buf, 1);
608 if (buf[0] == '\n') {
609 buf[0] = 0;
610 break;
611 }
612 if (buf[0] != '\r') {
613 buf++;
614 maxlen--;
615 }
616 }
617 if (maxlen == 0) {
618 *buf = 0;
619 return 0;
620 }
621 return 1;
622}
623
624
625void io_printf(int fd, const char *format, ...)
626{
627 va_list ap;
628 char buf[1024];
629 int len;
630
631 va_start(ap, format);
e42c9458 632 len = vslprintf(buf, sizeof(buf)-1, format, ap);
f0fca04e
AT
633 va_end(ap);
634
635 if (len < 0) exit_cleanup(1);
636
637 write_sbuf(fd, buf);
638}
8d9dc9f9
AT
639
640
641/* setup for multiplexing an error stream with the data stream */
642void io_start_multiplex_out(int fd)
643{
679e7657
AT
644 multiplex_out_fd = fd;
645 io_flush();
8d9dc9f9
AT
646 io_start_buffering(fd);
647 io_multiplexing_out = 1;
648}
649
650/* setup for multiplexing an error stream with the data stream */
651void io_start_multiplex_in(int fd)
652{
679e7657
AT
653 multiplex_in_fd = fd;
654 io_flush();
8d9dc9f9
AT
655 if (read_buffer_len) {
656 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
657 exit_cleanup(1);
658 }
659
660 io_multiplexing_in = 1;
661}
662
663/* write an message to the error stream */
664int io_multiplex_write(int f, char *buf, int len)
665{
666 if (!io_multiplexing_out) return 0;
667
668 io_flush();
669
670 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
671 memcpy(io_buffer, buf, len);
672
679e7657 673 writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
8d9dc9f9
AT
674 return 1;
675}
676
677void io_close_input(int fd)
678{
679 buffer_f_in = -1;
680}