6f3854e3b23ae2e84957ed8819479e26edb29e61
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   socket and pipe IO utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 static int io_multiplexing_out;
31 static int io_multiplexing_in;
32 static int multiplex_in_fd;
33 static int multiplex_out_fd;
34 static time_t last_io;
35 static int eof_error=1;
36 extern int verbose;
37 extern int io_timeout;
38 extern struct stats stats;
39
40 static int buffer_f_in = -1;
41 static int io_error_fd = -1;
42
43 void setup_readbuffer(int f_in)
44 {
45         buffer_f_in = f_in;
46 }
47
48 static void check_timeout(void)
49 {
50         time_t t;
51         
52         if (!io_timeout) return;
53
54         if (!last_io) {
55                 last_io = time(NULL);
56                 return;
57         }
58
59         t = time(NULL);
60
61         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
62                 rprintf(FERROR,"io timeout after %d second - exiting\n", 
63                         (int)(t-last_io));
64                 exit_cleanup(RERR_TIMEOUT);
65         }
66 }
67
68 /* setup the fd used to propogate errors */
69 void io_set_error_fd(int fd)
70 {
71         io_error_fd = fd;
72 }
73
74 /* read some data from the error fd and write it to FERROR */
75 static void read_error_fd(void)
76 {
77         char buf[200];
78         int n;
79         int fd = io_error_fd;
80         io_error_fd = -1;
81
82         n = read(fd, buf, sizeof(buf)-1);
83         if (n > 0) {
84                 rwrite(FERROR, buf, n);
85         }
86
87         io_error_fd = fd;
88 }
89
90
91 static int no_flush;
92
93 /* read from a socket with IO timeout. return the number of
94    bytes read. If no bytes can be read then exit, never return
95    a number <= 0 */
96 static int read_timeout(int fd, char *buf, int len)
97 {
98         int n, ret=0;
99
100         io_flush();
101
102         while (ret == 0) {
103                 fd_set fds;
104                 struct timeval tv;
105                 int fd_count = fd+1;
106
107                 FD_ZERO(&fds);
108                 FD_SET(fd, &fds);
109                 if (io_error_fd != -1) {
110                         FD_SET(io_error_fd, &fds);
111                         if (io_error_fd > fd) fd_count = io_error_fd+1;
112                 }
113
114                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
115                 tv.tv_usec = 0;
116
117                 errno = 0;
118
119                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
120                         if (errno == EBADF) {
121                                 exit_cleanup(RERR_SOCKETIO);
122                         }
123                         check_timeout();
124                         continue;
125                 }
126
127                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
128                         read_error_fd();
129                 }
130
131                 if (!FD_ISSET(fd, &fds)) continue;
132
133                 n = read(fd, buf, len);
134
135                 if (n > 0) {
136                         buf += n;
137                         len -= n;
138                         ret += n;
139                         if (io_timeout)
140                                 last_io = time(NULL);
141                         continue;
142                 }
143
144                 if (n == -1 && errno == EINTR) {
145                         continue;
146                 }
147
148
149                 if (n == 0) {
150                         if (eof_error) {
151                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
152                         }
153                         exit_cleanup(RERR_STREAMIO);
154                 }
155
156                 /* this prevents us trying to write errors on a dead socket */
157                 io_multiplexing_close();
158
159                 rprintf(FERROR,"read error: %s\n", strerror(errno));
160                 exit_cleanup(RERR_STREAMIO);
161         }
162
163         return ret;
164 }
165
166 /* continue trying to read len bytes - don't return until len
167    has been read */
168 static void read_loop(int fd, char *buf, int len)
169 {
170         while (len) {
171                 int n = read_timeout(fd, buf, len);
172
173                 buf += n;
174                 len -= n;
175         }
176 }
177
178 /* read from the file descriptor handling multiplexing - 
179    return number of bytes read
180    never return <= 0 */
181 static int read_unbuffered(int fd, char *buf, int len)
182 {
183         static int remaining;
184         char ibuf[4];
185         int tag, ret=0;
186         char line[1024];
187
188         if (!io_multiplexing_in || fd != multiplex_in_fd) 
189                 return read_timeout(fd, buf, len);
190
191         while (ret == 0) {
192                 if (remaining) {
193                         len = MIN(len, remaining);
194                         read_loop(fd, buf, len);
195                         remaining -= len;
196                         ret = len;
197                         continue;
198                 }
199
200                 read_loop(fd, ibuf, 4);
201                 tag = IVAL(ibuf, 0);
202
203                 remaining = tag & 0xFFFFFF;
204                 tag = tag >> 24;
205
206                 if (tag == MPLEX_BASE) continue;
207
208                 tag -= MPLEX_BASE;
209
210                 if (tag != FERROR && tag != FINFO) {
211                         rprintf(FERROR,"unexpected tag %d\n", tag);
212                         exit_cleanup(RERR_STREAMIO);
213                 }
214
215                 if (remaining > sizeof(line)-1) {
216                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
217                                 remaining);
218                         exit_cleanup(RERR_STREAMIO);
219                 }
220
221                 read_loop(fd, line, remaining);
222                 line[remaining] = 0;
223
224                 rprintf(tag,"%s", line);
225                 remaining = 0;
226         }
227
228         return ret;
229 }
230
231
232 /* do a buffered read from fd. don't return until all N bytes
233    have been read. If all N can't be read then exit with an error */
234 static void readfd(int fd,char *buffer,int N)
235 {
236         int  ret;
237         int total=0;  
238         
239         while (total < N) {
240                 io_flush();
241
242                 ret = read_unbuffered(fd,buffer + total,N-total);
243                 total += ret;
244         }
245
246         stats.total_read += total;
247 }
248
249
250 int32 read_int(int f)
251 {
252         char b[4];
253         int32 ret;
254
255         readfd(f,b,4);
256         ret = IVAL(b,0);
257         if (ret == (int32)0xffffffff) return -1;
258         return ret;
259 }
260
261 int64 read_longint(int f)
262 {
263         extern int remote_version;
264         int64 ret;
265         char b[8];
266         ret = read_int(f);
267
268         if ((int32)ret != (int32)0xffffffff) {
269                 return ret;
270         }
271
272 #ifdef NO_INT64
273         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
274         exit_cleanup(RERR_UNSUPPORTED);
275 #else
276         if (remote_version >= 16) {
277                 readfd(f,b,8);
278                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
279         }
280 #endif
281
282         return ret;
283 }
284
285 void read_buf(int f,char *buf,int len)
286 {
287         readfd(f,buf,len);
288 }
289
290 void read_sbuf(int f,char *buf,int len)
291 {
292         read_buf(f,buf,len);
293         buf[len] = 0;
294 }
295
296 unsigned char read_byte(int f)
297 {
298         unsigned char c;
299         read_buf(f,(char *)&c,1);
300         return c;
301 }
302
303
304
305 /* write len bytes to fd, possibly reading from buffer_f_in if set
306    in order to unclog the pipe. don't return until all len
307    bytes have been written */
308 static void writefd_unbuffered(int fd,char *buf,int len)
309 {
310         int total = 0;
311         fd_set w_fds, r_fds;
312         int fd_count, count;
313         struct timeval tv;
314
315         no_flush++;
316
317         while (total < len) {
318                 FD_ZERO(&w_fds);
319                 FD_ZERO(&r_fds);
320                 FD_SET(fd,&w_fds);
321                 fd_count = fd;
322
323                 if (io_error_fd != -1) {
324                         FD_SET(io_error_fd,&r_fds);
325                         if (io_error_fd > fd_count) 
326                                 fd_count = io_error_fd;
327                 }
328
329                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
330                 tv.tv_usec = 0;
331
332                 errno = 0;
333
334                 count = select(fd_count+1,
335                                io_error_fd != -1?&r_fds:NULL,
336                                &w_fds,NULL,
337                                &tv);
338
339                 if (count <= 0) {
340                         if (errno == EBADF) {
341                                 exit_cleanup(RERR_SOCKETIO);
342                         }
343                         check_timeout();
344                         continue;
345                 }
346
347                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
348                         read_error_fd();
349                 }
350
351                 if (FD_ISSET(fd, &w_fds)) {
352                         int ret, n = len-total;
353                         
354                         if (n > PIPE_BUF) n = PIPE_BUF;
355
356                         ret = write(fd,buf+total,n?n:1);
357
358                         if (ret == -1 && errno == EINTR) {
359                                 continue;
360                         }
361
362                         if (ret <= 0) {
363                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
364                                 exit_cleanup(RERR_STREAMIO);
365                         }
366
367                         total += ret;
368
369                         if (io_timeout)
370                                 last_io = time(NULL);
371                 }
372         }
373
374         no_flush--;
375 }
376
377
378 static char *io_buffer;
379 static int io_buffer_count;
380
381 void io_start_buffering(int fd)
382 {
383         if (io_buffer) return;
384         multiplex_out_fd = fd;
385         io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
386         if (!io_buffer) out_of_memory("writefd");
387         io_buffer_count = 0;
388
389         /* leave room for the multiplex header in case it's needed */
390         io_buffer += 4;
391 }
392
393 void io_flush(void)
394 {
395         int fd = multiplex_out_fd;
396         if (!io_buffer_count || no_flush) return;
397
398         if (io_multiplexing_out) {
399                 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
400                 writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4);
401         } else {
402                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
403         }
404         io_buffer_count = 0;
405 }
406
407 void io_end_buffering(int fd)
408 {
409         io_flush();
410         if (!io_multiplexing_out) {
411                 free(io_buffer-4);
412                 io_buffer = NULL;
413         }
414 }
415
416 static void writefd(int fd,char *buf,int len)
417 {
418         stats.total_written += len;
419
420         if (!io_buffer || fd != multiplex_out_fd) {
421                 writefd_unbuffered(fd, buf, len);
422                 return;
423         }
424
425         while (len) {
426                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
427                 if (n > 0) {
428                         memcpy(io_buffer+io_buffer_count, buf, n);
429                         buf += n;
430                         len -= n;
431                         io_buffer_count += n;
432                 }
433                 
434                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
435         }
436 }
437
438
439 void write_int(int f,int32 x)
440 {
441         char b[4];
442         SIVAL(b,0,x);
443         writefd(f,b,4);
444 }
445
446 void write_longint(int f, int64 x)
447 {
448         extern int remote_version;
449         char b[8];
450
451         if (remote_version < 16 || x <= 0x7FFFFFFF) {
452                 write_int(f, (int)x);
453                 return;
454         }
455
456         write_int(f, (int32)0xFFFFFFFF);
457         SIVAL(b,0,(x&0xFFFFFFFF));
458         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
459
460         writefd(f,b,8);
461 }
462
463 void write_buf(int f,char *buf,int len)
464 {
465         writefd(f,buf,len);
466 }
467
468 /* write a string to the connection */
469 static void write_sbuf(int f,char *buf)
470 {
471         write_buf(f, buf, strlen(buf));
472 }
473
474
475 void write_byte(int f,unsigned char c)
476 {
477         write_buf(f,(char *)&c,1);
478 }
479
480 int read_line(int f, char *buf, int maxlen)
481 {
482         eof_error = 0;
483
484         while (maxlen) {
485                 buf[0] = 0;
486                 read_buf(f, buf, 1);
487                 if (buf[0] == 0) return 0;
488                 if (buf[0] == '\n') {
489                         buf[0] = 0;
490                         break;
491                 }
492                 if (buf[0] != '\r') {
493                         buf++;
494                         maxlen--;
495                 }
496         }
497         if (maxlen == 0) {
498                 *buf = 0;
499                 return 0;
500         }
501
502         eof_error = 1;
503
504         return 1;
505 }
506
507
508 void io_printf(int fd, const char *format, ...)
509 {
510         va_list ap;  
511         char buf[1024];
512         int len;
513         
514         va_start(ap, format);
515         len = vslprintf(buf, sizeof(buf), format, ap);
516         va_end(ap);
517
518         if (len < 0) exit_cleanup(RERR_STREAMIO);
519
520         write_sbuf(fd, buf);
521 }
522
523
524 /* setup for multiplexing an error stream with the data stream */
525 void io_start_multiplex_out(int fd)
526 {
527         multiplex_out_fd = fd;
528         io_flush();
529         io_start_buffering(fd);
530         io_multiplexing_out = 1;
531 }
532
533 /* setup for multiplexing an error stream with the data stream */
534 void io_start_multiplex_in(int fd)
535 {
536         multiplex_in_fd = fd;
537         io_flush();
538         io_multiplexing_in = 1;
539 }
540
541 /* write an message to the multiplexed error stream */
542 int io_multiplex_write(int f, char *buf, int len)
543 {
544         if (!io_multiplexing_out) return 0;
545
546         io_flush();
547
548         SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
549         memcpy(io_buffer, buf, len);
550
551         stats.total_written += (len+4);
552
553         writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
554         return 1;
555 }
556
557 /* write a message to the special error fd */
558 int io_error_write(int f, char *buf, int len)
559 {
560         if (f == -1) return 0;
561         writefd_unbuffered(f, buf, len);
562         return 1;
563 }
564
565 /* stop output multiplexing */
566 void io_multiplexing_close(void)
567 {
568         io_multiplexing_out = 0;
569 }
570
571 void io_close_input(int fd)
572 {
573         buffer_f_in = -1;
574 }