Improved error message.
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2001 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 /*
22   socket and pipe IO utilities used in rsync 
23
24   tridge, June 1996
25   */
26 #include "rsync.h"
27
28 /* if no timeout is specified then use a 60 second select timeout */
29 #define SELECT_TIMEOUT 60
30
31 extern int bwlimit;
32
33 static int io_multiplexing_out;
34 static int io_multiplexing_in;
35 static int multiplex_in_fd;
36 static int multiplex_out_fd;
37 static time_t last_io;
38 static int eof_error=1;
39 extern int verbose;
40 extern int io_timeout;
41 extern struct stats stats;
42
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 static void check_timeout(void)
48 {
49         extern int am_server, am_daemon;
50         time_t t;
51         
52         if (!io_timeout) return;
53
54         if (!last_io) {
55                 last_io = time(NULL);
56                 return;
57         }
58
59         t = time(NULL);
60
61         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
62                 if (!am_server && !am_daemon) {
63                         rprintf(FERROR,"io timeout after %d seconds - exiting\n", 
64                                 (int)(t-last_io));
65                 }
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         io_error_fd = -1;
85
86         read_loop(fd, buf, 4);
87         tag = IVAL(buf, 0);
88
89         len = tag & 0xFFFFFF;
90         tag = tag >> 24;
91         tag -= MPLEX_BASE;
92
93         while (len) {
94                 n = len;
95                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
96                 read_loop(fd, buf, n);
97                 rwrite((enum logcode)tag, buf, n);
98                 len -= n;
99         }
100
101         io_error_fd = fd;
102 }
103
104
105 static int no_flush;
106
107 /*
108  * Read from a socket with IO timeout. return the number of bytes
109  * read. If no bytes can be read then exit, never return a number <= 0.
110  *
111  * TODO: If the remote shell connection fails, then current versions actually
112  * report an "unexpected EOF" error here.  Since it's a fairly common mistake
113  * to try to use rsh when ssh is required, we should trap that: if we fail
114  * to read any data at all, we should give a better explanation.
115  */
116 static int read_timeout(int fd, char *buf, int len)
117 {
118         int n, ret=0;
119
120         io_flush();
121
122         while (ret == 0) {
123                 fd_set fds;
124                 struct timeval tv;
125                 int fd_count = fd+1;
126
127                 FD_ZERO(&fds);
128                 FD_SET(fd, &fds);
129                 if (io_error_fd != -1) {
130                         FD_SET(io_error_fd, &fds);
131                         if (io_error_fd > fd) fd_count = io_error_fd+1;
132                 }
133
134                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
135                 tv.tv_usec = 0;
136
137                 errno = 0;
138
139                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
140                         if (errno == EBADF) {
141                                 exit_cleanup(RERR_SOCKETIO);
142                         }
143                         check_timeout();
144                         continue;
145                 }
146
147                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
148                         read_error_fd();
149                 }
150
151                 if (!FD_ISSET(fd, &fds)) continue;
152
153                 n = read(fd, buf, len);
154
155                 if (n > 0) {
156                         buf += n;
157                         len -= n;
158                         ret += n;
159                         if (io_timeout)
160                                 last_io = time(NULL);
161                         continue;
162                 }
163
164                 if (n == -1 && errno == EINTR) {
165                         continue;
166                 }
167
168                 if (n == -1 && 
169                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
170                         continue;
171                 }
172
173
174                 if (n == 0) {
175                         if (eof_error) {
176                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
177                         }
178                         exit_cleanup(RERR_STREAMIO);
179                 }
180
181                 /* this prevents us trying to write errors on a dead socket */
182                 io_multiplexing_close();
183
184                 rprintf(FERROR,"read error: %s\n", strerror(errno));
185                 exit_cleanup(RERR_STREAMIO);
186         }
187
188         return ret;
189 }
190
191 /* continue trying to read len bytes - don't return until len
192    has been read */
193 static void read_loop(int fd, char *buf, int len)
194 {
195         while (len) {
196                 int n = read_timeout(fd, buf, len);
197
198                 buf += n;
199                 len -= n;
200         }
201 }
202
203 /* read from the file descriptor handling multiplexing - 
204    return number of bytes read
205    never return <= 0 */
206 static int read_unbuffered(int fd, char *buf, int len)
207 {
208         static int remaining;
209         int tag, ret=0;
210         char line[1024];
211
212         if (!io_multiplexing_in || fd != multiplex_in_fd) 
213                 return read_timeout(fd, buf, len);
214
215         while (ret == 0) {
216                 if (remaining) {
217                         len = MIN(len, remaining);
218                         read_loop(fd, buf, len);
219                         remaining -= len;
220                         ret = len;
221                         continue;
222                 }
223
224                 read_loop(fd, line, 4);
225                 tag = IVAL(line, 0);
226
227                 remaining = tag & 0xFFFFFF;
228                 tag = tag >> 24;
229
230                 if (tag == MPLEX_BASE) continue;
231
232                 tag -= MPLEX_BASE;
233
234                 if (tag != FERROR && tag != FINFO) {
235                         rprintf(FERROR,"unexpected tag %d\n", tag);
236                         exit_cleanup(RERR_STREAMIO);
237                 }
238
239                 if (remaining > sizeof(line)-1) {
240                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
241                                 remaining);
242                         exit_cleanup(RERR_STREAMIO);
243                 }
244
245                 read_loop(fd, line, remaining);
246                 line[remaining] = 0;
247
248                 rprintf((enum logcode)tag,"%s", line);
249                 remaining = 0;
250         }
251
252         return ret;
253 }
254
255
256 /* do a buffered read from fd. don't return until all N bytes
257    have been read. If all N can't be read then exit with an error */
258 static void readfd(int fd,char *buffer,int N)
259 {
260         int  ret;
261         int total=0;  
262         
263         while (total < N) {
264                 io_flush();
265
266                 ret = read_unbuffered(fd,buffer + total,N-total);
267                 total += ret;
268         }
269
270         stats.total_read += total;
271 }
272
273
274 int32 read_int(int f)
275 {
276         char b[4];
277         int32 ret;
278
279         readfd(f,b,4);
280         ret = IVAL(b,0);
281         if (ret == (int32)0xffffffff) return -1;
282         return ret;
283 }
284
285 int64 read_longint(int f)
286 {
287         extern int remote_version;
288         int64 ret;
289         char b[8];
290         ret = read_int(f);
291
292         if ((int32)ret != (int32)0xffffffff) {
293                 return ret;
294         }
295
296 #ifdef NO_INT64
297         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
298         exit_cleanup(RERR_UNSUPPORTED);
299 #else
300         if (remote_version >= 16) {
301                 readfd(f,b,8);
302                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
303         }
304 #endif
305
306         return ret;
307 }
308
309 void read_buf(int f,char *buf,int len)
310 {
311         readfd(f,buf,len);
312 }
313
314 void read_sbuf(int f,char *buf,int len)
315 {
316         read_buf(f,buf,len);
317         buf[len] = 0;
318 }
319
320 unsigned char read_byte(int f)
321 {
322         unsigned char c;
323         read_buf(f,(char *)&c,1);
324         return c;
325 }
326
327 /* write len bytes to fd */
328 static void writefd_unbuffered(int fd,char *buf,int len)
329 {
330         int total = 0;
331         fd_set w_fds, r_fds;
332         int fd_count, count;
333         struct timeval tv;
334
335         no_flush++;
336
337         while (total < len) {
338                 FD_ZERO(&w_fds);
339                 FD_ZERO(&r_fds);
340                 FD_SET(fd,&w_fds);
341                 fd_count = fd;
342
343                 if (io_error_fd != -1) {
344                         FD_SET(io_error_fd,&r_fds);
345                         if (io_error_fd > fd_count) 
346                                 fd_count = io_error_fd;
347                 }
348
349                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
350                 tv.tv_usec = 0;
351
352                 errno = 0;
353
354                 count = select(fd_count+1,
355                                io_error_fd != -1?&r_fds:NULL,
356                                &w_fds,NULL,
357                                &tv);
358
359                 if (count <= 0) {
360                         if (errno == EBADF) {
361                                 exit_cleanup(RERR_SOCKETIO);
362                         }
363                         check_timeout();
364                         continue;
365                 }
366
367                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
368                         read_error_fd();
369                 }
370
371                 if (FD_ISSET(fd, &w_fds)) {
372                         int ret, n = len-total;
373                         
374                         ret = write(fd,buf+total,n);
375
376                         if (ret == -1 && errno == EINTR) {
377                                 continue;
378                         }
379
380                         if (ret == -1 && 
381                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
382                                 msleep(1);
383                                 continue;
384                         }
385
386                         if (ret <= 0) {
387                                 rprintf(FERROR,
388                                         "error writing %d unbuffered bytes"
389                                         " - exiting: %s\n", len,
390                                         strerror(errno));
391                                 exit_cleanup(RERR_STREAMIO);
392                         }
393
394                         /* Sleep after writing to limit I/O bandwidth */
395                         if (bwlimit)
396                         {
397                             tv.tv_sec = 0;
398                             tv.tv_usec = ret * 1000 / bwlimit;
399                             while (tv.tv_usec > 1000000)
400                             {
401                                 tv.tv_sec++;
402                                 tv.tv_usec -= 1000000;
403                             }
404                             select(0, NULL, NULL, NULL, &tv);
405                         }
406  
407                         total += ret;
408
409                         if (io_timeout)
410                                 last_io = time(NULL);
411                 }
412         }
413
414         no_flush--;
415 }
416
417
418 static char *io_buffer;
419 static int io_buffer_count;
420
421 void io_start_buffering(int fd)
422 {
423         if (io_buffer) return;
424         multiplex_out_fd = fd;
425         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
426         if (!io_buffer) out_of_memory("writefd");
427         io_buffer_count = 0;
428 }
429
430 /* write an message to a multiplexed stream. If this fails then rsync
431    exits */
432 static void mplex_write(int fd, enum logcode code, char *buf, int len)
433 {
434         char buffer[4096];
435         int n = len;
436
437         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
438
439         if (n > (sizeof(buffer)-4)) {
440                 n = sizeof(buffer)-4;
441         }
442
443         memcpy(&buffer[4], buf, n);
444         writefd_unbuffered(fd, buffer, n+4);
445
446         len -= n;
447         buf += n;
448
449         if (len) {
450                 writefd_unbuffered(fd, buf, len);
451         }
452 }
453
454
455 void io_flush(void)
456 {
457         int fd = multiplex_out_fd;
458         if (!io_buffer_count || no_flush) return;
459
460         if (io_multiplexing_out) {
461                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
462         } else {
463                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
464         }
465         io_buffer_count = 0;
466 }
467
468
469 /* XXX: fd is ignored, which seems a little strange. */
470 void io_end_buffering(int fd)
471 {
472         io_flush();
473         if (!io_multiplexing_out) {
474                 free(io_buffer);
475                 io_buffer = NULL;
476         }
477 }
478
479 /* some OSes have a bug where an exit causes the pending writes on
480    a socket to be flushed. Do an explicit shutdown to try to prevent this */
481 void io_shutdown(void)
482 {
483         if (multiplex_out_fd != -1) close(multiplex_out_fd);
484         if (io_error_fd != -1) close(io_error_fd);
485         multiplex_out_fd = -1;
486         io_error_fd = -1;
487 }
488
489
490 static void writefd(int fd,char *buf,int len)
491 {
492         stats.total_written += len;
493
494         if (!io_buffer || fd != multiplex_out_fd) {
495                 writefd_unbuffered(fd, buf, len);
496                 return;
497         }
498
499         while (len) {
500                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
501                 if (n > 0) {
502                         memcpy(io_buffer+io_buffer_count, buf, n);
503                         buf += n;
504                         len -= n;
505                         io_buffer_count += n;
506                 }
507                 
508                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
509         }
510 }
511
512
513 void write_int(int f,int32 x)
514 {
515         char b[4];
516         SIVAL(b,0,x);
517         writefd(f,b,4);
518 }
519
520
521 /*
522  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
523  * 64-bit types on this platform.
524  */
525 void write_longint(int f, int64 x)
526 {
527         extern int remote_version;
528         char b[8];
529
530         if (remote_version < 16 || x <= 0x7FFFFFFF) {
531                 write_int(f, (int)x);
532                 return;
533         }
534
535         write_int(f, (int32)0xFFFFFFFF);
536         SIVAL(b,0,(x&0xFFFFFFFF));
537         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
538
539         writefd(f,b,8);
540 }
541
542 void write_buf(int f,char *buf,int len)
543 {
544         writefd(f,buf,len);
545 }
546
547 /* write a string to the connection */
548 static void write_sbuf(int f,char *buf)
549 {
550         write_buf(f, buf, strlen(buf));
551 }
552
553
554 void write_byte(int f,unsigned char c)
555 {
556         write_buf(f,(char *)&c,1);
557 }
558
559 int read_line(int f, char *buf, int maxlen)
560 {
561         eof_error = 0;
562
563         while (maxlen) {
564                 buf[0] = 0;
565                 read_buf(f, buf, 1);
566                 if (buf[0] == 0) return 0;
567                 if (buf[0] == '\n') {
568                         buf[0] = 0;
569                         break;
570                 }
571                 if (buf[0] != '\r') {
572                         buf++;
573                         maxlen--;
574                 }
575         }
576         if (maxlen == 0) {
577                 *buf = 0;
578                 return 0;
579         }
580
581         eof_error = 1;
582
583         return 1;
584 }
585
586
587 void io_printf(int fd, const char *format, ...)
588 {
589         va_list ap;  
590         char buf[1024];
591         int len;
592         
593         va_start(ap, format);
594         len = vslprintf(buf, sizeof(buf), format, ap);
595         va_end(ap);
596
597         if (len < 0) exit_cleanup(RERR_STREAMIO);
598
599         write_sbuf(fd, buf);
600 }
601
602
603 /* setup for multiplexing an error stream with the data stream */
604 void io_start_multiplex_out(int fd)
605 {
606         multiplex_out_fd = fd;
607         io_flush();
608         io_start_buffering(fd);
609         io_multiplexing_out = 1;
610 }
611
612 /* setup for multiplexing an error stream with the data stream */
613 void io_start_multiplex_in(int fd)
614 {
615         multiplex_in_fd = fd;
616         io_flush();
617         io_multiplexing_in = 1;
618 }
619
620 /* write an message to the multiplexed error stream */
621 int io_multiplex_write(enum logcode code, char *buf, int len)
622 {
623         if (!io_multiplexing_out) return 0;
624
625         io_flush();
626         stats.total_written += (len+4);
627         mplex_write(multiplex_out_fd, code, buf, len);
628         return 1;
629 }
630
631 /* write a message to the special error fd */
632 int io_error_write(int f, enum logcode code, char *buf, int len)
633 {
634         if (f == -1) return 0;
635         mplex_write(f, code, buf, len);
636         return 1;
637 }
638
639 /* stop output multiplexing */
640 void io_multiplexing_close(void)
641 {
642         io_multiplexing_out = 0;
643 }
644