Document multiplex stuff.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    Copyright (C) 2001 by Martin Pool <mbp@samba.org>
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23  *
24  * @file io.c
25  *
26  * Socket and pipe IO utilities used in rsync.
27  *
28  * rsync provides its own multiplexing system, which is used to send
29  * stderr and stdout over a single socket.  We need this because
30  * stdout normally carries the binary data stream, and stderr all our
31  * error messages.
32  *
33  * For historical reasons this is off during the start of the
34  * connection, but it's switched on quite early using
35  * io_start_multiplex_out() and io_start_multiplex_in().
36  **/
37
38 #include "rsync.h"
39
40 /* if no timeout is specified then use a 60 second select timeout */
41 #define SELECT_TIMEOUT 60
42
43 static int io_multiplexing_out;
44 static int io_multiplexing_in;
45 static int multiplex_in_fd;
46 static int multiplex_out_fd;
47 static time_t last_io;
48 static int no_flush;
49
50 extern int bwlimit;
51 extern int verbose;
52 extern int io_timeout;
53 extern struct stats stats;
54
55
56 /** Ignore EOF errors while reading a module listing if the remote
57     version is 24 or less. */
58 int kludge_around_eof = False;
59
60
61 static int io_error_fd = -1;
62
63 static void read_loop(int fd, char *buf, size_t len);
64
65 static void check_timeout(void)
66 {
67         extern int am_server, am_daemon;
68         time_t t;
69
70         err_list_push();
71         
72         if (!io_timeout) return;
73
74         if (!last_io) {
75                 last_io = time(NULL);
76                 return;
77         }
78
79         t = time(NULL);
80
81         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
82                 if (!am_server && !am_daemon) {
83                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
84                                 (int)(t-last_io));
85                 }
86                 exit_cleanup(RERR_TIMEOUT);
87         }
88 }
89
90 /* setup the fd used to propogate errors */
91 void io_set_error_fd(int fd)
92 {
93         io_error_fd = fd;
94 }
95
96 /* read some data from the error fd and write it to the write log code */
97 static void read_error_fd(void)
98 {
99         char buf[200];
100         size_t n;
101         int fd = io_error_fd;
102         int tag, len;
103
104         /* io_error_fd is temporarily disabled -- is this meant to
105          * prevent indefinite recursion? */
106         io_error_fd = -1;
107
108         read_loop(fd, buf, 4);
109         tag = IVAL(buf, 0);
110
111         len = tag & 0xFFFFFF;
112         tag = tag >> 24;
113         tag -= MPLEX_BASE;
114
115         while (len) {
116                 n = len;
117                 if (n > (sizeof(buf)-1))
118                         n = sizeof(buf)-1;
119                 read_loop(fd, buf, n);
120                 rwrite((enum logcode)tag, buf, n);
121                 len -= n;
122         }
123
124         io_error_fd = fd;
125 }
126
127
128 static void whine_about_eof (void)
129 {
130         /**
131            It's almost always an error to get an EOF when we're trying
132            to read from the network, because the protocol is
133            self-terminating.
134            
135            However, there is one unfortunate cases where it is not,
136            which is rsync <2.4.6 sending a list of modules on a
137            server, since the list is terminated by closing the socket.
138            So, for the section of the program where that is a problem
139            (start_socket_client), kludge_around_eof is True and we
140            just exit.
141         */
142
143         if (kludge_around_eof)
144                 exit_cleanup (0);
145         else {
146                 rprintf (FERROR,
147                          "%s: connection unexpectedly closed "
148                          "(%.0f bytes read so far)\n",
149                          RSYNC_NAME, (double)stats.total_read);
150         
151                 exit_cleanup (RERR_STREAMIO);
152         }
153 }
154
155
156 static void die_from_readerr (int err)
157 {
158         /* this prevents us trying to write errors on a dead socket */
159         io_multiplexing_close();
160                                 
161         rprintf(FERROR, "%s: read error: %s\n",
162                 RSYNC_NAME, strerror (err));
163         exit_cleanup(RERR_STREAMIO);
164 }
165
166
167 /*!
168  * Read from a socket with IO timeout. return the number of bytes
169  * read. If no bytes can be read then exit, never return a number <= 0.
170  *
171  * TODO: If the remote shell connection fails, then current versions
172  * actually report an "unexpected EOF" error here.  Since it's a
173  * fairly common mistake to try to use rsh when ssh is required, we
174  * should trap that: if we fail to read any data at all, we should
175  * give a better explanation.  We can tell whether the connection has
176  * started by looking e.g. at whether the remote version is known yet.
177  */
178 static int read_timeout (int fd, char *buf, size_t len)
179 {
180         int n, ret=0;
181
182         io_flush();
183
184         while (ret == 0) {
185                 /* until we manage to read *something* */
186                 fd_set fds;
187                 struct timeval tv;
188                 int fd_count = fd+1;
189                 int count;
190
191                 FD_ZERO(&fds);
192                 FD_SET(fd, &fds);
193                 if (io_error_fd != -1) {
194                         FD_SET(io_error_fd, &fds);
195                         if (io_error_fd > fd) fd_count = io_error_fd+1;
196                 }
197
198                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
199                 tv.tv_usec = 0;
200
201                 errno = 0;
202
203                 count = select(fd_count, &fds, NULL, NULL, &tv);
204
205                 if (count == 0) {
206                         check_timeout();
207                 }
208
209                 if (count <= 0) {
210                         if (errno == EBADF) {
211                                 exit_cleanup(RERR_SOCKETIO);
212                         }
213                         continue;
214                 }
215
216                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
217                         read_error_fd();
218                 }
219
220                 if (!FD_ISSET(fd, &fds)) continue;
221
222                 n = read(fd, buf, len);
223
224                 if (n > 0) {
225                         buf += n;
226                         len -= n;
227                         ret += n;
228                         if (io_timeout)
229                                 last_io = time(NULL);
230                         continue;
231                 } else if (n == 0) {
232                         whine_about_eof ();
233                         return -1; /* doesn't return */
234                 } else if (n == -1) {
235                         if (errno == EINTR || errno == EWOULDBLOCK ||
236                             errno == EAGAIN) 
237                                 continue;
238                         else
239                                 die_from_readerr (errno);
240                 }
241         }
242
243         return ret;
244 }
245
246
247
248
249 /*! Continue trying to read len bytes - don't return until len has
250   been read.   */
251 static void read_loop (int fd, char *buf, size_t len)
252 {
253         while (len) {
254                 int n = read_timeout(fd, buf, len);
255
256                 buf += n;
257                 len -= n;
258         }
259 }
260
261
262 /**
263  * Read from the file descriptor handling multiplexing - return number
264  * of bytes read.
265  * 
266  * Never returns <= 0. 
267  */
268 static int read_unbuffered(int fd, char *buf, size_t len)
269 {
270         static size_t remaining;
271         int tag, ret = 0;
272         char line[1024];
273
274         if (!io_multiplexing_in || fd != multiplex_in_fd)
275                 return read_timeout(fd, buf, len);
276
277         while (ret == 0) {
278                 if (remaining) {
279                         len = MIN(len, remaining);
280                         read_loop(fd, buf, len);
281                         remaining -= len;
282                         ret = len;
283                         continue;
284                 }
285
286                 read_loop(fd, line, 4);
287                 tag = IVAL(line, 0);
288
289                 remaining = tag & 0xFFFFFF;
290                 tag = tag >> 24;
291
292                 if (tag == MPLEX_BASE)
293                         continue;
294
295                 tag -= MPLEX_BASE;
296
297                 if (tag != FERROR && tag != FINFO) {
298                         rprintf(FERROR, "unexpected tag %d\n", tag);
299                         exit_cleanup(RERR_STREAMIO);
300                 }
301
302                 if (remaining > sizeof(line) - 1) {
303                         rprintf(FERROR, "multiplexing overflow %d\n\n",
304                                 remaining);
305                         exit_cleanup(RERR_STREAMIO);
306                 }
307
308                 read_loop(fd, line, remaining);
309                 line[remaining] = 0;
310
311                 rprintf((enum logcode) tag, "%s", line);
312                 remaining = 0;
313         }
314
315         return ret;
316 }
317
318
319
320 /* do a buffered read from fd. don't return until all N bytes
321    have been read. If all N can't be read then exit with an error */
322 static void readfd (int fd, char *buffer, size_t N)
323 {
324         int  ret;
325         size_t total=0;  
326         
327         while (total < N) {
328                 io_flush();
329
330                 ret = read_unbuffered (fd, buffer + total, N-total);
331                 total += ret;
332         }
333
334         stats.total_read += total;
335 }
336
337
338 int32 read_int(int f)
339 {
340         char b[4];
341         int32 ret;
342
343         readfd(f,b,4);
344         ret = IVAL(b,0);
345         if (ret == (int32)0xffffffff) return -1;
346         return ret;
347 }
348
349 int64 read_longint(int f)
350 {
351         extern int remote_version;
352         int64 ret;
353         char b[8];
354         ret = read_int(f);
355
356         if ((int32)ret != (int32)0xffffffff) {
357                 return ret;
358         }
359
360 #ifdef NO_INT64
361         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
362         exit_cleanup(RERR_UNSUPPORTED);
363 #else
364         if (remote_version >= 16) {
365                 readfd(f,b,8);
366                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
367         }
368 #endif
369
370         return ret;
371 }
372
373 void read_buf(int f,char *buf,size_t len)
374 {
375         readfd(f,buf,len);
376 }
377
378 void read_sbuf(int f,char *buf,size_t len)
379 {
380         read_buf (f,buf,len);
381         buf[len] = 0;
382 }
383
384 unsigned char read_byte(int f)
385 {
386         unsigned char c;
387         read_buf (f, (char *)&c, 1);
388         return c;
389 }
390
391 /* Write len bytes to fd.  This underlies the multiplexing system,
392  * which is always called by application code.  */
393 static void writefd_unbuffered(int fd,char *buf,size_t len)
394 {
395         size_t total = 0;
396         fd_set w_fds, r_fds;
397         int fd_count, count;
398         struct timeval tv;
399
400         err_list_push();
401
402         no_flush++;
403
404         while (total < len) {
405                 FD_ZERO(&w_fds);
406                 FD_ZERO(&r_fds);
407                 FD_SET(fd,&w_fds);
408                 fd_count = fd;
409
410                 if (io_error_fd != -1) {
411                         FD_SET(io_error_fd,&r_fds);
412                         if (io_error_fd > fd_count) 
413                                 fd_count = io_error_fd;
414                 }
415
416                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
417                 tv.tv_usec = 0;
418
419                 errno = 0;
420
421                 count = select(fd_count+1,
422                                io_error_fd != -1?&r_fds:NULL,
423                                &w_fds,NULL,
424                                &tv);
425
426                 if (count == 0) {
427                         check_timeout();
428                 }
429
430                 if (count <= 0) {
431                         if (errno == EBADF) {
432                                 exit_cleanup(RERR_SOCKETIO);
433                         }
434                         continue;
435                 }
436
437                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
438                         read_error_fd();
439                 }
440
441                 if (FD_ISSET(fd, &w_fds)) {
442                         int ret;
443                         size_t n = len-total;
444                         ret = write(fd,buf+total,n);
445
446                         if (ret == -1 && errno == EINTR) {
447                                 continue;
448                         }
449
450                         if (ret == -1 && 
451                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
452                                 msleep(1);
453                                 continue;
454                         }
455
456                         if (ret <= 0) {
457                                 rprintf(FERROR,
458                                         "error writing %d unbuffered bytes"
459                                         " - exiting: %s\n", len,
460                                         strerror(errno));
461                                 exit_cleanup(RERR_STREAMIO);
462                         }
463
464                         /* Sleep after writing to limit I/O bandwidth */
465                         if (bwlimit)
466                         {
467                             tv.tv_sec = 0;
468                             tv.tv_usec = ret * 1000 / bwlimit;
469                             while (tv.tv_usec > 1000000)
470                             {
471                                 tv.tv_sec++;
472                                 tv.tv_usec -= 1000000;
473                             }
474                             select(0, NULL, NULL, NULL, &tv);
475                         }
476  
477                         total += ret;
478
479                         if (io_timeout)
480                                 last_io = time(NULL);
481                 }
482         }
483
484         no_flush--;
485 }
486
487
488 static char *io_buffer;
489 static int io_buffer_count;
490
491 void io_start_buffering(int fd)
492 {
493         if (io_buffer) return;
494         multiplex_out_fd = fd;
495         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
496         if (!io_buffer) out_of_memory("writefd");
497         io_buffer_count = 0;
498 }
499
500 /* write an message to a multiplexed stream. If this fails then rsync
501    exits */
502 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
503 {
504         char buffer[4096];
505         size_t n = len;
506
507         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
508
509         if (n > (sizeof(buffer)-4)) {
510                 n = sizeof(buffer)-4;
511         }
512
513         memcpy(&buffer[4], buf, n);
514         writefd_unbuffered(fd, buffer, n+4);
515
516         len -= n;
517         buf += n;
518
519         if (len) {
520                 writefd_unbuffered(fd, buf, len);
521         }
522 }
523
524
525 void io_flush(void)
526 {
527         int fd = multiplex_out_fd;
528
529         err_list_push();
530
531         if (!io_buffer_count || no_flush) return;
532
533         if (io_multiplexing_out) {
534                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
535         } else {
536                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
537         }
538         io_buffer_count = 0;
539 }
540
541
542 void io_end_buffering(void)
543 {
544         io_flush();
545         if (!io_multiplexing_out) {
546                 free(io_buffer);
547                 io_buffer = NULL;
548         }
549 }
550
551 static void writefd(int fd,char *buf,size_t len)
552 {
553         stats.total_written += len;
554
555         err_list_push();
556
557         if (!io_buffer || fd != multiplex_out_fd) {
558                 writefd_unbuffered(fd, buf, len);
559                 return;
560         }
561
562         while (len) {
563                 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
564                 if (n > 0) {
565                         memcpy(io_buffer+io_buffer_count, buf, n);
566                         buf += n;
567                         len -= n;
568                         io_buffer_count += n;
569                 }
570                 
571                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
572         }
573 }
574
575
576 void write_int(int f,int32 x)
577 {
578         char b[4];
579         SIVAL(b,0,x);
580         writefd(f,b,4);
581 }
582
583
584 /*
585  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
586  * 64-bit types on this platform.
587  */
588 void write_longint(int f, int64 x)
589 {
590         extern int remote_version;
591         char b[8];
592
593         if (remote_version < 16 || x <= 0x7FFFFFFF) {
594                 write_int(f, (int)x);
595                 return;
596         }
597
598         write_int(f, (int32)0xFFFFFFFF);
599         SIVAL(b,0,(x&0xFFFFFFFF));
600         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
601
602         writefd(f,b,8);
603 }
604
605 void write_buf(int f,char *buf,size_t len)
606 {
607         writefd(f,buf,len);
608 }
609
610 /* write a string to the connection */
611 static void write_sbuf(int f,char *buf)
612 {
613         write_buf(f, buf, strlen(buf));
614 }
615
616
617 void write_byte(int f,unsigned char c)
618 {
619         write_buf(f,(char *)&c,1);
620 }
621
622
623
624 int read_line(int f, char *buf, size_t maxlen)
625 {
626         while (maxlen) {
627                 buf[0] = 0;
628                 read_buf(f, buf, 1);
629                 if (buf[0] == 0) return 0;
630                 if (buf[0] == '\n') {
631                         buf[0] = 0;
632                         break;
633                 }
634                 if (buf[0] != '\r') {
635                         buf++;
636                         maxlen--;
637                 }
638         }
639         if (maxlen == 0) {
640                 *buf = 0;
641                 return 0;
642         }
643
644         return 1;
645 }
646
647
648 void io_printf(int fd, const char *format, ...)
649 {
650         va_list ap;  
651         char buf[1024];
652         int len;
653         
654         va_start(ap, format);
655         len = vsnprintf(buf, sizeof(buf), format, ap);
656         va_end(ap);
657
658         if (len < 0) exit_cleanup(RERR_STREAMIO);
659
660         write_sbuf(fd, buf);
661 }
662
663
664 /* setup for multiplexing an error stream with the data stream */
665 void io_start_multiplex_out(int fd)
666 {
667         multiplex_out_fd = fd;
668         io_flush();
669         io_start_buffering(fd);
670         io_multiplexing_out = 1;
671 }
672
673 /* setup for multiplexing an error stream with the data stream */
674 void io_start_multiplex_in(int fd)
675 {
676         multiplex_in_fd = fd;
677         io_flush();
678         io_multiplexing_in = 1;
679 }
680
681 /* write an message to the multiplexed error stream */
682 int io_multiplex_write(enum logcode code, char *buf, size_t len)
683 {
684         if (!io_multiplexing_out) return 0;
685
686         io_flush();
687         stats.total_written += (len+4);
688         mplex_write(multiplex_out_fd, code, buf, len);
689         return 1;
690 }
691
692 /* stop output multiplexing */
693 void io_multiplexing_close(void)
694 {
695         io_multiplexing_out = 0;
696 }
697