Split code out into separate files and remove some global variables to
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21
22 /**
23  * @file io.c
24  *
25  * Socket and pipe IO utilities used in rsync.
26  *
27  * rsync provides its own multiplexing system, which is used to send
28  * stderr and stdout over a single socket.  We need this because
29  * stdout normally carries the binary data stream, and stderr all our
30  * error messages.
31  *
32  * For historical reasons this is off during the start of the
33  * connection, but it's switched on quite early using
34  * io_start_multiplex_out() and io_start_multiplex_in().
35  **/
36
37 #include "rsync.h"
38
39 /* if no timeout is specified then use a 60 second select timeout */
40 #define SELECT_TIMEOUT 60
41
42 static int io_multiplexing_out;
43 static int io_multiplexing_in;
44 static int multiplex_in_fd;
45 static int multiplex_out_fd;
46 static time_t last_io;
47 static int no_flush;
48
49 extern int bwlimit;
50 extern int verbose;
51 extern int io_timeout;
52 extern struct stats stats;
53
54
55 /** Ignore EOF errors while reading a module listing if the remote
56     version is 24 or less. */
57 int kludge_around_eof = False;
58
59
60 static int io_error_fd = -1;
61
62 static void read_loop(int fd, char *buf, size_t len);
63
64 static void check_timeout(void)
65 {
66         extern int am_server, am_daemon;
67         time_t t;
68
69         err_list_push();
70         
71         if (!io_timeout) return;
72
73         if (!last_io) {
74                 last_io = time(NULL);
75                 return;
76         }
77
78         t = time(NULL);
79
80         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
81                 if (!am_server && !am_daemon) {
82                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
83                                 (int)(t-last_io));
84                 }
85                 exit_cleanup(RERR_TIMEOUT);
86         }
87 }
88
89 /* setup the fd used to propogate errors */
90 void io_set_error_fd(int fd)
91 {
92         io_error_fd = fd;
93 }
94
95 /* read some data from the error fd and write it to the write log code */
96 static void read_error_fd(void)
97 {
98         char buf[200];
99         size_t n;
100         int fd = io_error_fd;
101         int tag, len;
102
103         /* io_error_fd is temporarily disabled -- is this meant to
104          * prevent indefinite recursion? */
105         io_error_fd = -1;
106
107         read_loop(fd, buf, 4);
108         tag = IVAL(buf, 0);
109
110         len = tag & 0xFFFFFF;
111         tag = tag >> 24;
112         tag -= MPLEX_BASE;
113
114         while (len) {
115                 n = len;
116                 if (n > (sizeof(buf)-1))
117                         n = sizeof(buf)-1;
118                 read_loop(fd, buf, n);
119                 rwrite((enum logcode)tag, buf, n);
120                 len -= n;
121         }
122
123         io_error_fd = fd;
124 }
125
126
127 static void whine_about_eof (void)
128 {
129         /**
130            It's almost always an error to get an EOF when we're trying
131            to read from the network, because the protocol is
132            self-terminating.
133            
134            However, there is one unfortunate cases where it is not,
135            which is rsync <2.4.6 sending a list of modules on a
136            server, since the list is terminated by closing the socket.
137            So, for the section of the program where that is a problem
138            (start_socket_client), kludge_around_eof is True and we
139            just exit.
140         */
141
142         if (kludge_around_eof)
143                 exit_cleanup (0);
144         else {
145                 rprintf (FERROR,
146                          "%s: connection unexpectedly closed "
147                          "(%.0f bytes read so far)\n",
148                          RSYNC_NAME, (double)stats.total_read);
149         
150                 exit_cleanup (RERR_STREAMIO);
151         }
152 }
153
154
155 static void die_from_readerr (int err)
156 {
157         /* this prevents us trying to write errors on a dead socket */
158         io_multiplexing_close();
159                                 
160         rprintf(FERROR, "%s: read error: %s\n",
161                 RSYNC_NAME, strerror (err));
162         exit_cleanup(RERR_STREAMIO);
163 }
164
165
166 /*!
167  * Read from a socket with IO timeout. return the number of bytes
168  * read. If no bytes can be read then exit, never return a number <= 0.
169  *
170  * TODO: If the remote shell connection fails, then current versions
171  * actually report an "unexpected EOF" error here.  Since it's a
172  * fairly common mistake to try to use rsh when ssh is required, we
173  * should trap that: if we fail to read any data at all, we should
174  * give a better explanation.  We can tell whether the connection has
175  * started by looking e.g. at whether the remote version is known yet.
176  */
177 static int read_timeout (int fd, char *buf, size_t len)
178 {
179         int n, ret=0;
180
181         io_flush();
182
183         while (ret == 0) {
184                 /* until we manage to read *something* */
185                 fd_set fds;
186                 struct timeval tv;
187                 int fd_count = fd+1;
188                 int count;
189
190                 FD_ZERO(&fds);
191                 FD_SET(fd, &fds);
192                 if (io_error_fd != -1) {
193                         FD_SET(io_error_fd, &fds);
194                         if (io_error_fd > fd) fd_count = io_error_fd+1;
195                 }
196
197                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
198                 tv.tv_usec = 0;
199
200                 errno = 0;
201
202                 count = select(fd_count, &fds, NULL, NULL, &tv);
203
204                 if (count == 0) {
205                         check_timeout();
206                 }
207
208                 if (count <= 0) {
209                         if (errno == EBADF) {
210                                 exit_cleanup(RERR_SOCKETIO);
211                         }
212                         continue;
213                 }
214
215                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
216                         read_error_fd();
217                 }
218
219                 if (!FD_ISSET(fd, &fds)) continue;
220
221                 n = read(fd, buf, len);
222
223                 if (n > 0) {
224                         buf += n;
225                         len -= n;
226                         ret += n;
227                         if (io_timeout)
228                                 last_io = time(NULL);
229                         continue;
230                 } else if (n == 0) {
231                         whine_about_eof ();
232                         return -1; /* doesn't return */
233                 } else if (n == -1) {
234                         if (errno == EINTR || errno == EWOULDBLOCK ||
235                             errno == EAGAIN) 
236                                 continue;
237                         else
238                                 die_from_readerr (errno);
239                 }
240         }
241
242         return ret;
243 }
244
245
246
247
248 /*! Continue trying to read len bytes - don't return until len has
249   been read.   */
250 static void read_loop (int fd, char *buf, size_t len)
251 {
252         while (len) {
253                 int n = read_timeout(fd, buf, len);
254
255                 buf += n;
256                 len -= n;
257         }
258 }
259
260
261 /**
262  * Read from the file descriptor handling multiplexing - return number
263  * of bytes read.
264  * 
265  * Never returns <= 0. 
266  */
267 static int read_unbuffered(int fd, char *buf, size_t len)
268 {
269         static size_t remaining;
270         int tag, ret = 0;
271         char line[1024];
272
273         if (!io_multiplexing_in || fd != multiplex_in_fd)
274                 return read_timeout(fd, buf, len);
275
276         while (ret == 0) {
277                 if (remaining) {
278                         len = MIN(len, remaining);
279                         read_loop(fd, buf, len);
280                         remaining -= len;
281                         ret = len;
282                         continue;
283                 }
284
285                 read_loop(fd, line, 4);
286                 tag = IVAL(line, 0);
287
288                 remaining = tag & 0xFFFFFF;
289                 tag = tag >> 24;
290
291                 if (tag == MPLEX_BASE)
292                         continue;
293
294                 tag -= MPLEX_BASE;
295
296                 if (tag != FERROR && tag != FINFO) {
297                         rprintf(FERROR, "unexpected tag %d\n", tag);
298                         exit_cleanup(RERR_STREAMIO);
299                 }
300
301                 if (remaining > sizeof(line) - 1) {
302                         rprintf(FERROR, "multiplexing overflow %d\n\n",
303                                 remaining);
304                         exit_cleanup(RERR_STREAMIO);
305                 }
306
307                 read_loop(fd, line, remaining);
308                 line[remaining] = 0;
309
310                 rprintf((enum logcode) tag, "%s", line);
311                 remaining = 0;
312         }
313
314         return ret;
315 }
316
317
318
319 /* do a buffered read from fd. don't return until all N bytes
320    have been read. If all N can't be read then exit with an error */
321 static void readfd (int fd, char *buffer, size_t N)
322 {
323         int  ret;
324         size_t total=0;  
325         
326         while (total < N) {
327                 io_flush();
328
329                 ret = read_unbuffered (fd, buffer + total, N-total);
330                 total += ret;
331         }
332
333         stats.total_read += total;
334 }
335
336
337 int32 read_int(int f)
338 {
339         char b[4];
340         int32 ret;
341
342         readfd(f,b,4);
343         ret = IVAL(b,0);
344         if (ret == (int32)0xffffffff) return -1;
345         return ret;
346 }
347
348 int64 read_longint(int f)
349 {
350         extern int remote_version;
351         int64 ret;
352         char b[8];
353         ret = read_int(f);
354
355         if ((int32)ret != (int32)0xffffffff) {
356                 return ret;
357         }
358
359 #ifdef NO_INT64
360         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
361         exit_cleanup(RERR_UNSUPPORTED);
362 #else
363         if (remote_version >= 16) {
364                 readfd(f,b,8);
365                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
366         }
367 #endif
368
369         return ret;
370 }
371
372 void read_buf(int f,char *buf,size_t len)
373 {
374         readfd(f,buf,len);
375 }
376
377 void read_sbuf(int f,char *buf,size_t len)
378 {
379         read_buf (f,buf,len);
380         buf[len] = 0;
381 }
382
383 unsigned char read_byte(int f)
384 {
385         unsigned char c;
386         read_buf (f, (char *)&c, 1);
387         return c;
388 }
389
390 /* Write len bytes to fd.  This underlies the multiplexing system,
391  * which is always called by application code.  */
392 static void writefd_unbuffered(int fd,char *buf,size_t len)
393 {
394         size_t total = 0;
395         fd_set w_fds, r_fds;
396         int fd_count, count;
397         struct timeval tv;
398
399         err_list_push();
400
401         no_flush++;
402
403         while (total < len) {
404                 FD_ZERO(&w_fds);
405                 FD_ZERO(&r_fds);
406                 FD_SET(fd,&w_fds);
407                 fd_count = fd;
408
409                 if (io_error_fd != -1) {
410                         FD_SET(io_error_fd,&r_fds);
411                         if (io_error_fd > fd_count) 
412                                 fd_count = io_error_fd;
413                 }
414
415                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
416                 tv.tv_usec = 0;
417
418                 errno = 0;
419
420                 count = select(fd_count+1,
421                                io_error_fd != -1?&r_fds:NULL,
422                                &w_fds,NULL,
423                                &tv);
424
425                 if (count == 0) {
426                         check_timeout();
427                 }
428
429                 if (count <= 0) {
430                         if (errno == EBADF) {
431                                 exit_cleanup(RERR_SOCKETIO);
432                         }
433                         continue;
434                 }
435
436                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
437                         read_error_fd();
438                 }
439
440                 if (FD_ISSET(fd, &w_fds)) {
441                         int ret;
442                         size_t n = len-total;
443                         ret = write(fd,buf+total,n);
444
445                         if (ret == -1 && errno == EINTR) {
446                                 continue;
447                         }
448
449                         if (ret == -1 && 
450                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
451                                 msleep(1);
452                                 continue;
453                         }
454
455                         if (ret <= 0) {
456                                 /* Don't try to write errors back
457                                  * across the stream */
458                                 io_multiplexing_close();
459                                 rprintf(FERROR, RSYNC_NAME
460                                         ": error writing %d unbuffered bytes"
461                                         " - exiting: %s\n", len,
462                                         strerror(errno));
463                                 exit_cleanup(RERR_STREAMIO);
464                         }
465
466                         /* Sleep after writing to limit I/O bandwidth */
467                         if (bwlimit)
468                         {
469                             tv.tv_sec = 0;
470                             tv.tv_usec = ret * 1000 / bwlimit;
471                             while (tv.tv_usec > 1000000)
472                             {
473                                 tv.tv_sec++;
474                                 tv.tv_usec -= 1000000;
475                             }
476                             select(0, NULL, NULL, NULL, &tv);
477                         }
478  
479                         total += ret;
480
481                         if (io_timeout)
482                                 last_io = time(NULL);
483                 }
484         }
485
486         no_flush--;
487 }
488
489
490 static char *io_buffer;
491 static int io_buffer_count;
492
493 void io_start_buffering(int fd)
494 {
495         if (io_buffer) return;
496         multiplex_out_fd = fd;
497         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
498         if (!io_buffer) out_of_memory("writefd");
499         io_buffer_count = 0;
500 }
501
502 /* write an message to a multiplexed stream. If this fails then rsync
503    exits */
504 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
505 {
506         char buffer[4096];
507         size_t n = len;
508
509         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
510
511         if (n > (sizeof(buffer)-4)) {
512                 n = sizeof(buffer)-4;
513         }
514
515         memcpy(&buffer[4], buf, n);
516         writefd_unbuffered(fd, buffer, n+4);
517
518         len -= n;
519         buf += n;
520
521         if (len) {
522                 writefd_unbuffered(fd, buf, len);
523         }
524 }
525
526
527 void io_flush(void)
528 {
529         int fd = multiplex_out_fd;
530
531         err_list_push();
532
533         if (!io_buffer_count || no_flush) return;
534
535         if (io_multiplexing_out) {
536                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
537         } else {
538                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
539         }
540         io_buffer_count = 0;
541 }
542
543
544 void io_end_buffering(void)
545 {
546         io_flush();
547         if (!io_multiplexing_out) {
548                 free(io_buffer);
549                 io_buffer = NULL;
550         }
551 }
552
553 static void writefd(int fd,char *buf,size_t len)
554 {
555         stats.total_written += len;
556
557         err_list_push();
558
559         if (!io_buffer || fd != multiplex_out_fd) {
560                 writefd_unbuffered(fd, buf, len);
561                 return;
562         }
563
564         while (len) {
565                 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
566                 if (n > 0) {
567                         memcpy(io_buffer+io_buffer_count, buf, n);
568                         buf += n;
569                         len -= n;
570                         io_buffer_count += n;
571                 }
572                 
573                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
574         }
575 }
576
577
578 void write_int(int f,int32 x)
579 {
580         char b[4];
581         SIVAL(b,0,x);
582         writefd(f,b,4);
583 }
584
585
586 /*
587  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
588  * 64-bit types on this platform.
589  */
590 void write_longint(int f, int64 x)
591 {
592         extern int remote_version;
593         char b[8];
594
595         if (remote_version < 16 || x <= 0x7FFFFFFF) {
596                 write_int(f, (int)x);
597                 return;
598         }
599
600         write_int(f, (int32)0xFFFFFFFF);
601         SIVAL(b,0,(x&0xFFFFFFFF));
602         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
603
604         writefd(f,b,8);
605 }
606
607 void write_buf(int f,char *buf,size_t len)
608 {
609         writefd(f,buf,len);
610 }
611
612 /* write a string to the connection */
613 static void write_sbuf(int f,char *buf)
614 {
615         write_buf(f, buf, strlen(buf));
616 }
617
618
619 void write_byte(int f,unsigned char c)
620 {
621         write_buf(f,(char *)&c,1);
622 }
623
624
625
626 /**
627  * Read a line of up to @p maxlen characters into @p buf.  Does not
628  * contain a trailing newline or carriage return.
629  *
630  * @return 1 for success; 0 for io error or truncation.
631  **/
632 int read_line(int f, char *buf, size_t maxlen)
633 {
634         while (maxlen) {
635                 buf[0] = 0;
636                 read_buf(f, buf, 1);
637                 if (buf[0] == 0)
638                         return 0;
639                 if (buf[0] == '\n') {
640                         buf[0] = 0;
641                         break;
642                 }
643                 if (buf[0] != '\r') {
644                         buf++;
645                         maxlen--;
646                 }
647         }
648         if (maxlen == 0) {
649                 *buf = 0;
650                 return 0;
651         }
652
653         return 1;
654 }
655
656
657 void io_printf(int fd, const char *format, ...)
658 {
659         va_list ap;  
660         char buf[1024];
661         int len;
662         
663         va_start(ap, format);
664         len = vsnprintf(buf, sizeof(buf), format, ap);
665         va_end(ap);
666
667         if (len < 0) exit_cleanup(RERR_STREAMIO);
668
669         write_sbuf(fd, buf);
670 }
671
672
673 /* setup for multiplexing an error stream with the data stream */
674 void io_start_multiplex_out(int fd)
675 {
676         multiplex_out_fd = fd;
677         io_flush();
678         io_start_buffering(fd);
679         io_multiplexing_out = 1;
680 }
681
682 /* setup for multiplexing an error stream with the data stream */
683 void io_start_multiplex_in(int fd)
684 {
685         multiplex_in_fd = fd;
686         io_flush();
687         io_multiplexing_in = 1;
688 }
689
690 /* write an message to the multiplexed error stream */
691 int io_multiplex_write(enum logcode code, char *buf, size_t len)
692 {
693         if (!io_multiplexing_out) return 0;
694
695         io_flush();
696         stats.total_written += (len+4);
697         mplex_write(multiplex_out_fd, code, buf, len);
698         return 1;
699 }
700
701 /* stop output multiplexing */
702 void io_multiplexing_close(void)
703 {
704         io_multiplexing_out = 0;
705 }
706