Doc.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2000 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 /*
22   socket and pipe IO utilities used in rsync 
23
24   tridge, June 1996
25   */
26 #include "rsync.h"
27
28 /* if no timeout is specified then use a 60 second select timeout */
29 #define SELECT_TIMEOUT 60
30
31 extern int bwlimit;
32
33 static int io_multiplexing_out;
34 static int io_multiplexing_in;
35 static int multiplex_in_fd;
36 static int multiplex_out_fd;
37 static time_t last_io;
38 static int eof_error=1;
39 extern int verbose;
40 extern int io_timeout;
41 extern struct stats stats;
42
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 static void check_timeout(void)
48 {
49         extern int am_server, am_daemon;
50         time_t t;
51         
52         if (!io_timeout) return;
53
54         if (!last_io) {
55                 last_io = time(NULL);
56                 return;
57         }
58
59         t = time(NULL);
60
61         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
62                 if (!am_server && !am_daemon) {
63                         rprintf(FERROR,"io timeout after %d second - exiting\n", 
64                                 (int)(t-last_io));
65                 }
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         io_error_fd = -1;
85
86         read_loop(fd, buf, 4);
87         tag = IVAL(buf, 0);
88
89         len = tag & 0xFFFFFF;
90         tag = tag >> 24;
91         tag -= MPLEX_BASE;
92
93         while (len) {
94                 n = len;
95                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
96                 read_loop(fd, buf, n);
97                 rwrite((enum logcode)tag, buf, n);
98                 len -= n;
99         }
100
101         io_error_fd = fd;
102 }
103
104
105 static int no_flush;
106
107 /*
108  * Read from a socket with IO timeout. return the number of bytes
109  * read. If no bytes can be read then exit, never return a number <= 0.
110  *
111  * TODO: If the remote shell connection fails, then current versions actually
112  * report an "unexpected EOF" error here.  Since it's a fairly common mistake
113  * to try to use rsh when ssh is required, we should trap that: if we fail
114  * to read any data at all, we should give a better explanation.
115  */
116 static int read_timeout(int fd, char *buf, int len)
117 {
118         int n, ret=0;
119
120         io_flush();
121
122         while (ret == 0) {
123                 fd_set fds;
124                 struct timeval tv;
125                 int fd_count = fd+1;
126
127                 FD_ZERO(&fds);
128                 FD_SET(fd, &fds);
129                 if (io_error_fd != -1) {
130                         FD_SET(io_error_fd, &fds);
131                         if (io_error_fd > fd) fd_count = io_error_fd+1;
132                 }
133
134                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
135                 tv.tv_usec = 0;
136
137                 errno = 0;
138
139                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
140                         if (errno == EBADF) {
141                                 exit_cleanup(RERR_SOCKETIO);
142                         }
143                         check_timeout();
144                         continue;
145                 }
146
147                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
148                         read_error_fd();
149                 }
150
151                 if (!FD_ISSET(fd, &fds)) continue;
152
153                 n = read(fd, buf, len);
154
155                 if (n > 0) {
156                         buf += n;
157                         len -= n;
158                         ret += n;
159                         if (io_timeout)
160                                 last_io = time(NULL);
161                         continue;
162                 }
163
164                 if (n == -1 && errno == EINTR) {
165                         continue;
166                 }
167
168                 if (n == -1 && 
169                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
170                         continue;
171                 }
172
173
174                 if (n == 0) {
175                         if (eof_error) {
176                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
177                         }
178                         exit_cleanup(RERR_STREAMIO);
179                 }
180
181                 /* this prevents us trying to write errors on a dead socket */
182                 io_multiplexing_close();
183
184                 rprintf(FERROR,"read error: %s\n", strerror(errno));
185                 exit_cleanup(RERR_STREAMIO);
186         }
187
188         return ret;
189 }
190
191 /* continue trying to read len bytes - don't return until len
192    has been read */
193 static void read_loop(int fd, char *buf, int len)
194 {
195         while (len) {
196                 int n = read_timeout(fd, buf, len);
197
198                 buf += n;
199                 len -= n;
200         }
201 }
202
203 /* read from the file descriptor handling multiplexing - 
204    return number of bytes read
205    never return <= 0 */
206 static int read_unbuffered(int fd, char *buf, int len)
207 {
208         static int remaining;
209         int tag, ret=0;
210         char line[1024];
211
212         if (!io_multiplexing_in || fd != multiplex_in_fd) 
213                 return read_timeout(fd, buf, len);
214
215         while (ret == 0) {
216                 if (remaining) {
217                         len = MIN(len, remaining);
218                         read_loop(fd, buf, len);
219                         remaining -= len;
220                         ret = len;
221                         continue;
222                 }
223
224                 read_loop(fd, line, 4);
225                 tag = IVAL(line, 0);
226
227                 remaining = tag & 0xFFFFFF;
228                 tag = tag >> 24;
229
230                 if (tag == MPLEX_BASE) continue;
231
232                 tag -= MPLEX_BASE;
233
234                 if (tag != FERROR && tag != FINFO) {
235                         rprintf(FERROR,"unexpected tag %d\n", tag);
236                         exit_cleanup(RERR_STREAMIO);
237                 }
238
239                 if (remaining > sizeof(line)-1) {
240                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
241                                 remaining);
242                         exit_cleanup(RERR_STREAMIO);
243                 }
244
245                 read_loop(fd, line, remaining);
246                 line[remaining] = 0;
247
248                 rprintf((enum logcode)tag,"%s", line);
249                 remaining = 0;
250         }
251
252         return ret;
253 }
254
255
256 /* do a buffered read from fd. don't return until all N bytes
257    have been read. If all N can't be read then exit with an error */
258 static void readfd(int fd,char *buffer,int N)
259 {
260         int  ret;
261         int total=0;  
262         
263         while (total < N) {
264                 io_flush();
265
266                 ret = read_unbuffered(fd,buffer + total,N-total);
267                 total += ret;
268         }
269
270         stats.total_read += total;
271 }
272
273
274 int32 read_int(int f)
275 {
276         char b[4];
277         int32 ret;
278
279         readfd(f,b,4);
280         ret = IVAL(b,0);
281         if (ret == (int32)0xffffffff) return -1;
282         return ret;
283 }
284
285 int64 read_longint(int f)
286 {
287         extern int remote_version;
288         int64 ret;
289         char b[8];
290         ret = read_int(f);
291
292         if ((int32)ret != (int32)0xffffffff) {
293                 return ret;
294         }
295
296 #ifdef NO_INT64
297         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
298         exit_cleanup(RERR_UNSUPPORTED);
299 #else
300         if (remote_version >= 16) {
301                 readfd(f,b,8);
302                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
303         }
304 #endif
305
306         return ret;
307 }
308
309 void read_buf(int f,char *buf,int len)
310 {
311         readfd(f,buf,len);
312 }
313
314 void read_sbuf(int f,char *buf,int len)
315 {
316         read_buf(f,buf,len);
317         buf[len] = 0;
318 }
319
320 unsigned char read_byte(int f)
321 {
322         unsigned char c;
323         read_buf(f,(char *)&c,1);
324         return c;
325 }
326
327 /* write len bytes to fd */
328 static void writefd_unbuffered(int fd,char *buf,int len)
329 {
330         int total = 0;
331         fd_set w_fds, r_fds;
332         int fd_count, count;
333         struct timeval tv;
334
335         no_flush++;
336
337         while (total < len) {
338                 FD_ZERO(&w_fds);
339                 FD_ZERO(&r_fds);
340                 FD_SET(fd,&w_fds);
341                 fd_count = fd;
342
343                 if (io_error_fd != -1) {
344                         FD_SET(io_error_fd,&r_fds);
345                         if (io_error_fd > fd_count) 
346                                 fd_count = io_error_fd;
347                 }
348
349                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
350                 tv.tv_usec = 0;
351
352                 errno = 0;
353
354                 count = select(fd_count+1,
355                                io_error_fd != -1?&r_fds:NULL,
356                                &w_fds,NULL,
357                                &tv);
358
359                 if (count <= 0) {
360                         if (errno == EBADF) {
361                                 exit_cleanup(RERR_SOCKETIO);
362                         }
363                         check_timeout();
364                         continue;
365                 }
366
367                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
368                         read_error_fd();
369                 }
370
371                 if (FD_ISSET(fd, &w_fds)) {
372                         int ret, n = len-total;
373                         
374                         ret = write(fd,buf+total,n);
375
376                         if (ret == -1 && errno == EINTR) {
377                                 continue;
378                         }
379
380                         if (ret == -1 && 
381                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
382                                 msleep(1);
383                                 continue;
384                         }
385
386                         if (ret <= 0) {
387                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
388                                 exit_cleanup(RERR_STREAMIO);
389                         }
390
391                         /* Sleep after writing to limit I/O bandwidth */
392                         if (bwlimit)
393                         {
394                             tv.tv_sec = 0;
395                             tv.tv_usec = ret * 1000 / bwlimit;
396                             while (tv.tv_usec > 1000000)
397                             {
398                                 tv.tv_sec++;
399                                 tv.tv_usec -= 1000000;
400                             }
401                             select(0, NULL, NULL, NULL, &tv);
402                         }
403  
404                         total += ret;
405
406                         if (io_timeout)
407                                 last_io = time(NULL);
408                 }
409         }
410
411         no_flush--;
412 }
413
414
415 static char *io_buffer;
416 static int io_buffer_count;
417
418 void io_start_buffering(int fd)
419 {
420         if (io_buffer) return;
421         multiplex_out_fd = fd;
422         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
423         if (!io_buffer) out_of_memory("writefd");
424         io_buffer_count = 0;
425 }
426
427 /* write an message to a multiplexed stream. If this fails then rsync
428    exits */
429 static void mplex_write(int fd, enum logcode code, char *buf, int len)
430 {
431         char buffer[4096];
432         int n = len;
433
434         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
435
436         if (n > (sizeof(buffer)-4)) {
437                 n = sizeof(buffer)-4;
438         }
439
440         memcpy(&buffer[4], buf, n);
441         writefd_unbuffered(fd, buffer, n+4);
442
443         len -= n;
444         buf += n;
445
446         if (len) {
447                 writefd_unbuffered(fd, buf, len);
448         }
449 }
450
451
452 void io_flush(void)
453 {
454         int fd = multiplex_out_fd;
455         if (!io_buffer_count || no_flush) return;
456
457         if (io_multiplexing_out) {
458                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
459         } else {
460                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
461         }
462         io_buffer_count = 0;
463 }
464
465
466 /* XXX: fd is ignored, which seems a little strange. */
467 void io_end_buffering(int fd)
468 {
469         io_flush();
470         if (!io_multiplexing_out) {
471                 free(io_buffer);
472                 io_buffer = NULL;
473         }
474 }
475
476 /* some OSes have a bug where an exit causes the pending writes on
477    a socket to be flushed. Do an explicit shutdown to try to prevent this */
478 void io_shutdown(void)
479 {
480         if (multiplex_out_fd != -1) close(multiplex_out_fd);
481         if (io_error_fd != -1) close(io_error_fd);
482         multiplex_out_fd = -1;
483         io_error_fd = -1;
484 }
485
486
487 static void writefd(int fd,char *buf,int len)
488 {
489         stats.total_written += len;
490
491         if (!io_buffer || fd != multiplex_out_fd) {
492                 writefd_unbuffered(fd, buf, len);
493                 return;
494         }
495
496         while (len) {
497                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
498                 if (n > 0) {
499                         memcpy(io_buffer+io_buffer_count, buf, n);
500                         buf += n;
501                         len -= n;
502                         io_buffer_count += n;
503                 }
504                 
505                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
506         }
507 }
508
509
510 void write_int(int f,int32 x)
511 {
512         char b[4];
513         SIVAL(b,0,x);
514         writefd(f,b,4);
515 }
516
517
518 /*
519  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
520  * 64-bit types on this platform.
521  */
522 void write_longint(int f, int64 x)
523 {
524         extern int remote_version;
525         char b[8];
526
527         if (remote_version < 16 || x <= 0x7FFFFFFF) {
528                 write_int(f, (int)x);
529                 return;
530         }
531
532         write_int(f, (int32)0xFFFFFFFF);
533         SIVAL(b,0,(x&0xFFFFFFFF));
534         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
535
536         writefd(f,b,8);
537 }
538
539 void write_buf(int f,char *buf,int len)
540 {
541         writefd(f,buf,len);
542 }
543
544 /* write a string to the connection */
545 static void write_sbuf(int f,char *buf)
546 {
547         write_buf(f, buf, strlen(buf));
548 }
549
550
551 void write_byte(int f,unsigned char c)
552 {
553         write_buf(f,(char *)&c,1);
554 }
555
556 int read_line(int f, char *buf, int maxlen)
557 {
558         eof_error = 0;
559
560         while (maxlen) {
561                 buf[0] = 0;
562                 read_buf(f, buf, 1);
563                 if (buf[0] == 0) return 0;
564                 if (buf[0] == '\n') {
565                         buf[0] = 0;
566                         break;
567                 }
568                 if (buf[0] != '\r') {
569                         buf++;
570                         maxlen--;
571                 }
572         }
573         if (maxlen == 0) {
574                 *buf = 0;
575                 return 0;
576         }
577
578         eof_error = 1;
579
580         return 1;
581 }
582
583
584 void io_printf(int fd, const char *format, ...)
585 {
586         va_list ap;  
587         char buf[1024];
588         int len;
589         
590         va_start(ap, format);
591         len = vslprintf(buf, sizeof(buf), format, ap);
592         va_end(ap);
593
594         if (len < 0) exit_cleanup(RERR_STREAMIO);
595
596         write_sbuf(fd, buf);
597 }
598
599
600 /* setup for multiplexing an error stream with the data stream */
601 void io_start_multiplex_out(int fd)
602 {
603         multiplex_out_fd = fd;
604         io_flush();
605         io_start_buffering(fd);
606         io_multiplexing_out = 1;
607 }
608
609 /* setup for multiplexing an error stream with the data stream */
610 void io_start_multiplex_in(int fd)
611 {
612         multiplex_in_fd = fd;
613         io_flush();
614         io_multiplexing_in = 1;
615 }
616
617 /* write an message to the multiplexed error stream */
618 int io_multiplex_write(enum logcode code, char *buf, int len)
619 {
620         if (!io_multiplexing_out) return 0;
621
622         io_flush();
623         stats.total_written += (len+4);
624         mplex_write(multiplex_out_fd, code, buf, len);
625         return 1;
626 }
627
628 /* write a message to the special error fd */
629 int io_error_write(int f, enum logcode code, char *buf, int len)
630 {
631         if (f == -1) return 0;
632         mplex_write(f, code, buf, len);
633         return 1;
634 }
635
636 /* stop output multiplexing */
637 void io_multiplexing_close(void)
638 {
639         io_multiplexing_out = 0;
640 }
641