Add comments.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 /*
22   socket and pipe IO utilities used in rsync 
23
24   tridge, June 1996
25   */
26 #include "rsync.h"
27
28 /* if no timeout is specified then use a 60 second select timeout */
29 #define SELECT_TIMEOUT 60
30
31 extern int bwlimit;
32
33 static int io_multiplexing_out;
34 static int io_multiplexing_in;
35 static int multiplex_in_fd;
36 static int multiplex_out_fd;
37 static time_t last_io;
38 static int eof_error=1;
39 extern int verbose;
40 extern int io_timeout;
41 extern struct stats stats;
42
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 static void check_timeout(void)
48 {
49         extern int am_server, am_daemon;
50         time_t t;
51         
52         if (!io_timeout) return;
53
54         if (!last_io) {
55                 last_io = time(NULL);
56                 return;
57         }
58
59         t = time(NULL);
60
61         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
62                 if (!am_server && !am_daemon) {
63                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
64                                 (int)(t-last_io));
65                 }
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         /* io_error_fd is temporarily disabled -- is this meant to
85          * prevent indefinite recursion? */
86         io_error_fd = -1;
87
88         read_loop(fd, buf, 4);
89         tag = IVAL(buf, 0);
90
91         len = tag & 0xFFFFFF;
92         tag = tag >> 24;
93         tag -= MPLEX_BASE;
94
95         while (len) {
96                 n = len;
97                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
98                 read_loop(fd, buf, n);
99                 rwrite((enum logcode)tag, buf, n);
100                 len -= n;
101         }
102
103         io_error_fd = fd;
104 }
105
106
107 static int no_flush;
108
109 /*
110  * Read from a socket with IO timeout. return the number of bytes
111  * read. If no bytes can be read then exit, never return a number <= 0.
112  *
113  * TODO: If the remote shell connection fails, then current versions
114  * actually report an "unexpected EOF" error here.  Since it's a
115  * fairly common mistake to try to use rsh when ssh is required, we
116  * should trap that: if we fail to read any data at all, we should
117  * give a better explanation.  We can tell whether the connection has
118  * started by looking e.g. at whether the remote version is known yet.
119  */
120 static int read_timeout(int fd, char *buf, int len)
121 {
122         int n, ret=0;
123
124         io_flush();
125
126         while (ret == 0) {
127                 fd_set fds;
128                 struct timeval tv;
129                 int fd_count = fd+1;
130
131                 FD_ZERO(&fds);
132                 FD_SET(fd, &fds);
133                 if (io_error_fd != -1) {
134                         FD_SET(io_error_fd, &fds);
135                         if (io_error_fd > fd) fd_count = io_error_fd+1;
136                 }
137
138                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
139                 tv.tv_usec = 0;
140
141                 errno = 0;
142
143                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
144                         if (errno == EBADF) {
145                                 exit_cleanup(RERR_SOCKETIO);
146                         }
147                         check_timeout();
148                         continue;
149                 }
150
151                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
152                         read_error_fd();
153                 }
154
155                 if (!FD_ISSET(fd, &fds)) continue;
156
157                 n = read(fd, buf, len);
158
159                 if (n > 0) {
160                         buf += n;
161                         len -= n;
162                         ret += n;
163                         if (io_timeout)
164                                 last_io = time(NULL);
165                         continue;
166                 }
167
168                 if (n == -1 && errno == EINTR) {
169                         continue;
170                 }
171
172                 if (n == -1 && 
173                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
174                         continue;
175                 }
176
177
178                 if (n == 0) {
179                         if (eof_error) {
180                                 rprintf(FERROR,
181                                         "%s: connection to server unexpectedly closed"
182                                         " (%ld bytes read so far)\n",
183                                         RSYNC_NAME, stats.total_read);
184                         }
185                         exit_cleanup(RERR_STREAMIO);
186                 }
187
188                 /* this prevents us trying to write errors on a dead socket */
189                 io_multiplexing_close();
190
191                 rprintf(FERROR,"read error: %s\n", strerror(errno));
192                 exit_cleanup(RERR_STREAMIO);
193         }
194
195         return ret;
196 }
197
198 /* continue trying to read len bytes - don't return until len
199    has been read */
200 static void read_loop(int fd, char *buf, int len)
201 {
202         while (len) {
203                 int n = read_timeout(fd, buf, len);
204
205                 buf += n;
206                 len -= n;
207         }
208 }
209
210 /* read from the file descriptor handling multiplexing - 
211    return number of bytes read
212    never return <= 0 */
213 static int read_unbuffered(int fd, char *buf, int len)
214 {
215         static int remaining;
216         int tag, ret=0;
217         char line[1024];
218
219         if (!io_multiplexing_in || fd != multiplex_in_fd) 
220                 return read_timeout(fd, buf, len);
221
222         while (ret == 0) {
223                 if (remaining) {
224                         len = MIN(len, remaining);
225                         read_loop(fd, buf, len);
226                         remaining -= len;
227                         ret = len;
228                         continue;
229                 }
230
231                 read_loop(fd, line, 4);
232                 tag = IVAL(line, 0);
233
234                 remaining = tag & 0xFFFFFF;
235                 tag = tag >> 24;
236
237                 if (tag == MPLEX_BASE) continue;
238
239                 tag -= MPLEX_BASE;
240
241                 if (tag != FERROR && tag != FINFO) {
242                         rprintf(FERROR,"unexpected tag %d\n", tag);
243                         exit_cleanup(RERR_STREAMIO);
244                 }
245
246                 if (remaining > sizeof(line)-1) {
247                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
248                                 remaining);
249                         exit_cleanup(RERR_STREAMIO);
250                 }
251
252                 read_loop(fd, line, remaining);
253                 line[remaining] = 0;
254
255                 rprintf((enum logcode)tag,"%s", line);
256                 remaining = 0;
257         }
258
259         return ret;
260 }
261
262
263 /* do a buffered read from fd. don't return until all N bytes
264    have been read. If all N can't be read then exit with an error */
265 static void readfd(int fd,char *buffer,int N)
266 {
267         int  ret;
268         int total=0;  
269         
270         while (total < N) {
271                 io_flush();
272
273                 ret = read_unbuffered(fd,buffer + total,N-total);
274                 total += ret;
275         }
276
277         stats.total_read += total;
278 }
279
280
281 int32 read_int(int f)
282 {
283         char b[4];
284         int32 ret;
285
286         readfd(f,b,4);
287         ret = IVAL(b,0);
288         if (ret == (int32)0xffffffff) return -1;
289         return ret;
290 }
291
292 int64 read_longint(int f)
293 {
294         extern int remote_version;
295         int64 ret;
296         char b[8];
297         ret = read_int(f);
298
299         if ((int32)ret != (int32)0xffffffff) {
300                 return ret;
301         }
302
303 #ifdef NO_INT64
304         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
305         exit_cleanup(RERR_UNSUPPORTED);
306 #else
307         if (remote_version >= 16) {
308                 readfd(f,b,8);
309                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
310         }
311 #endif
312
313         return ret;
314 }
315
316 void read_buf(int f,char *buf,int len)
317 {
318         readfd(f,buf,len);
319 }
320
321 void read_sbuf(int f,char *buf,int len)
322 {
323         read_buf(f,buf,len);
324         buf[len] = 0;
325 }
326
327 unsigned char read_byte(int f)
328 {
329         unsigned char c;
330         read_buf(f,(char *)&c,1);
331         return c;
332 }
333
334 /* write len bytes to fd */
335 static void writefd_unbuffered(int fd,char *buf,int len)
336 {
337         int total = 0;
338         fd_set w_fds, r_fds;
339         int fd_count, count;
340         struct timeval tv;
341
342         no_flush++;
343
344         while (total < len) {
345                 FD_ZERO(&w_fds);
346                 FD_ZERO(&r_fds);
347                 FD_SET(fd,&w_fds);
348                 fd_count = fd;
349
350                 if (io_error_fd != -1) {
351                         FD_SET(io_error_fd,&r_fds);
352                         if (io_error_fd > fd_count) 
353                                 fd_count = io_error_fd;
354                 }
355
356                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
357                 tv.tv_usec = 0;
358
359                 errno = 0;
360
361                 count = select(fd_count+1,
362                                io_error_fd != -1?&r_fds:NULL,
363                                &w_fds,NULL,
364                                &tv);
365
366                 if (count <= 0) {
367                         if (errno == EBADF) {
368                                 exit_cleanup(RERR_SOCKETIO);
369                         }
370                         check_timeout();
371                         continue;
372                 }
373
374                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
375                         read_error_fd();
376                 }
377
378                 if (FD_ISSET(fd, &w_fds)) {
379                         int ret, n = len-total;
380                         
381                         ret = write(fd,buf+total,n);
382
383                         if (ret == -1 && errno == EINTR) {
384                                 continue;
385                         }
386
387                         if (ret == -1 && 
388                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
389                                 msleep(1);
390                                 continue;
391                         }
392
393                         if (ret <= 0) {
394                                 rprintf(FERROR,
395                                         "error writing %d unbuffered bytes"
396                                         " - exiting: %s\n", len,
397                                         strerror(errno));
398                                 exit_cleanup(RERR_STREAMIO);
399                         }
400
401                         /* Sleep after writing to limit I/O bandwidth */
402                         if (bwlimit)
403                         {
404                             tv.tv_sec = 0;
405                             tv.tv_usec = ret * 1000 / bwlimit;
406                             while (tv.tv_usec > 1000000)
407                             {
408                                 tv.tv_sec++;
409                                 tv.tv_usec -= 1000000;
410                             }
411                             select(0, NULL, NULL, NULL, &tv);
412                         }
413  
414                         total += ret;
415
416                         if (io_timeout)
417                                 last_io = time(NULL);
418                 }
419         }
420
421         no_flush--;
422 }
423
424
425 static char *io_buffer;
426 static int io_buffer_count;
427
428 void io_start_buffering(int fd)
429 {
430         if (io_buffer) return;
431         multiplex_out_fd = fd;
432         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
433         if (!io_buffer) out_of_memory("writefd");
434         io_buffer_count = 0;
435 }
436
437 /* write an message to a multiplexed stream. If this fails then rsync
438    exits */
439 static void mplex_write(int fd, enum logcode code, char *buf, int len)
440 {
441         char buffer[4096];
442         int n = len;
443
444         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
445
446         if (n > (sizeof(buffer)-4)) {
447                 n = sizeof(buffer)-4;
448         }
449
450         memcpy(&buffer[4], buf, n);
451         writefd_unbuffered(fd, buffer, n+4);
452
453         len -= n;
454         buf += n;
455
456         if (len) {
457                 writefd_unbuffered(fd, buf, len);
458         }
459 }
460
461
462 void io_flush(void)
463 {
464         int fd = multiplex_out_fd;
465         if (!io_buffer_count || no_flush) return;
466
467         if (io_multiplexing_out) {
468                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
469         } else {
470                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
471         }
472         io_buffer_count = 0;
473 }
474
475
476 /* XXX: fd is ignored, which seems a little strange. */
477 void io_end_buffering(int fd)
478 {
479         io_flush();
480         if (!io_multiplexing_out) {
481                 free(io_buffer);
482                 io_buffer = NULL;
483         }
484 }
485
486 /* some OSes have a bug where an exit causes the pending writes on
487    a socket to be flushed. Do an explicit shutdown to try to prevent this */
488 void io_shutdown(void)
489 {
490         if (multiplex_out_fd != -1) close(multiplex_out_fd);
491         if (io_error_fd != -1) close(io_error_fd);
492         multiplex_out_fd = -1;
493         io_error_fd = -1;
494 }
495
496
497 static void writefd(int fd,char *buf,int len)
498 {
499         stats.total_written += len;
500
501         if (!io_buffer || fd != multiplex_out_fd) {
502                 writefd_unbuffered(fd, buf, len);
503                 return;
504         }
505
506         while (len) {
507                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
508                 if (n > 0) {
509                         memcpy(io_buffer+io_buffer_count, buf, n);
510                         buf += n;
511                         len -= n;
512                         io_buffer_count += n;
513                 }
514                 
515                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
516         }
517 }
518
519
520 void write_int(int f,int32 x)
521 {
522         char b[4];
523         SIVAL(b,0,x);
524         writefd(f,b,4);
525 }
526
527
528 /*
529  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
530  * 64-bit types on this platform.
531  */
532 void write_longint(int f, int64 x)
533 {
534         extern int remote_version;
535         char b[8];
536
537         if (remote_version < 16 || x <= 0x7FFFFFFF) {
538                 write_int(f, (int)x);
539                 return;
540         }
541
542         write_int(f, (int32)0xFFFFFFFF);
543         SIVAL(b,0,(x&0xFFFFFFFF));
544         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
545
546         writefd(f,b,8);
547 }
548
549 void write_buf(int f,char *buf,int len)
550 {
551         writefd(f,buf,len);
552 }
553
554 /* write a string to the connection */
555 static void write_sbuf(int f,char *buf)
556 {
557         write_buf(f, buf, strlen(buf));
558 }
559
560
561 void write_byte(int f,unsigned char c)
562 {
563         write_buf(f,(char *)&c,1);
564 }
565
566 int read_line(int f, char *buf, int maxlen)
567 {
568         eof_error = 0;
569
570         while (maxlen) {
571                 buf[0] = 0;
572                 read_buf(f, buf, 1);
573                 if (buf[0] == 0) return 0;
574                 if (buf[0] == '\n') {
575                         buf[0] = 0;
576                         break;
577                 }
578                 if (buf[0] != '\r') {
579                         buf++;
580                         maxlen--;
581                 }
582         }
583         if (maxlen == 0) {
584                 *buf = 0;
585                 return 0;
586         }
587
588         eof_error = 1;
589
590         return 1;
591 }
592
593
594 void io_printf(int fd, const char *format, ...)
595 {
596         va_list ap;  
597         char buf[1024];
598         int len;
599         
600         va_start(ap, format);
601         len = vslprintf(buf, sizeof(buf), format, ap);
602         va_end(ap);
603
604         if (len < 0) exit_cleanup(RERR_STREAMIO);
605
606         write_sbuf(fd, buf);
607 }
608
609
610 /* setup for multiplexing an error stream with the data stream */
611 void io_start_multiplex_out(int fd)
612 {
613         multiplex_out_fd = fd;
614         io_flush();
615         io_start_buffering(fd);
616         io_multiplexing_out = 1;
617 }
618
619 /* setup for multiplexing an error stream with the data stream */
620 void io_start_multiplex_in(int fd)
621 {
622         multiplex_in_fd = fd;
623         io_flush();
624         io_multiplexing_in = 1;
625 }
626
627 /* write an message to the multiplexed error stream */
628 int io_multiplex_write(enum logcode code, char *buf, int len)
629 {
630         if (!io_multiplexing_out) return 0;
631
632         io_flush();
633         stats.total_written += (len+4);
634         mplex_write(multiplex_out_fd, code, buf, len);
635         return 1;
636 }
637
638 /* write a message to the special error fd */
639 int io_error_write(int f, enum logcode code, char *buf, int len)
640 {
641         if (f == -1) return 0;
642         mplex_write(f, code, buf, len);
643         return 1;
644 }
645
646 /* stop output multiplexing */
647 void io_multiplexing_close(void)
648 {
649         io_multiplexing_out = 0;
650 }
651