Print a warning message in the version if the platform cannot support 32-bit ints
[rsync/rsync.git] / io.c
1 /* -*- c-file-style: "linux" -*-
2    
3    Copyright (C) 1996-2000 by Andrew Tridgell 
4    Copyright (C) Paul Mackerras 1996
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 2 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, write to the Free Software
18    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 */
20
21 /*
22   socket and pipe IO utilities used in rsync 
23
24   tridge, June 1996
25   */
26 #include "rsync.h"
27
28 /* if no timeout is specified then use a 60 second select timeout */
29 #define SELECT_TIMEOUT 60
30
31 extern int bwlimit;
32
33 static int io_multiplexing_out;
34 static int io_multiplexing_in;
35 static int multiplex_in_fd;
36 static int multiplex_out_fd;
37 static time_t last_io;
38 static int eof_error=1;
39 extern int verbose;
40 extern int io_timeout;
41 extern struct stats stats;
42
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 static void check_timeout(void)
48 {
49         extern int am_server, am_daemon;
50         time_t t;
51         
52         if (!io_timeout) return;
53
54         if (!last_io) {
55                 last_io = time(NULL);
56                 return;
57         }
58
59         t = time(NULL);
60
61         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
62                 if (!am_server && !am_daemon) {
63                         rprintf(FERROR,"io timeout after %d second - exiting\n", 
64                                 (int)(t-last_io));
65                 }
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         io_error_fd = -1;
85
86         read_loop(fd, buf, 4);
87         tag = IVAL(buf, 0);
88
89         len = tag & 0xFFFFFF;
90         tag = tag >> 24;
91         tag -= MPLEX_BASE;
92
93         while (len) {
94                 n = len;
95                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
96                 read_loop(fd, buf, n);
97                 rwrite((enum logcode)tag, buf, n);
98                 len -= n;
99         }
100
101         io_error_fd = fd;
102 }
103
104
105 static int no_flush;
106
107 /* read from a socket with IO timeout. return the number of
108    bytes read. If no bytes can be read then exit, never return
109    a number <= 0 */
110 static int read_timeout(int fd, char *buf, int len)
111 {
112         int n, ret=0;
113
114         io_flush();
115
116         while (ret == 0) {
117                 fd_set fds;
118                 struct timeval tv;
119                 int fd_count = fd+1;
120
121                 FD_ZERO(&fds);
122                 FD_SET(fd, &fds);
123                 if (io_error_fd != -1) {
124                         FD_SET(io_error_fd, &fds);
125                         if (io_error_fd > fd) fd_count = io_error_fd+1;
126                 }
127
128                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
129                 tv.tv_usec = 0;
130
131                 errno = 0;
132
133                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
134                         if (errno == EBADF) {
135                                 exit_cleanup(RERR_SOCKETIO);
136                         }
137                         check_timeout();
138                         continue;
139                 }
140
141                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
142                         read_error_fd();
143                 }
144
145                 if (!FD_ISSET(fd, &fds)) continue;
146
147                 n = read(fd, buf, len);
148
149                 if (n > 0) {
150                         buf += n;
151                         len -= n;
152                         ret += n;
153                         if (io_timeout)
154                                 last_io = time(NULL);
155                         continue;
156                 }
157
158                 if (n == -1 && errno == EINTR) {
159                         continue;
160                 }
161
162                 if (n == -1 && 
163                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
164                         continue;
165                 }
166
167
168                 if (n == 0) {
169                         if (eof_error) {
170                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
171                         }
172                         exit_cleanup(RERR_STREAMIO);
173                 }
174
175                 /* this prevents us trying to write errors on a dead socket */
176                 io_multiplexing_close();
177
178                 rprintf(FERROR,"read error: %s\n", strerror(errno));
179                 exit_cleanup(RERR_STREAMIO);
180         }
181
182         return ret;
183 }
184
185 /* continue trying to read len bytes - don't return until len
186    has been read */
187 static void read_loop(int fd, char *buf, int len)
188 {
189         while (len) {
190                 int n = read_timeout(fd, buf, len);
191
192                 buf += n;
193                 len -= n;
194         }
195 }
196
197 /* read from the file descriptor handling multiplexing - 
198    return number of bytes read
199    never return <= 0 */
200 static int read_unbuffered(int fd, char *buf, int len)
201 {
202         static int remaining;
203         int tag, ret=0;
204         char line[1024];
205
206         if (!io_multiplexing_in || fd != multiplex_in_fd) 
207                 return read_timeout(fd, buf, len);
208
209         while (ret == 0) {
210                 if (remaining) {
211                         len = MIN(len, remaining);
212                         read_loop(fd, buf, len);
213                         remaining -= len;
214                         ret = len;
215                         continue;
216                 }
217
218                 read_loop(fd, line, 4);
219                 tag = IVAL(line, 0);
220
221                 remaining = tag & 0xFFFFFF;
222                 tag = tag >> 24;
223
224                 if (tag == MPLEX_BASE) continue;
225
226                 tag -= MPLEX_BASE;
227
228                 if (tag != FERROR && tag != FINFO) {
229                         rprintf(FERROR,"unexpected tag %d\n", tag);
230                         exit_cleanup(RERR_STREAMIO);
231                 }
232
233                 if (remaining > sizeof(line)-1) {
234                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
235                                 remaining);
236                         exit_cleanup(RERR_STREAMIO);
237                 }
238
239                 read_loop(fd, line, remaining);
240                 line[remaining] = 0;
241
242                 rprintf((enum logcode)tag,"%s", line);
243                 remaining = 0;
244         }
245
246         return ret;
247 }
248
249
250 /* do a buffered read from fd. don't return until all N bytes
251    have been read. If all N can't be read then exit with an error */
252 static void readfd(int fd,char *buffer,int N)
253 {
254         int  ret;
255         int total=0;  
256         
257         while (total < N) {
258                 io_flush();
259
260                 ret = read_unbuffered(fd,buffer + total,N-total);
261                 total += ret;
262         }
263
264         stats.total_read += total;
265 }
266
267
268 int32 read_int(int f)
269 {
270         char b[4];
271         int32 ret;
272
273         readfd(f,b,4);
274         ret = IVAL(b,0);
275         if (ret == (int32)0xffffffff) return -1;
276         return ret;
277 }
278
279 int64 read_longint(int f)
280 {
281         extern int remote_version;
282         int64 ret;
283         char b[8];
284         ret = read_int(f);
285
286         if ((int32)ret != (int32)0xffffffff) {
287                 return ret;
288         }
289
290 #ifdef NO_INT64
291         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
292         exit_cleanup(RERR_UNSUPPORTED);
293 #else
294         if (remote_version >= 16) {
295                 readfd(f,b,8);
296                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
297         }
298 #endif
299
300         return ret;
301 }
302
303 void read_buf(int f,char *buf,int len)
304 {
305         readfd(f,buf,len);
306 }
307
308 void read_sbuf(int f,char *buf,int len)
309 {
310         read_buf(f,buf,len);
311         buf[len] = 0;
312 }
313
314 unsigned char read_byte(int f)
315 {
316         unsigned char c;
317         read_buf(f,(char *)&c,1);
318         return c;
319 }
320
321 /* write len bytes to fd */
322 static void writefd_unbuffered(int fd,char *buf,int len)
323 {
324         int total = 0;
325         fd_set w_fds, r_fds;
326         int fd_count, count;
327         struct timeval tv;
328
329         no_flush++;
330
331         while (total < len) {
332                 FD_ZERO(&w_fds);
333                 FD_ZERO(&r_fds);
334                 FD_SET(fd,&w_fds);
335                 fd_count = fd;
336
337                 if (io_error_fd != -1) {
338                         FD_SET(io_error_fd,&r_fds);
339                         if (io_error_fd > fd_count) 
340                                 fd_count = io_error_fd;
341                 }
342
343                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
344                 tv.tv_usec = 0;
345
346                 errno = 0;
347
348                 count = select(fd_count+1,
349                                io_error_fd != -1?&r_fds:NULL,
350                                &w_fds,NULL,
351                                &tv);
352
353                 if (count <= 0) {
354                         if (errno == EBADF) {
355                                 exit_cleanup(RERR_SOCKETIO);
356                         }
357                         check_timeout();
358                         continue;
359                 }
360
361                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
362                         read_error_fd();
363                 }
364
365                 if (FD_ISSET(fd, &w_fds)) {
366                         int ret, n = len-total;
367                         
368                         ret = write(fd,buf+total,n);
369
370                         if (ret == -1 && errno == EINTR) {
371                                 continue;
372                         }
373
374                         if (ret == -1 && 
375                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
376                                 msleep(1);
377                                 continue;
378                         }
379
380                         if (ret <= 0) {
381                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
382                                 exit_cleanup(RERR_STREAMIO);
383                         }
384
385                         /* Sleep after writing to limit I/O bandwidth */
386                         if (bwlimit)
387                         {
388                             tv.tv_sec = 0;
389                             tv.tv_usec = ret * 1000 / bwlimit;
390                             while (tv.tv_usec > 1000000)
391                             {
392                                 tv.tv_sec++;
393                                 tv.tv_usec -= 1000000;
394                             }
395                             select(0, NULL, NULL, NULL, &tv);
396                         }
397  
398                         total += ret;
399
400                         if (io_timeout)
401                                 last_io = time(NULL);
402                 }
403         }
404
405         no_flush--;
406 }
407
408
409 static char *io_buffer;
410 static int io_buffer_count;
411
412 void io_start_buffering(int fd)
413 {
414         if (io_buffer) return;
415         multiplex_out_fd = fd;
416         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
417         if (!io_buffer) out_of_memory("writefd");
418         io_buffer_count = 0;
419 }
420
421 /* write an message to a multiplexed stream. If this fails then rsync
422    exits */
423 static void mplex_write(int fd, enum logcode code, char *buf, int len)
424 {
425         char buffer[4096];
426         int n = len;
427
428         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
429
430         if (n > (sizeof(buffer)-4)) {
431                 n = sizeof(buffer)-4;
432         }
433
434         memcpy(&buffer[4], buf, n);
435         writefd_unbuffered(fd, buffer, n+4);
436
437         len -= n;
438         buf += n;
439
440         if (len) {
441                 writefd_unbuffered(fd, buf, len);
442         }
443 }
444
445
446 void io_flush(void)
447 {
448         int fd = multiplex_out_fd;
449         if (!io_buffer_count || no_flush) return;
450
451         if (io_multiplexing_out) {
452                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
453         } else {
454                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
455         }
456         io_buffer_count = 0;
457 }
458
459
460 /* XXX: fd is ignored, which seems a little strange. */
461 void io_end_buffering(int fd)
462 {
463         io_flush();
464         if (!io_multiplexing_out) {
465                 free(io_buffer);
466                 io_buffer = NULL;
467         }
468 }
469
470 /* some OSes have a bug where an exit causes the pending writes on
471    a socket to be flushed. Do an explicit shutdown to try to prevent this */
472 void io_shutdown(void)
473 {
474         if (multiplex_out_fd != -1) close(multiplex_out_fd);
475         if (io_error_fd != -1) close(io_error_fd);
476         multiplex_out_fd = -1;
477         io_error_fd = -1;
478 }
479
480
481 static void writefd(int fd,char *buf,int len)
482 {
483         stats.total_written += len;
484
485         if (!io_buffer || fd != multiplex_out_fd) {
486                 writefd_unbuffered(fd, buf, len);
487                 return;
488         }
489
490         while (len) {
491                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
492                 if (n > 0) {
493                         memcpy(io_buffer+io_buffer_count, buf, n);
494                         buf += n;
495                         len -= n;
496                         io_buffer_count += n;
497                 }
498                 
499                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
500         }
501 }
502
503
504 void write_int(int f,int32 x)
505 {
506         char b[4];
507         SIVAL(b,0,x);
508         writefd(f,b,4);
509 }
510
511
512 /*
513  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
514  * 64-bit types on this platform.
515  */
516 void write_longint(int f, int64 x)
517 {
518         extern int remote_version;
519         char b[8];
520
521         if (remote_version < 16 || x <= 0x7FFFFFFF) {
522                 write_int(f, (int)x);
523                 return;
524         }
525
526         write_int(f, (int32)0xFFFFFFFF);
527         SIVAL(b,0,(x&0xFFFFFFFF));
528         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
529
530         writefd(f,b,8);
531 }
532
533 void write_buf(int f,char *buf,int len)
534 {
535         writefd(f,buf,len);
536 }
537
538 /* write a string to the connection */
539 static void write_sbuf(int f,char *buf)
540 {
541         write_buf(f, buf, strlen(buf));
542 }
543
544
545 void write_byte(int f,unsigned char c)
546 {
547         write_buf(f,(char *)&c,1);
548 }
549
550 int read_line(int f, char *buf, int maxlen)
551 {
552         eof_error = 0;
553
554         while (maxlen) {
555                 buf[0] = 0;
556                 read_buf(f, buf, 1);
557                 if (buf[0] == 0) return 0;
558                 if (buf[0] == '\n') {
559                         buf[0] = 0;
560                         break;
561                 }
562                 if (buf[0] != '\r') {
563                         buf++;
564                         maxlen--;
565                 }
566         }
567         if (maxlen == 0) {
568                 *buf = 0;
569                 return 0;
570         }
571
572         eof_error = 1;
573
574         return 1;
575 }
576
577
578 void io_printf(int fd, const char *format, ...)
579 {
580         va_list ap;  
581         char buf[1024];
582         int len;
583         
584         va_start(ap, format);
585         len = vslprintf(buf, sizeof(buf), format, ap);
586         va_end(ap);
587
588         if (len < 0) exit_cleanup(RERR_STREAMIO);
589
590         write_sbuf(fd, buf);
591 }
592
593
594 /* setup for multiplexing an error stream with the data stream */
595 void io_start_multiplex_out(int fd)
596 {
597         multiplex_out_fd = fd;
598         io_flush();
599         io_start_buffering(fd);
600         io_multiplexing_out = 1;
601 }
602
603 /* setup for multiplexing an error stream with the data stream */
604 void io_start_multiplex_in(int fd)
605 {
606         multiplex_in_fd = fd;
607         io_flush();
608         io_multiplexing_in = 1;
609 }
610
611 /* write an message to the multiplexed error stream */
612 int io_multiplex_write(enum logcode code, char *buf, int len)
613 {
614         if (!io_multiplexing_out) return 0;
615
616         io_flush();
617         stats.total_written += (len+4);
618         mplex_write(multiplex_out_fd, code, buf, len);
619         return 1;
620 }
621
622 /* write a message to the special error fd */
623 int io_error_write(int f, enum logcode code, char *buf, int len)
624 {
625         if (f == -1) return 0;
626         mplex_write(f, code, buf, len);
627         return 1;
628 }
629
630 /* stop output multiplexing */
631 void io_multiplexing_close(void)
632 {
633         io_multiplexing_out = 0;
634 }
635