went back to non-blokcing IO
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   socket and pipe IO utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35 static int eof_error=1;
36 extern int verbose;
37 extern int io_timeout;
38 extern struct stats stats;
39
40 static int buffer_f_in = -1;
41 static int io_error_fd = -1;
42
43 static void read_loop(int fd, char *buf, int len);
44
45 void setup_readbuffer(int f_in)
46 {
47         buffer_f_in = f_in;
48 }
49
50 static void check_timeout(void)
51 {
52         time_t t;
53         
54         if (!io_timeout) return;
55
56         if (!last_io) {
57                 last_io = time(NULL);
58                 return;
59         }
60
61         t = time(NULL);
62
63         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
64                 rprintf(FERROR,"io timeout after %d second - exiting\n", 
65                         (int)(t-last_io));
66                 exit_cleanup(RERR_TIMEOUT);
67         }
68 }
69
70 /* setup the fd used to propogate errors */
71 void io_set_error_fd(int fd)
72 {
73         io_error_fd = fd;
74 }
75
76 /* read some data from the error fd and write it to the write log code */
77 static void read_error_fd(void)
78 {
79         char buf[200];
80         int n;
81         int fd = io_error_fd;
82         int tag, len;
83
84         io_error_fd = -1;
85
86         read_loop(fd, buf, 4);
87         tag = IVAL(buf, 0);
88
89         len = tag & 0xFFFFFF;
90         tag = tag >> 24;
91         tag -= MPLEX_BASE;
92
93         while (len) {
94                 n = len;
95                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
96                 read_loop(fd, buf, n);
97                 rwrite((enum logcode)tag, buf, n);
98                 len -= n;
99         }
100
101         io_error_fd = fd;
102 }
103
104
105 static int no_flush;
106
107 /* read from a socket with IO timeout. return the number of
108    bytes read. If no bytes can be read then exit, never return
109    a number <= 0 */
110 static int read_timeout(int fd, char *buf, int len)
111 {
112         int n, ret=0;
113
114         io_flush();
115
116         while (ret == 0) {
117                 fd_set fds;
118                 struct timeval tv;
119                 int fd_count = fd+1;
120
121                 FD_ZERO(&fds);
122                 FD_SET(fd, &fds);
123                 if (io_error_fd != -1) {
124                         FD_SET(io_error_fd, &fds);
125                         if (io_error_fd > fd) fd_count = io_error_fd+1;
126                 }
127
128                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
129                 tv.tv_usec = 0;
130
131                 errno = 0;
132
133                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
134                         if (errno == EBADF) {
135                                 exit_cleanup(RERR_SOCKETIO);
136                         }
137                         check_timeout();
138                         continue;
139                 }
140
141                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
142                         read_error_fd();
143                 }
144
145                 if (!FD_ISSET(fd, &fds)) continue;
146
147                 n = read(fd, buf, len);
148
149                 if (n > 0) {
150                         buf += n;
151                         len -= n;
152                         ret += n;
153                         if (io_timeout)
154                                 last_io = time(NULL);
155                         continue;
156                 }
157
158                 if (n == -1 && errno == EINTR) {
159                         continue;
160                 }
161
162                 if (n == -1 && 
163                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
164                         continue;
165                 }
166
167
168                 if (n == 0) {
169                         if (eof_error) {
170                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
171                         }
172                         exit_cleanup(RERR_STREAMIO);
173                 }
174
175                 /* this prevents us trying to write errors on a dead socket */
176                 io_multiplexing_close();
177
178                 rprintf(FERROR,"read error: %s\n", strerror(errno));
179                 exit_cleanup(RERR_STREAMIO);
180         }
181
182         return ret;
183 }
184
185 /* continue trying to read len bytes - don't return until len
186    has been read */
187 static void read_loop(int fd, char *buf, int len)
188 {
189         while (len) {
190                 int n = read_timeout(fd, buf, len);
191
192                 buf += n;
193                 len -= n;
194         }
195 }
196
197 /* read from the file descriptor handling multiplexing - 
198    return number of bytes read
199    never return <= 0 */
200 static int read_unbuffered(int fd, char *buf, int len)
201 {
202         static int remaining;
203         int tag, ret=0;
204         char line[1024];
205
206         if (!io_multiplexing_in || fd != multiplex_in_fd) 
207                 return read_timeout(fd, buf, len);
208
209         while (ret == 0) {
210                 if (remaining) {
211                         len = MIN(len, remaining);
212                         read_loop(fd, buf, len);
213                         remaining -= len;
214                         ret = len;
215                         continue;
216                 }
217
218                 read_loop(fd, line, 4);
219                 tag = IVAL(line, 0);
220
221                 remaining = tag & 0xFFFFFF;
222                 tag = tag >> 24;
223
224                 if (tag == MPLEX_BASE) continue;
225
226                 tag -= MPLEX_BASE;
227
228                 if (tag != FERROR && tag != FINFO) {
229                         rprintf(FERROR,"unexpected tag %d\n", tag);
230                         exit_cleanup(RERR_STREAMIO);
231                 }
232
233                 if (remaining > sizeof(line)-1) {
234                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
235                                 remaining);
236                         exit_cleanup(RERR_STREAMIO);
237                 }
238
239                 read_loop(fd, line, remaining);
240                 line[remaining] = 0;
241
242                 rprintf((enum logcode)tag,"%s", line);
243                 remaining = 0;
244         }
245
246         return ret;
247 }
248
249
250 /* do a buffered read from fd. don't return until all N bytes
251    have been read. If all N can't be read then exit with an error */
252 static void readfd(int fd,char *buffer,int N)
253 {
254         int  ret;
255         int total=0;  
256         
257         while (total < N) {
258                 io_flush();
259
260                 ret = read_unbuffered(fd,buffer + total,N-total);
261                 total += ret;
262         }
263
264         stats.total_read += total;
265 }
266
267
268 int32 read_int(int f)
269 {
270         char b[4];
271         int32 ret;
272
273         readfd(f,b,4);
274         ret = IVAL(b,0);
275         if (ret == (int32)0xffffffff) return -1;
276         return ret;
277 }
278
279 int64 read_longint(int f)
280 {
281         extern int remote_version;
282         int64 ret;
283         char b[8];
284         ret = read_int(f);
285
286         if ((int32)ret != (int32)0xffffffff) {
287                 return ret;
288         }
289
290 #ifdef NO_INT64
291         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
292         exit_cleanup(RERR_UNSUPPORTED);
293 #else
294         if (remote_version >= 16) {
295                 readfd(f,b,8);
296                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
297         }
298 #endif
299
300         return ret;
301 }
302
303 void read_buf(int f,char *buf,int len)
304 {
305         readfd(f,buf,len);
306 }
307
308 void read_sbuf(int f,char *buf,int len)
309 {
310         read_buf(f,buf,len);
311         buf[len] = 0;
312 }
313
314 unsigned char read_byte(int f)
315 {
316         unsigned char c;
317         read_buf(f,(char *)&c,1);
318         return c;
319 }
320
321
322
323 /* write len bytes to fd, possibly reading from buffer_f_in if set
324    in order to unclog the pipe. don't return until all len
325    bytes have been written */
326 static void writefd_unbuffered(int fd,char *buf,int len)
327 {
328         int total = 0;
329         fd_set w_fds, r_fds;
330         int fd_count, count;
331         struct timeval tv;
332
333         no_flush++;
334
335         while (total < len) {
336                 FD_ZERO(&w_fds);
337                 FD_ZERO(&r_fds);
338                 FD_SET(fd,&w_fds);
339                 fd_count = fd;
340
341                 if (io_error_fd != -1) {
342                         FD_SET(io_error_fd,&r_fds);
343                         if (io_error_fd > fd_count) 
344                                 fd_count = io_error_fd;
345                 }
346
347                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
348                 tv.tv_usec = 0;
349
350                 errno = 0;
351
352                 count = select(fd_count+1,
353                                io_error_fd != -1?&r_fds:NULL,
354                                &w_fds,NULL,
355                                &tv);
356
357                 if (count <= 0) {
358                         if (errno == EBADF) {
359                                 exit_cleanup(RERR_SOCKETIO);
360                         }
361                         check_timeout();
362                         continue;
363                 }
364
365                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
366                         read_error_fd();
367                 }
368
369                 if (FD_ISSET(fd, &w_fds)) {
370                         int ret, n = len-total;
371                         
372                         ret = write(fd,buf+total,n);
373
374                         if (ret == -1 && errno == EINTR) {
375                                 continue;
376                         }
377
378                         if (ret == -1 && 
379                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
380                                 continue;
381                         }
382
383                         if (ret <= 0) {
384                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
385                                 exit_cleanup(RERR_STREAMIO);
386                         }
387
388                         total += ret;
389
390                         if (io_timeout)
391                                 last_io = time(NULL);
392                 }
393         }
394
395         no_flush--;
396 }
397
398
399 static char *io_buffer;
400 static int io_buffer_count;
401
402 void io_start_buffering(int fd)
403 {
404         if (io_buffer) return;
405         multiplex_out_fd = fd;
406         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
407         if (!io_buffer) out_of_memory("writefd");
408         io_buffer_count = 0;
409 }
410
411 /* write an message to a multiplexed stream. If this fails then rsync
412    exits */
413 static void mplex_write(int fd, enum logcode code, char *buf, int len)
414 {
415         char buffer[4096];
416         int n = len;
417
418         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
419
420         if (n > (sizeof(buffer)-4)) {
421                 n = sizeof(buffer)-4;
422         }
423
424         memcpy(&buffer[4], buf, n);
425         writefd_unbuffered(fd, buffer, n+4);
426
427         len -= n;
428         buf += n;
429
430         if (len) {
431                 writefd_unbuffered(fd, buf, len);
432         }
433 }
434
435
436 void io_flush(void)
437 {
438         int fd = multiplex_out_fd;
439         if (!io_buffer_count || no_flush) return;
440
441         if (io_multiplexing_out) {
442                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
443         } else {
444                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
445         }
446         io_buffer_count = 0;
447 }
448
449 void io_end_buffering(int fd)
450 {
451         io_flush();
452         if (!io_multiplexing_out) {
453                 free(io_buffer);
454                 io_buffer = NULL;
455         }
456 }
457
458 static void writefd(int fd,char *buf,int len)
459 {
460         stats.total_written += len;
461
462         if (!io_buffer || fd != multiplex_out_fd) {
463                 writefd_unbuffered(fd, buf, len);
464                 return;
465         }
466
467         while (len) {
468                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
469                 if (n > 0) {
470                         memcpy(io_buffer+io_buffer_count, buf, n);
471                         buf += n;
472                         len -= n;
473                         io_buffer_count += n;
474                 }
475                 
476                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
477         }
478 }
479
480
481 void write_int(int f,int32 x)
482 {
483         char b[4];
484         SIVAL(b,0,x);
485         writefd(f,b,4);
486 }
487
488 void write_longint(int f, int64 x)
489 {
490         extern int remote_version;
491         char b[8];
492
493         if (remote_version < 16 || x <= 0x7FFFFFFF) {
494                 write_int(f, (int)x);
495                 return;
496         }
497
498         write_int(f, (int32)0xFFFFFFFF);
499         SIVAL(b,0,(x&0xFFFFFFFF));
500         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
501
502         writefd(f,b,8);
503 }
504
505 void write_buf(int f,char *buf,int len)
506 {
507         writefd(f,buf,len);
508 }
509
510 /* write a string to the connection */
511 static void write_sbuf(int f,char *buf)
512 {
513         write_buf(f, buf, strlen(buf));
514 }
515
516
517 void write_byte(int f,unsigned char c)
518 {
519         write_buf(f,(char *)&c,1);
520 }
521
522 int read_line(int f, char *buf, int maxlen)
523 {
524         eof_error = 0;
525
526         while (maxlen) {
527                 buf[0] = 0;
528                 read_buf(f, buf, 1);
529                 if (buf[0] == 0) return 0;
530                 if (buf[0] == '\n') {
531                         buf[0] = 0;
532                         break;
533                 }
534                 if (buf[0] != '\r') {
535                         buf++;
536                         maxlen--;
537                 }
538         }
539         if (maxlen == 0) {
540                 *buf = 0;
541                 return 0;
542         }
543
544         eof_error = 1;
545
546         return 1;
547 }
548
549
550 void io_printf(int fd, const char *format, ...)
551 {
552         va_list ap;  
553         char buf[1024];
554         int len;
555         
556         va_start(ap, format);
557         len = vslprintf(buf, sizeof(buf), format, ap);
558         va_end(ap);
559
560         if (len < 0) exit_cleanup(RERR_STREAMIO);
561
562         write_sbuf(fd, buf);
563 }
564
565
566 /* setup for multiplexing an error stream with the data stream */
567 void io_start_multiplex_out(int fd)
568 {
569         multiplex_out_fd = fd;
570         io_flush();
571         io_start_buffering(fd);
572         io_multiplexing_out = 1;
573 }
574
575 /* setup for multiplexing an error stream with the data stream */
576 void io_start_multiplex_in(int fd)
577 {
578         multiplex_in_fd = fd;
579         io_flush();
580         io_multiplexing_in = 1;
581 }
582
583 /* write an message to the multiplexed error stream */
584 int io_multiplex_write(enum logcode code, char *buf, int len)
585 {
586         if (!io_multiplexing_out) return 0;
587
588         io_flush();
589         stats.total_written += (len+4);
590         mplex_write(multiplex_out_fd, code, buf, len);
591         return 1;
592 }
593
594 /* write a message to the special error fd */
595 int io_error_write(int f, enum logcode code, char *buf, int len)
596 {
597         if (f == -1) return 0;
598         mplex_write(f, code, buf, len);
599         return 1;
600 }
601
602 /* stop output multiplexing */
603 void io_multiplexing_close(void)
604 {
605         io_multiplexing_out = 0;
606 }
607
608 void io_close_input(int fd)
609 {
610         buffer_f_in = -1;
611 }
612