handle rsh clients that don't like half-open connections
[rsync/rsync.git] / io.c
CommitLineData
720b47f2
AT
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 Utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
71c46176
AT
27static int64 total_written;
28static int64 total_read;
720b47f2 29
8d9dc9f9
AT
30static int io_multiplexing_out;
31static int io_multiplexing_in;
32static time_t last_io;
33
720b47f2 34extern int verbose;
dc5ddbcc 35extern int sparse_files;
6ba9279f 36extern int io_timeout;
720b47f2 37
71c46176 38int64 write_total(void)
720b47f2 39{
8d9dc9f9 40 return total_written;
720b47f2
AT
41}
42
71c46176 43int64 read_total(void)
720b47f2 44{
8d9dc9f9 45 return total_read;
720b47f2
AT
46}
47
48static int buffer_f_in = -1;
49
50void setup_nonblocking(int f_in,int f_out)
51{
22d6234e
AT
52 set_blocking(f_out,0);
53 buffer_f_in = f_in;
720b47f2
AT
54}
55
8d9dc9f9
AT
56static void check_timeout(void)
57{
58 time_t t;
59
60 if (!io_timeout) return;
61
62 if (!last_io) {
63 last_io = time(NULL);
64 return;
65 }
66
67 t = time(NULL);
68
69 if (last_io && io_timeout && (t-last_io)>io_timeout) {
70 rprintf(FERROR,"read timeout after %d second - exiting\n",
71 (int)(t-last_io));
72 exit_cleanup(1);
73 }
74}
75
720b47f2 76
3a6a366f
AT
77static char *read_buffer;
78static char *read_buffer_p;
79static int read_buffer_len;
80static int read_buffer_size;
720b47f2
AT
81
82
8d9dc9f9
AT
83/* continue trying to read len bytes - don't return until len
84 has been read */
85static void read_loop(int fd, char *buf, int len)
86{
87 while (len) {
88 int n = read(fd, buf, len);
89 if (n > 0) {
90 buf += n;
91 len -= n;
92 }
93 if (n == 0) {
94 rprintf(FERROR,"EOF in read_loop\n");
95 exit_cleanup(1);
96 }
97 if (n == -1) {
98 fd_set fds;
99 struct timeval tv;
100
101 if (errno != EAGAIN && errno != EWOULDBLOCK) {
102 rprintf(FERROR,"io error: %s\n",
103 strerror(errno));
104 exit_cleanup(1);
105 }
106
107 FD_ZERO(&fds);
108 FD_SET(fd, &fds);
109 tv.tv_sec = io_timeout;
110 tv.tv_usec = 0;
111
112 if (select(fd+1, &fds, NULL, NULL,
113 io_timeout?&tv:NULL) != 1) {
114 check_timeout();
115 }
116 }
117 }
118}
119
120static int read_unbuffered(int fd, char *buf, int len)
121{
122 static int remaining;
123 char ibuf[4];
124 int tag, ret=0;
125 char line[1024];
126
127 if (!io_multiplexing_in) return read(fd, buf, len);
128
129 while (ret == 0) {
130 if (remaining) {
131 len = MIN(len, remaining);
132 read_loop(fd, buf, len);
133 remaining -= len;
134 ret = len;
135 continue;
136 }
137
138 read_loop(fd, ibuf, 4);
139 tag = IVAL(ibuf, 0);
140 remaining = tag & 0xFFFFFF;
141 tag = tag >> 24;
142
143 if (tag == MPLEX_BASE) continue;
144
145 tag -= MPLEX_BASE;
146
147 if (tag != FERROR && tag != FINFO) {
148 rprintf(FERROR,"unexpected tag %d\n", tag);
149 exit_cleanup(1);
150 }
151
152 if (remaining > sizeof(line)-1) {
153 rprintf(FERROR,"multiplexing overflow %d\n\n",
154 remaining);
155 exit_cleanup(1);
156 }
157
158 read_loop(fd, line, remaining);
159 line[remaining] = 0;
160
161 rprintf(tag,"%s", line);
162 remaining = 0;
163 }
164
165 return ret;
166}
167
168
720b47f2
AT
169/* This function was added to overcome a deadlock problem when using
170 * ssh. It looks like we can't allow our receive queue to get full or
171 * ssh will clag up. Uggh. */
172static void read_check(int f)
173{
5dd7e031 174 int n;
720b47f2 175
5dd7e031 176 if (f == -1) return;
05c629f7 177
5dd7e031
AT
178 if (read_buffer_len == 0) {
179 read_buffer_p = read_buffer;
180 }
720b47f2 181
5dd7e031
AT
182 if ((n=num_waiting(f)) <= 0)
183 return;
720b47f2 184
5dd7e031
AT
185 /* things could deteriorate if we read in really small chunks */
186 if (n < 10) n = 1024;
05c629f7 187
5dd7e031
AT
188 if (n > MAX_READ_BUFFER/4)
189 n = MAX_READ_BUFFER/4;
720b47f2 190
5dd7e031
AT
191 if (read_buffer_p != read_buffer) {
192 memmove(read_buffer,read_buffer_p,read_buffer_len);
193 read_buffer_p = read_buffer;
194 }
720b47f2 195
5dd7e031
AT
196 if (n > (read_buffer_size - read_buffer_len)) {
197 read_buffer_size += n;
198 if (!read_buffer)
199 read_buffer = (char *)malloc(read_buffer_size);
200 else
201 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
202 if (!read_buffer) out_of_memory("read check");
203 read_buffer_p = read_buffer;
204 }
205
8d9dc9f9 206 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
5dd7e031
AT
207 if (n > 0) {
208 read_buffer_len += n;
209 }
720b47f2
AT
210}
211
720b47f2
AT
212static int readfd(int fd,char *buffer,int N)
213{
6ba9279f
AT
214 int ret;
215 int total=0;
216 struct timeval tv;
217
218 if (read_buffer_len < N)
219 read_check(buffer_f_in);
220
221 while (total < N) {
222 if (read_buffer_len > 0 && buffer_f_in == fd) {
223 ret = MIN(read_buffer_len,N-total);
224 memcpy(buffer+total,read_buffer_p,ret);
225 read_buffer_p += ret;
226 read_buffer_len -= ret;
a070c37b 227 total += ret;
6ba9279f
AT
228 continue;
229 }
230
8d9dc9f9
AT
231 io_flush();
232
233 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
6ba9279f
AT
234 fd_set fds;
235
236 if (errno != EAGAIN && errno != EWOULDBLOCK)
237 return -1;
238 FD_ZERO(&fds);
239 FD_SET(fd, &fds);
240 tv.tv_sec = io_timeout;
241 tv.tv_usec = 0;
242
344fb127
AT
243 if (select(fd+1, &fds, NULL, NULL,
244 io_timeout?&tv:NULL) != 1) {
6ba9279f
AT
245 check_timeout();
246 }
247 }
248
249 if (ret <= 0)
250 return total;
251 total += ret;
7f28dbee 252 }
720b47f2 253
6ba9279f
AT
254 if (io_timeout)
255 last_io = time(NULL);
720b47f2 256 return total;
720b47f2
AT
257}
258
259
b7922338 260int32 read_int(int f)
720b47f2 261{
4fe159a8 262 int ret;
720b47f2 263 char b[4];
4fe159a8 264 if ((ret=readfd(f,b,4)) != 4) {
720b47f2 265 if (verbose > 1)
8d9dc9f9 266 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
9e31c482 267 getpid(),4,ret==-1?strerror(errno):"EOF");
34ccb63e 268 exit_cleanup(1);
720b47f2
AT
269 }
270 total_read += 4;
271 return IVAL(b,0);
272}
273
71c46176 274int64 read_longint(int f)
3a6a366f
AT
275{
276 extern int remote_version;
71c46176 277 int64 ret;
3a6a366f
AT
278 char b[8];
279 ret = read_int(f);
71c46176 280
b7922338 281 if ((int32)ret != (int32)0xffffffff) return ret;
71c46176 282
3bee6733 283#ifdef NO_INT64
9486289c 284 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
71c46176
AT
285 exit_cleanup(1);
286#else
287 if (remote_version >= 16) {
3a6a366f
AT
288 if ((ret=readfd(f,b,8)) != 8) {
289 if (verbose > 1)
8d9dc9f9 290 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
3a6a366f
AT
291 getpid(),8,ret==-1?strerror(errno):"EOF");
292 exit_cleanup(1);
293 }
294 total_read += 8;
71c46176 295 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
3a6a366f 296 }
71c46176
AT
297#endif
298
3a6a366f
AT
299 return ret;
300}
301
720b47f2
AT
302void read_buf(int f,char *buf,int len)
303{
4fe159a8
AT
304 int ret;
305 if ((ret=readfd(f,buf,len)) != len) {
720b47f2 306 if (verbose > 1)
8d9dc9f9 307 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
9e31c482 308 getpid(),len,ret==-1?strerror(errno):"EOF");
34ccb63e 309 exit_cleanup(1);
720b47f2
AT
310 }
311 total_read += len;
312}
313
575f2fca
AT
314void read_sbuf(int f,char *buf,int len)
315{
316 read_buf(f,buf,len);
317 buf[len] = 0;
318}
319
182dca5c
AT
320unsigned char read_byte(int f)
321{
d89322c4
AT
322 unsigned char c;
323 read_buf(f,(char *)&c,1);
324 return c;
182dca5c 325}
720b47f2 326
7bec6a5c 327
3a6a366f
AT
328static char last_byte;
329static int last_sparse;
7bec6a5c
AT
330
331int sparse_end(int f)
332{
d867229b 333 if (last_sparse) {
73233f0f 334 do_lseek(f,-1,SEEK_CUR);
d867229b
AT
335 return (write(f,&last_byte,1) == 1 ? 0 : -1);
336 }
337 last_sparse = 0;
338 return 0;
7bec6a5c
AT
339}
340
d867229b
AT
341
342static int write_sparse(int f,char *buf,int len)
7bec6a5c 343{
d867229b
AT
344 int l1=0,l2=0;
345 int ret;
7bec6a5c 346
d867229b
AT
347 for (l1=0;l1<len && buf[l1]==0;l1++) ;
348 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
7bec6a5c 349
d867229b 350 last_byte = buf[len-1];
7bec6a5c 351
d867229b
AT
352 if (l1 == len || l2 > 0)
353 last_sparse=1;
7bec6a5c 354
d867229b 355 if (l1 > 0)
73233f0f 356 do_lseek(f,l1,SEEK_CUR);
dc5ddbcc 357
d867229b
AT
358 if (l1 == len)
359 return len;
dc5ddbcc 360
d867229b
AT
361 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
362 if (ret == -1 || ret == 0) return ret;
363 return (l1+ret);
364 }
7bec6a5c 365
d867229b 366 if (l2 > 0)
73233f0f 367 do_lseek(f,l2,SEEK_CUR);
d867229b
AT
368
369 return len;
370}
dc5ddbcc 371
7bec6a5c 372
d867229b
AT
373
374int write_file(int f,char *buf,int len)
375{
376 int ret = 0;
377
378 if (!sparse_files)
379 return write(f,buf,len);
380
381 while (len>0) {
382 int len1 = MIN(len, SPARSE_WRITE_SIZE);
383 int r1 = write_sparse(f, buf, len1);
384 if (r1 <= 0) {
385 if (ret > 0) return ret;
386 return r1;
387 }
388 len -= r1;
389 buf += r1;
390 ret += r1;
391 }
392 return ret;
7bec6a5c
AT
393}
394
720b47f2 395
d6dead6b 396static int writefd_unbuffered(int fd,char *buf,int len)
720b47f2 397{
8d9dc9f9
AT
398 int total = 0;
399 fd_set w_fds, r_fds;
400 int fd_count, count, got_select=0;
401 struct timeval tv;
720b47f2 402
8d9dc9f9
AT
403 while (total < len) {
404 int ret = write(fd,buf+total,len-total);
720b47f2 405
8d9dc9f9 406 if (ret == 0) return total;
720b47f2 407
8d9dc9f9
AT
408 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
409 return -1;
720b47f2 410
8d9dc9f9
AT
411 if (ret == -1 && got_select) {
412 /* hmmm, we got a write select on the fd and
413 then failed to write. Why doesn't that
414 mean that the fd is dead? It doesn't on
415 some systems it seems (eg. IRIX) */
416 u_sleep(1000);
417 }
720b47f2 418
8d9dc9f9 419 got_select = 0;
720b47f2 420
6ba9279f 421
8d9dc9f9
AT
422 if (ret != -1) {
423 total += ret;
424 continue;
425 }
426
427 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
428 read_check(buffer_f_in);
429
430 fd_count = fd+1;
431 FD_ZERO(&w_fds);
432 FD_ZERO(&r_fds);
433 FD_SET(fd,&w_fds);
434 if (buffer_f_in != -1) {
435 FD_SET(buffer_f_in,&r_fds);
436 if (buffer_f_in > fd)
437 fd_count = buffer_f_in+1;
438 }
439
440 tv.tv_sec = BLOCKING_TIMEOUT;
441 tv.tv_usec = 0;
442 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
443 &w_fds,NULL,&tv);
444
445 if (count == -1 && errno != EINTR) {
446 if (verbose > 1)
447 rprintf(FERROR,"select error: %s\n", strerror(errno));
448 exit_cleanup(1);
449 }
450
451 if (count == 0) {
452 check_timeout();
453 continue;
454 }
455
456 if (FD_ISSET(fd, &w_fds)) {
457 got_select = 1;
458 }
459 }
460
461 if (io_timeout)
462 last_io = time(NULL);
463
464 return total;
720b47f2
AT
465}
466
8d9dc9f9 467
d6dead6b
AT
468static char *io_buffer;
469static int io_buffer_count;
8d9dc9f9 470static int io_out_fd;
d6dead6b
AT
471
472void io_start_buffering(int fd)
473{
8d9dc9f9
AT
474 if (io_buffer) return;
475 io_out_fd = fd;
476 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
d6dead6b
AT
477 if (!io_buffer) out_of_memory("writefd");
478 io_buffer_count = 0;
8d9dc9f9
AT
479
480 /* leave room for the multiplex header in case it's needed */
481 io_buffer += 4;
d6dead6b
AT
482}
483
8d9dc9f9 484void io_flush(void)
d6dead6b 485{
8d9dc9f9
AT
486 int fd = io_out_fd;
487 if (!io_buffer_count) return;
488
489 if (io_multiplexing_out) {
490 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
491 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
492 io_buffer_count+4) {
493 rprintf(FERROR,"write failed\n");
494 exit_cleanup(1);
495 }
496 } else {
497 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
d6dead6b
AT
498 io_buffer_count) {
499 rprintf(FERROR,"write failed\n");
500 exit_cleanup(1);
501 }
d6dead6b 502 }
8d9dc9f9
AT
503 io_buffer_count = 0;
504}
505
506void io_end_buffering(int fd)
507{
508 io_flush();
509 if (!io_multiplexing_out) {
510 free(io_buffer-4);
511 io_buffer = NULL;
512 }
d6dead6b
AT
513}
514
515static int writefd(int fd,char *buf,int len1)
516{
517 int len = len1;
518
519 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
520
521 while (len) {
522 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
523 if (n > 0) {
524 memcpy(io_buffer+io_buffer_count, buf, n);
525 buf += n;
526 len -= n;
527 io_buffer_count += n;
528 }
529
8d9dc9f9 530 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
d6dead6b
AT
531 }
532
533 return len1;
534}
720b47f2
AT
535
536
b7922338 537void write_int(int f,int32 x)
720b47f2 538{
8d9dc9f9
AT
539 int ret;
540 char b[4];
541 SIVAL(b,0,x);
542 if ((ret=writefd(f,b,4)) != 4) {
543 rprintf(FERROR,"write_int failed : %s\n",
544 ret==-1?strerror(errno):"EOF");
545 exit_cleanup(1);
546 }
547 total_written += 4;
720b47f2
AT
548}
549
71c46176 550void write_longint(int f, int64 x)
3a6a366f
AT
551{
552 extern int remote_version;
553 char b[8];
554 int ret;
555
556 if (remote_version < 16 || x <= 0x7FFFFFFF) {
557 write_int(f, (int)x);
558 return;
559 }
560
561 write_int(f, -1);
562 SIVAL(b,0,(x&0xFFFFFFFF));
563 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
564
565 if ((ret=writefd(f,b,8)) != 8) {
9486289c 566 rprintf(FERROR,"write_longint failed : %s\n",
3a6a366f
AT
567 ret==-1?strerror(errno):"EOF");
568 exit_cleanup(1);
569 }
570 total_written += 8;
571}
572
720b47f2
AT
573void write_buf(int f,char *buf,int len)
574{
8d9dc9f9
AT
575 int ret;
576 if ((ret=writefd(f,buf,len)) != len) {
577 rprintf(FERROR,"write_buf failed : %s\n",
578 ret==-1?strerror(errno):"EOF");
579 exit_cleanup(1);
580 }
581 total_written += len;
720b47f2
AT
582}
583
f0fca04e
AT
584/* write a string to the connection */
585void write_sbuf(int f,char *buf)
586{
587 write_buf(f, buf, strlen(buf));
588}
589
720b47f2 590
182dca5c
AT
591void write_byte(int f,unsigned char c)
592{
f0fca04e 593 write_buf(f,(char *)&c,1);
182dca5c
AT
594}
595
720b47f2
AT
596void write_flush(int f)
597{
598}
599
600
f0fca04e
AT
601int read_line(int f, char *buf, int maxlen)
602{
603 while (maxlen) {
604 read_buf(f, buf, 1);
605 if (buf[0] == '\n') {
606 buf[0] = 0;
607 break;
608 }
609 if (buf[0] != '\r') {
610 buf++;
611 maxlen--;
612 }
613 }
614 if (maxlen == 0) {
615 *buf = 0;
616 return 0;
617 }
618 return 1;
619}
620
621
622void io_printf(int fd, const char *format, ...)
623{
624 va_list ap;
625 char buf[1024];
626 int len;
627
628 va_start(ap, format);
e42c9458 629 len = vslprintf(buf, sizeof(buf)-1, format, ap);
f0fca04e
AT
630 va_end(ap);
631
632 if (len < 0) exit_cleanup(1);
633
634 write_sbuf(fd, buf);
635}
8d9dc9f9
AT
636
637
638/* setup for multiplexing an error stream with the data stream */
639void io_start_multiplex_out(int fd)
640{
641 io_start_buffering(fd);
642 io_multiplexing_out = 1;
643}
644
645/* setup for multiplexing an error stream with the data stream */
646void io_start_multiplex_in(int fd)
647{
648 if (read_buffer_len) {
649 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
650 exit_cleanup(1);
651 }
652
653 io_multiplexing_in = 1;
654}
655
656/* write an message to the error stream */
657int io_multiplex_write(int f, char *buf, int len)
658{
659 if (!io_multiplexing_out) return 0;
660
661 io_flush();
662
663 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
664 memcpy(io_buffer, buf, len);
665
666 writefd_unbuffered(io_out_fd, io_buffer-4, len+4);
667 return 1;
668}
669
670void io_close_input(int fd)
671{
672 buffer_f_in = -1;
673}