handle rsh clients that don't like half-open connections
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 Utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
27static int64 total_written;
28static int64 total_read;
29
30static int io_multiplexing_out;
31static int io_multiplexing_in;
32static time_t last_io;
33
34extern int verbose;
35extern int sparse_files;
36extern int io_timeout;
37
38int64 write_total(void)
39{
40 return total_written;
41}
42
43int64 read_total(void)
44{
45 return total_read;
46}
47
/* descriptor whose input is drained into the read buffer by
   read_check(); -1 means there is none */
static int buffer_f_in = -1;

/*
  switch f_out via set_blocking(f_out,0) (presumably: make it
  non-blocking) and remember f_in as the descriptor to buffer input from
 */
void setup_nonblocking(int f_in,int f_out)
{
	const int blocking = 0;

	set_blocking(f_out, blocking);
	buffer_f_in = f_in;
}
55
56static void check_timeout(void)
57{
58 time_t t;
59
60 if (!io_timeout) return;
61
62 if (!last_io) {
63 last_io = time(NULL);
64 return;
65 }
66
67 t = time(NULL);
68
69 if (last_io && io_timeout && (t-last_io)>io_timeout) {
70 rprintf(FERROR,"read timeout after %d second - exiting\n",
71 (int)(t-last_io));
72 exit_cleanup(1);
73 }
74}
75
76
/* input queued by read_check() and consumed by readfd() */
static char *read_buffer;	/* start of the allocation (NULL until first use) */
static char *read_buffer_p;	/* first byte not yet consumed */
static int read_buffer_len;	/* number of unconsumed bytes at read_buffer_p */
static int read_buffer_size;	/* bytes allocated at read_buffer */
81
82
83/* continue trying to read len bytes - don't return until len
84 has been read */
85static void read_loop(int fd, char *buf, int len)
86{
87 while (len) {
88 int n = read(fd, buf, len);
89 if (n > 0) {
90 buf += n;
91 len -= n;
92 }
93 if (n == 0) {
94 rprintf(FERROR,"EOF in read_loop\n");
95 exit_cleanup(1);
96 }
97 if (n == -1) {
98 fd_set fds;
99 struct timeval tv;
100
101 if (errno != EAGAIN && errno != EWOULDBLOCK) {
102 rprintf(FERROR,"io error: %s\n",
103 strerror(errno));
104 exit_cleanup(1);
105 }
106
107 FD_ZERO(&fds);
108 FD_SET(fd, &fds);
109 tv.tv_sec = io_timeout;
110 tv.tv_usec = 0;
111
112 if (select(fd+1, &fds, NULL, NULL,
113 io_timeout?&tv:NULL) != 1) {
114 check_timeout();
115 }
116 }
117 }
118}
119
120static int read_unbuffered(int fd, char *buf, int len)
121{
122 static int remaining;
123 char ibuf[4];
124 int tag, ret=0;
125 char line[1024];
126
127 if (!io_multiplexing_in) return read(fd, buf, len);
128
129 while (ret == 0) {
130 if (remaining) {
131 len = MIN(len, remaining);
132 read_loop(fd, buf, len);
133 remaining -= len;
134 ret = len;
135 continue;
136 }
137
138 read_loop(fd, ibuf, 4);
139 tag = IVAL(ibuf, 0);
140 remaining = tag & 0xFFFFFF;
141 tag = tag >> 24;
142
143 if (tag == MPLEX_BASE) continue;
144
145 tag -= MPLEX_BASE;
146
147 if (tag != FERROR && tag != FINFO) {
148 rprintf(FERROR,"unexpected tag %d\n", tag);
149 exit_cleanup(1);
150 }
151
152 if (remaining > sizeof(line)-1) {
153 rprintf(FERROR,"multiplexing overflow %d\n\n",
154 remaining);
155 exit_cleanup(1);
156 }
157
158 read_loop(fd, line, remaining);
159 line[remaining] = 0;
160
161 rprintf(tag,"%s", line);
162 remaining = 0;
163 }
164
165 return ret;
166}
167
168
169/* This function was added to overcome a deadlock problem when using
170 * ssh. It looks like we can't allow our receive queue to get full or
171 * ssh will clag up. Uggh. */
172static void read_check(int f)
173{
174 int n;
175
176 if (f == -1) return;
177
178 if (read_buffer_len == 0) {
179 read_buffer_p = read_buffer;
180 }
181
182 if ((n=num_waiting(f)) <= 0)
183 return;
184
185 /* things could deteriorate if we read in really small chunks */
186 if (n < 10) n = 1024;
187
188 if (n > MAX_READ_BUFFER/4)
189 n = MAX_READ_BUFFER/4;
190
191 if (read_buffer_p != read_buffer) {
192 memmove(read_buffer,read_buffer_p,read_buffer_len);
193 read_buffer_p = read_buffer;
194 }
195
196 if (n > (read_buffer_size - read_buffer_len)) {
197 read_buffer_size += n;
198 if (!read_buffer)
199 read_buffer = (char *)malloc(read_buffer_size);
200 else
201 read_buffer = (char *)realloc(read_buffer,read_buffer_size);
202 if (!read_buffer) out_of_memory("read check");
203 read_buffer_p = read_buffer;
204 }
205
206 n = read_unbuffered(f,read_buffer+read_buffer_len,n);
207 if (n > 0) {
208 read_buffer_len += n;
209 }
210}
211
212static int readfd(int fd,char *buffer,int N)
213{
214 int ret;
215 int total=0;
216 struct timeval tv;
217
218 if (read_buffer_len < N)
219 read_check(buffer_f_in);
220
221 while (total < N) {
222 if (read_buffer_len > 0 && buffer_f_in == fd) {
223 ret = MIN(read_buffer_len,N-total);
224 memcpy(buffer+total,read_buffer_p,ret);
225 read_buffer_p += ret;
226 read_buffer_len -= ret;
227 total += ret;
228 continue;
229 }
230
231 io_flush();
232
233 while ((ret = read_unbuffered(fd,buffer + total,N-total)) == -1) {
234 fd_set fds;
235
236 if (errno != EAGAIN && errno != EWOULDBLOCK)
237 return -1;
238 FD_ZERO(&fds);
239 FD_SET(fd, &fds);
240 tv.tv_sec = io_timeout;
241 tv.tv_usec = 0;
242
243 if (select(fd+1, &fds, NULL, NULL,
244 io_timeout?&tv:NULL) != 1) {
245 check_timeout();
246 }
247 }
248
249 if (ret <= 0)
250 return total;
251 total += ret;
252 }
253
254 if (io_timeout)
255 last_io = time(NULL);
256 return total;
257}
258
259
260int32 read_int(int f)
261{
262 int ret;
263 char b[4];
264 if ((ret=readfd(f,b,4)) != 4) {
265 if (verbose > 1)
266 rprintf(FERROR,"(%d) read_int: Error reading %d bytes : %s\n",
267 getpid(),4,ret==-1?strerror(errno):"EOF");
268 exit_cleanup(1);
269 }
270 total_read += 4;
271 return IVAL(b,0);
272}
273
274int64 read_longint(int f)
275{
276 extern int remote_version;
277 int64 ret;
278 char b[8];
279 ret = read_int(f);
280
281 if ((int32)ret != (int32)0xffffffff) return ret;
282
283#ifdef NO_INT64
284 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
285 exit_cleanup(1);
286#else
287 if (remote_version >= 16) {
288 if ((ret=readfd(f,b,8)) != 8) {
289 if (verbose > 1)
290 rprintf(FERROR,"(%d) read_longint: Error reading %d bytes : %s\n",
291 getpid(),8,ret==-1?strerror(errno):"EOF");
292 exit_cleanup(1);
293 }
294 total_read += 8;
295 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
296 }
297#endif
298
299 return ret;
300}
301
302void read_buf(int f,char *buf,int len)
303{
304 int ret;
305 if ((ret=readfd(f,buf,len)) != len) {
306 if (verbose > 1)
307 rprintf(FERROR,"(%d) read_buf: Error reading %d bytes : %s\n",
308 getpid(),len,ret==-1?strerror(errno):"EOF");
309 exit_cleanup(1);
310 }
311 total_read += len;
312}
313
/*
  read len bytes from f into buf and null terminate them; buf must
  therefore have room for len+1 bytes
 */
void read_sbuf(int f,char *buf,int len)
{
	char *end = buf + len;

	read_buf(f,buf,len);
	*end = 0;
}
319
/* read a single byte from f */
unsigned char read_byte(int f)
{
	unsigned char byte;

	read_buf(f,(char *)&byte,1);

	return byte;
}
326
327
/* final byte of the most recent chunk given to write_sparse() */
static char last_byte;
/* set by write_sparse() when a chunk was finished with a seek rather
   than a write */
static int last_sparse;

/*
  finish off a file written through write_sparse().

  If the output was left positioned by a seek, step back one byte and
  really write the final byte, since a trailing seek alone does not
  extend the file. The flag is cleared before returning so that it
  cannot carry over into the next file; previously it stayed set once
  raised, and a later file that never reached write_sparse() would get
  a stale byte written into it.

  Returns 0 on success (or when there is nothing to do), -1 if the
  final write fails.
 */
int sparse_end(int f)
{
	if (!last_sparse)
		return 0;

	last_sparse = 0;

	do_lseek(f,-1,SEEK_CUR);
	return (write(f,&last_byte,1) == 1 ? 0 : -1);
}
340
341
342static int write_sparse(int f,char *buf,int len)
343{
344 int l1=0,l2=0;
345 int ret;
346
347 for (l1=0;l1<len && buf[l1]==0;l1++) ;
348 for (l2=0;l2<(len-l1) && buf[len-(l2+1)]==0;l2++) ;
349
350 last_byte = buf[len-1];
351
352 if (l1 == len || l2 > 0)
353 last_sparse=1;
354
355 if (l1 > 0)
356 do_lseek(f,l1,SEEK_CUR);
357
358 if (l1 == len)
359 return len;
360
361 if ((ret=write(f,buf+l1,len-(l1+l2))) != len-(l1+l2)) {
362 if (ret == -1 || ret == 0) return ret;
363 return (l1+ret);
364 }
365
366 if (l2 > 0)
367 do_lseek(f,l2,SEEK_CUR);
368
369 return len;
370}
371
372
373
374int write_file(int f,char *buf,int len)
375{
376 int ret = 0;
377
378 if (!sparse_files)
379 return write(f,buf,len);
380
381 while (len>0) {
382 int len1 = MIN(len, SPARSE_WRITE_SIZE);
383 int r1 = write_sparse(f, buf, len1);
384 if (r1 <= 0) {
385 if (ret > 0) return ret;
386 return r1;
387 }
388 len -= r1;
389 buf += r1;
390 ret += r1;
391 }
392 return ret;
393}
394
395
396static int writefd_unbuffered(int fd,char *buf,int len)
397{
398 int total = 0;
399 fd_set w_fds, r_fds;
400 int fd_count, count, got_select=0;
401 struct timeval tv;
402
403 while (total < len) {
404 int ret = write(fd,buf+total,len-total);
405
406 if (ret == 0) return total;
407
408 if (ret == -1 && !(errno == EWOULDBLOCK || errno == EAGAIN))
409 return -1;
410
411 if (ret == -1 && got_select) {
412 /* hmmm, we got a write select on the fd and
413 then failed to write. Why doesn't that
414 mean that the fd is dead? It doesn't on
415 some systems it seems (eg. IRIX) */
416 u_sleep(1000);
417 }
418
419 got_select = 0;
420
421
422 if (ret != -1) {
423 total += ret;
424 continue;
425 }
426
427 if (read_buffer_len < MAX_READ_BUFFER && buffer_f_in != -1)
428 read_check(buffer_f_in);
429
430 fd_count = fd+1;
431 FD_ZERO(&w_fds);
432 FD_ZERO(&r_fds);
433 FD_SET(fd,&w_fds);
434 if (buffer_f_in != -1) {
435 FD_SET(buffer_f_in,&r_fds);
436 if (buffer_f_in > fd)
437 fd_count = buffer_f_in+1;
438 }
439
440 tv.tv_sec = BLOCKING_TIMEOUT;
441 tv.tv_usec = 0;
442 count = select(fd_count,buffer_f_in == -1? NULL: &r_fds,
443 &w_fds,NULL,&tv);
444
445 if (count == -1 && errno != EINTR) {
446 if (verbose > 1)
447 rprintf(FERROR,"select error: %s\n", strerror(errno));
448 exit_cleanup(1);
449 }
450
451 if (count == 0) {
452 check_timeout();
453 continue;
454 }
455
456 if (FD_ISSET(fd, &w_fds)) {
457 got_select = 1;
458 }
459 }
460
461 if (io_timeout)
462 last_io = time(NULL);
463
464 return total;
465}
466
467
468static char *io_buffer;
469static int io_buffer_count;
470static int io_out_fd;
471
472void io_start_buffering(int fd)
473{
474 if (io_buffer) return;
475 io_out_fd = fd;
476 io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
477 if (!io_buffer) out_of_memory("writefd");
478 io_buffer_count = 0;
479
480 /* leave room for the multiplex header in case it's needed */
481 io_buffer += 4;
482}
483
484void io_flush(void)
485{
486 int fd = io_out_fd;
487 if (!io_buffer_count) return;
488
489 if (io_multiplexing_out) {
490 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
491 if (writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4) !=
492 io_buffer_count+4) {
493 rprintf(FERROR,"write failed\n");
494 exit_cleanup(1);
495 }
496 } else {
497 if (writefd_unbuffered(fd, io_buffer, io_buffer_count) !=
498 io_buffer_count) {
499 rprintf(FERROR,"write failed\n");
500 exit_cleanup(1);
501 }
502 }
503 io_buffer_count = 0;
504}
505
506void io_end_buffering(int fd)
507{
508 io_flush();
509 if (!io_multiplexing_out) {
510 free(io_buffer-4);
511 io_buffer = NULL;
512 }
513}
514
515static int writefd(int fd,char *buf,int len1)
516{
517 int len = len1;
518
519 if (!io_buffer) return writefd_unbuffered(fd, buf, len);
520
521 while (len) {
522 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
523 if (n > 0) {
524 memcpy(io_buffer+io_buffer_count, buf, n);
525 buf += n;
526 len -= n;
527 io_buffer_count += n;
528 }
529
530 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
531 }
532
533 return len1;
534}
535
536
537void write_int(int f,int32 x)
538{
539 int ret;
540 char b[4];
541 SIVAL(b,0,x);
542 if ((ret=writefd(f,b,4)) != 4) {
543 rprintf(FERROR,"write_int failed : %s\n",
544 ret==-1?strerror(errno):"EOF");
545 exit_cleanup(1);
546 }
547 total_written += 4;
548}
549
550void write_longint(int f, int64 x)
551{
552 extern int remote_version;
553 char b[8];
554 int ret;
555
556 if (remote_version < 16 || x <= 0x7FFFFFFF) {
557 write_int(f, (int)x);
558 return;
559 }
560
561 write_int(f, -1);
562 SIVAL(b,0,(x&0xFFFFFFFF));
563 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
564
565 if ((ret=writefd(f,b,8)) != 8) {
566 rprintf(FERROR,"write_longint failed : %s\n",
567 ret==-1?strerror(errno):"EOF");
568 exit_cleanup(1);
569 }
570 total_written += 8;
571}
572
573void write_buf(int f,char *buf,int len)
574{
575 int ret;
576 if ((ret=writefd(f,buf,len)) != len) {
577 rprintf(FERROR,"write_buf failed : %s\n",
578 ret==-1?strerror(errno):"EOF");
579 exit_cleanup(1);
580 }
581 total_written += len;
582}
583
/* write a null terminated string to the connection; the terminator
   itself is not sent */
void write_sbuf(int f,char *buf)
{
	int len = strlen(buf);

	write_buf(f, buf, len);
}
589
590
/* send the single byte c to f */
void write_byte(int f,unsigned char c)
{
	char *p = (char *)&c;

	write_buf(f,p,1);
}
595
/* does nothing; f is ignored */
void write_flush(int f)
{
	return;
}
599
600
/*
  read a line from f into buf, one byte at a time. '\r' characters are
  dropped and the terminating '\n' is replaced by a null.

  Returns 1 once a complete line has been read, or 0 if maxlen
  characters were stored without seeing a newline. In the latter case
  the terminator is written after the maxlen characters, so buf needs
  room for maxlen+1 bytes.
 */
int read_line(int f, char *buf, int maxlen)
{
	while (maxlen) {
		read_buf(f, buf, 1);

		if (*buf == '\n') {
			*buf = 0;
			return 1;
		}

		/* a '\r' is overwritten by the next byte read */
		if (*buf == '\r')
			continue;

		buf++;
		maxlen--;
	}

	*buf = 0;
	return 0;
}
620
621
/*
  printf style output to the connection fd. The text is formatted into
  a local 1024 byte buffer with vslprintf(); a negative result from
  the formatter ends in exit_cleanup(1).
 */
void io_printf(int fd, const char *format, ...)
{
	char text[1024];
	va_list args;
	int n;

	va_start(args, format);
	n = vslprintf(text, sizeof(text)-1, format, args);
	va_end(args);

	if (n < 0)
		exit_cleanup(1);

	write_sbuf(fd, text);
}
636
637
638/* setup for multiplexing an error stream with the data stream */
639void io_start_multiplex_out(int fd)
640{
641 io_start_buffering(fd);
642 io_multiplexing_out = 1;
643}
644
645/* setup for multiplexing an error stream with the data stream */
646void io_start_multiplex_in(int fd)
647{
648 if (read_buffer_len) {
649 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
650 exit_cleanup(1);
651 }
652
653 io_multiplexing_in = 1;
654}
655
656/* write an message to the error stream */
657int io_multiplex_write(int f, char *buf, int len)
658{
659 if (!io_multiplexing_out) return 0;
660
661 io_flush();
662
663 SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
664 memcpy(io_buffer, buf, len);
665
666 writefd_unbuffered(io_out_fd, io_buffer-4, len+4);
667 return 1;
668}
669
670void io_close_input(int fd)
671{
672 buffer_f_in = -1;
673}