added some really ugly code to allow errors to propogate to
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   Utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35 static int eof_error=1;
36 extern int verbose;
37 extern int io_timeout;
38 extern struct stats stats;
39
40 static int buffer_f_in = -1;
41 static int io_error_fd = -1;
42 static int in_read_check;
43
44 void setup_readbuffer(int f_in)
45 {
46         buffer_f_in = f_in;
47 }
48
49 static void check_timeout(void)
50 {
51         time_t t;
52         
53         if (!io_timeout) return;
54
55         if (!last_io) {
56                 last_io = time(NULL);
57                 return;
58         }
59
60         t = time(NULL);
61
62         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
63                 rprintf(FERROR,"io timeout after %d second - exiting\n", 
64                         (int)(t-last_io));
65                 exit_cleanup(RERR_TIMEOUT);
66         }
67 }
68
69 /* setup the fd used to propogate errors */
70 void io_set_error_fd(int fd)
71 {
72         io_error_fd = fd;
73 }
74
75 /* read some data from the error fd and write it to FERROR */
76 static void read_error_fd(void)
77 {
78         char buf[200];
79         int n;
80         int fd = io_error_fd;
81         io_error_fd = -1;
82
83         n = read(fd, buf, sizeof(buf)-1);
84         if (n > 0) {
85                 rwrite(FERROR, buf, n);
86         }
87
88         io_error_fd = fd;
89 }
90
91
92 static char *read_buffer;
93 static char *read_buffer_p;
94 static int read_buffer_len;
95 static int read_buffer_size;
96 static int no_flush;
97 static int no_flush_read;
98
99 /* read from a socket with IO timeout. return the number of
100    bytes read. If no bytes can be read then exit, never return
101    a number <= 0 */
102 static int read_timeout(int fd, char *buf, int len)
103 {
104         int n, ret=0;
105
106         no_flush_read++;
107         io_flush();
108         no_flush_read--;
109
110         while (ret == 0) {
111                 fd_set fds;
112                 struct timeval tv;
113                 int fd_count = fd+1;
114
115                 FD_ZERO(&fds);
116                 FD_SET(fd, &fds);
117                 if (io_error_fd != -1) {
118                         FD_SET(io_error_fd, &fds);
119                         if (io_error_fd > fd) fd_count = io_error_fd+1;
120                 }
121
122                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
123                 tv.tv_usec = 0;
124
125                 errno = 0;
126
127                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
128                         if (errno == EBADF) {
129                                 exit_cleanup(RERR_SOCKETIO);
130                         }
131                         check_timeout();
132                         continue;
133                 }
134
135                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
136                         read_error_fd();
137                 }
138
139                 if (!FD_ISSET(fd, &fds)) continue;
140
141                 n = read(fd, buf, len);
142
143                 if (n > 0) {
144                         buf += n;
145                         len -= n;
146                         ret += n;
147                         if (io_timeout)
148                                 last_io = time(NULL);
149                         continue;
150                 }
151
152                 if (n == -1 && errno == EINTR) {
153                         continue;
154                 }
155
156
157                 if (n == 0) {
158                         if (eof_error) {
159                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
160                         }
161                         exit_cleanup(RERR_STREAMIO);
162                 }
163
164                 /* this prevents us trying to write errors on a dead socket */
165                 io_multiplexing_close();
166
167                 rprintf(FERROR,"read error: %s\n", strerror(errno));
168                 exit_cleanup(RERR_STREAMIO);
169         }
170
171         return ret;
172 }
173
174 /* continue trying to read len bytes - don't return until len
175    has been read */
176 static void read_loop(int fd, char *buf, int len)
177 {
178         while (len) {
179                 int n = read_timeout(fd, buf, len);
180
181                 buf += n;
182                 len -= n;
183         }
184 }
185
186 /* read from the file descriptor handling multiplexing - 
187    return number of bytes read
188    never return <= 0 */
189 static int read_unbuffered(int fd, char *buf, int len)
190 {
191         static int remaining;
192         char ibuf[4];
193         int tag, ret=0;
194         char line[1024];
195
196         if (!io_multiplexing_in || fd != multiplex_in_fd) 
197                 return read_timeout(fd, buf, len);
198
199         while (ret == 0) {
200                 if (remaining) {
201                         len = MIN(len, remaining);
202                         read_loop(fd, buf, len);
203                         remaining -= len;
204                         ret = len;
205                         continue;
206                 }
207
208                 read_loop(fd, ibuf, 4);
209                 tag = IVAL(ibuf, 0);
210
211                 remaining = tag & 0xFFFFFF;
212                 tag = tag >> 24;
213
214                 if (tag == MPLEX_BASE) continue;
215
216                 tag -= MPLEX_BASE;
217
218                 if (tag != FERROR && tag != FINFO) {
219                         rprintf(FERROR,"unexpected tag %d\n", tag);
220                         exit_cleanup(RERR_STREAMIO);
221                 }
222
223                 if (remaining > sizeof(line)-1) {
224                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
225                                 remaining);
226                         exit_cleanup(RERR_STREAMIO);
227                 }
228
229                 read_loop(fd, line, remaining);
230                 line[remaining] = 0;
231
232                 rprintf(tag,"%s", line);
233                 remaining = 0;
234
235                 if (in_read_check) break;
236         }
237
238         return ret;
239 }
240
241
242
243 /* This function was added to overcome a deadlock problem when using
244  * ssh.  It looks like we can't allow our receive queue to get full or
245  * ssh will clag up. Uggh.  */
246 static void read_check(int f)
247 {
248         int n = 8192;
249         
250         in_read_check = 1;
251
252         if (f == -1) return;
253
254         if (read_buffer_len == 0) {
255                 read_buffer_p = read_buffer;
256         }
257
258         if (n > MAX_READ_BUFFER/4)
259                 n = MAX_READ_BUFFER/4;
260
261         if (read_buffer_p != read_buffer) {
262                 memmove(read_buffer,read_buffer_p,read_buffer_len);
263                 read_buffer_p = read_buffer;
264         }
265
266         if (n > (read_buffer_size - read_buffer_len)) {
267                 read_buffer_size += n;
268                 read_buffer = (char *)Realloc(read_buffer,read_buffer_size);
269                 if (!read_buffer) out_of_memory("read check");      
270                 read_buffer_p = read_buffer;      
271         }
272
273         n = read_unbuffered(f,read_buffer+read_buffer_len,n);
274         read_buffer_len += n;
275
276         in_read_check = 0;
277 }
278
279
280 /* do a buffered read from fd. don't return until all N bytes
281    have been read. If all N can't be read then exit with an error */
282 static void readfd(int fd,char *buffer,int N)
283 {
284         int  ret;
285         int total=0;  
286         
287         if ((read_buffer_len < N) && (N < 1024)) {
288                 read_check(buffer_f_in);
289         }
290         
291         while (total < N) {
292                 if (read_buffer_len > 0 && buffer_f_in == fd) {
293                         ret = MIN(read_buffer_len,N-total);
294                         memcpy(buffer+total,read_buffer_p,ret);
295                         read_buffer_p += ret;
296                         read_buffer_len -= ret;
297                         total += ret;
298                         continue;
299                 } 
300
301                 no_flush_read++;
302                 io_flush();
303                 no_flush_read--;
304
305                 ret = read_unbuffered(fd,buffer + total,N-total);
306                 total += ret;
307         }
308
309         stats.total_read += total;
310 }
311
312
313 int32 read_int(int f)
314 {
315         char b[4];
316         int32 ret;
317
318         readfd(f,b,4);
319         ret = IVAL(b,0);
320         if (ret == (int32)0xffffffff) return -1;
321         return ret;
322 }
323
324 int64 read_longint(int f)
325 {
326         extern int remote_version;
327         int64 ret;
328         char b[8];
329         ret = read_int(f);
330
331         if ((int32)ret != (int32)0xffffffff) {
332                 return ret;
333         }
334
335 #ifdef NO_INT64
336         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
337         exit_cleanup(RERR_UNSUPPORTED);
338 #else
339         if (remote_version >= 16) {
340                 readfd(f,b,8);
341                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
342         }
343 #endif
344
345         return ret;
346 }
347
348 void read_buf(int f,char *buf,int len)
349 {
350         readfd(f,buf,len);
351 }
352
353 void read_sbuf(int f,char *buf,int len)
354 {
355         read_buf(f,buf,len);
356         buf[len] = 0;
357 }
358
359 unsigned char read_byte(int f)
360 {
361         unsigned char c;
362         read_buf(f,(char *)&c,1);
363         return c;
364 }
365
366
367
368 /* write len bytes to fd, possibly reading from buffer_f_in if set
369    in order to unclog the pipe. don't return until all len
370    bytes have been written */
371 static void writefd_unbuffered(int fd,char *buf,int len)
372 {
373         int total = 0;
374         fd_set w_fds, r_fds;
375         int fd_count, count;
376         struct timeval tv;
377         int reading=0;
378
379         no_flush++;
380
381         while (total < len) {
382                 FD_ZERO(&w_fds);
383                 FD_ZERO(&r_fds);
384                 FD_SET(fd,&w_fds);
385                 fd_count = fd;
386
387                 if (!no_flush_read) {
388                         reading = (buffer_f_in != -1);
389                 }
390
391                 if (reading) {
392                         FD_SET(buffer_f_in,&r_fds);
393                         if (buffer_f_in > fd_count) 
394                                 fd_count = buffer_f_in;
395                 }
396
397                 if (io_error_fd != -1) {
398                         FD_SET(io_error_fd,&r_fds);
399                         if (io_error_fd > fd_count) 
400                                 fd_count = io_error_fd;
401                 }
402
403                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
404                 tv.tv_usec = 0;
405
406                 errno = 0;
407
408                 count = select(fd_count+1,
409                                (reading || io_error_fd != -1)?&r_fds:NULL,
410                                &w_fds,NULL,
411                                &tv);
412
413                 if (count <= 0) {
414                         if (errno == EBADF) {
415                                 exit_cleanup(RERR_SOCKETIO);
416                         }
417                         check_timeout();
418                         continue;
419                 }
420
421                 if (reading && FD_ISSET(buffer_f_in, &r_fds)) {
422                         read_check(buffer_f_in);
423                 }
424
425                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
426                         read_error_fd();
427                 }
428
429                 if (FD_ISSET(fd, &w_fds)) {
430                         int ret, n = len-total;
431                         
432                         if (n > PIPE_BUF) n = PIPE_BUF;
433
434                         ret = write(fd,buf+total,n?n:1);
435
436                         if (ret == -1 && errno == EINTR) {
437                                 continue;
438                         }
439
440                         if (ret <= 0) {
441                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
442                                 exit_cleanup(RERR_STREAMIO);
443                         }
444
445                         total += ret;
446
447                         if (io_timeout)
448                                 last_io = time(NULL);
449                 }
450         }
451
452         no_flush--;
453 }
454
455
456 static char *io_buffer;
457 static int io_buffer_count;
458
459 void io_start_buffering(int fd)
460 {
461         if (io_buffer) return;
462         multiplex_out_fd = fd;
463         io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
464         if (!io_buffer) out_of_memory("writefd");
465         io_buffer_count = 0;
466
467         /* leave room for the multiplex header in case it's needed */
468         io_buffer += 4;
469 }
470
471 void io_flush(void)
472 {
473         int fd = multiplex_out_fd;
474         if (!io_buffer_count || no_flush) return;
475
476         if (io_multiplexing_out) {
477                 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
478                 writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4);
479         } else {
480                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
481         }
482         io_buffer_count = 0;
483 }
484
485 void io_end_buffering(int fd)
486 {
487         io_flush();
488         if (!io_multiplexing_out) {
489                 free(io_buffer-4);
490                 io_buffer = NULL;
491         }
492 }
493
494 static void writefd(int fd,char *buf,int len)
495 {
496         stats.total_written += len;
497
498         if (!io_buffer || fd != multiplex_out_fd) {
499                 writefd_unbuffered(fd, buf, len);
500                 return;
501         }
502
503         while (len) {
504                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
505                 if (n > 0) {
506                         memcpy(io_buffer+io_buffer_count, buf, n);
507                         buf += n;
508                         len -= n;
509                         io_buffer_count += n;
510                 }
511                 
512                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
513         }
514 }
515
516
517 void write_int(int f,int32 x)
518 {
519         char b[4];
520         SIVAL(b,0,x);
521         writefd(f,b,4);
522 }
523
524 void write_longint(int f, int64 x)
525 {
526         extern int remote_version;
527         char b[8];
528
529         if (remote_version < 16 || x <= 0x7FFFFFFF) {
530                 write_int(f, (int)x);
531                 return;
532         }
533
534         write_int(f, (int32)0xFFFFFFFF);
535         SIVAL(b,0,(x&0xFFFFFFFF));
536         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
537
538         writefd(f,b,8);
539 }
540
541 void write_buf(int f,char *buf,int len)
542 {
543         writefd(f,buf,len);
544 }
545
546 /* write a string to the connection */
547 static void write_sbuf(int f,char *buf)
548 {
549         write_buf(f, buf, strlen(buf));
550 }
551
552
553 void write_byte(int f,unsigned char c)
554 {
555         write_buf(f,(char *)&c,1);
556 }
557
558 int read_line(int f, char *buf, int maxlen)
559 {
560         eof_error = 0;
561
562         while (maxlen) {
563                 buf[0] = 0;
564                 read_buf(f, buf, 1);
565                 if (buf[0] == 0) return 0;
566                 if (buf[0] == '\n') {
567                         buf[0] = 0;
568                         break;
569                 }
570                 if (buf[0] != '\r') {
571                         buf++;
572                         maxlen--;
573                 }
574         }
575         if (maxlen == 0) {
576                 *buf = 0;
577                 return 0;
578         }
579
580         eof_error = 1;
581
582         return 1;
583 }
584
585
586 void io_printf(int fd, const char *format, ...)
587 {
588         va_list ap;  
589         char buf[1024];
590         int len;
591         
592         va_start(ap, format);
593         len = vslprintf(buf, sizeof(buf), format, ap);
594         va_end(ap);
595
596         if (len < 0) exit_cleanup(RERR_STREAMIO);
597
598         write_sbuf(fd, buf);
599 }
600
601
602 /* setup for multiplexing an error stream with the data stream */
603 void io_start_multiplex_out(int fd)
604 {
605         multiplex_out_fd = fd;
606         io_flush();
607         io_start_buffering(fd);
608         io_multiplexing_out = 1;
609 }
610
611 /* setup for multiplexing an error stream with the data stream */
612 void io_start_multiplex_in(int fd)
613 {
614         multiplex_in_fd = fd;
615         io_flush();
616         if (read_buffer_len) {
617                 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
618                 exit_cleanup(RERR_STREAMIO);
619         }
620
621         io_multiplexing_in = 1;
622 }
623
624 /* write an message to the multiplexed error stream */
625 int io_multiplex_write(int f, char *buf, int len)
626 {
627         if (!io_multiplexing_out) return 0;
628
629         io_flush();
630
631         SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
632         memcpy(io_buffer, buf, len);
633
634         stats.total_written += (len+4);
635
636         writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
637         return 1;
638 }
639
640 /* write a message to the special error fd */
641 int io_error_write(int f, char *buf, int len)
642 {
643         if (f == -1) return 0;
644         writefd_unbuffered(f, buf, len);
645         return 1;
646 }
647
648 /* stop output multiplexing */
649 void io_multiplexing_close(void)
650 {
651         io_multiplexing_out = 0;
652 }
653
654 void io_close_input(int fd)
655 {
656         buffer_f_in = -1;
657 }