added --stats option for verbose stats on the file transfer
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   Utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 static int io_multiplexing_out;
28 static int io_multiplexing_in;
29 static int multiplex_in_fd;
30 static int multiplex_out_fd;
31 static time_t last_io;
32 static int eof_error=1;
33 extern int verbose;
34 extern int io_timeout;
35 extern struct stats stats;
36
37 static int buffer_f_in = -1;
38
39 void setup_readbuffer(int f_in)
40 {
41         buffer_f_in = f_in;
42 }
43
44 static void check_timeout(void)
45 {
46         time_t t;
47         
48         if (!io_timeout) return;
49
50         if (!last_io) {
51                 last_io = time(NULL);
52                 return;
53         }
54
55         t = time(NULL);
56
57         if (last_io && io_timeout && (t-last_io)>io_timeout) {
58                 rprintf(FERROR,"read timeout after %d second - exiting\n", 
59                         (int)(t-last_io));
60                 exit_cleanup(1);
61         }
62 }
63
64
65 static char *read_buffer;
66 static char *read_buffer_p;
67 static int read_buffer_len;
68 static int read_buffer_size;
69 static int no_flush;
70
71 /* read from a socket with IO timeout. return the number of
72    bytes read. If no bytes can be read then exit, never return
73    a number <= 0 */
74 static int read_timeout(int fd, char *buf, int len)
75 {
76         int n, ret=0;
77
78         io_flush();
79
80         while (ret == 0) {
81                 fd_set fds;
82                 struct timeval tv;
83
84                 FD_ZERO(&fds);
85                 FD_SET(fd, &fds);
86                 tv.tv_sec = io_timeout;
87                 tv.tv_usec = 0;
88
89                 if (select(fd+1, &fds, NULL, NULL, 
90                            io_timeout?&tv:NULL) != 1) {
91                         check_timeout();
92                         continue;
93                 }
94
95                 n = read(fd, buf, len);
96
97                 if (n > 0) {
98                         stats.total_read += n;
99                         buf += n;
100                         len -= n;
101                         ret += n;
102                         if (io_timeout)
103                                 last_io = time(NULL);
104                         continue;
105                 }
106
107                 if (n == -1 && errno == EINTR) {
108                         continue;
109                 }
110
111                 if (n == 0) {
112                         if (eof_error) {
113                                 rprintf(FERROR,"EOF in read_timeout\n");
114                         }
115                         exit_cleanup(1);
116                 }
117
118                 rprintf(FERROR,"read error: %s\n", strerror(errno));
119                 exit_cleanup(1);
120         }
121
122         return ret;
123 }
124
125 /* continue trying to read len bytes - don't return until len
126    has been read */
127 static void read_loop(int fd, char *buf, int len)
128 {
129         while (len) {
130                 int n = read_timeout(fd, buf, len);
131
132                 buf += n;
133                 len -= n;
134         }
135 }
136
137 /* read from the file descriptor handling multiplexing - 
138    return number of bytes read
139    never return <= 0 */
140 static int read_unbuffered(int fd, char *buf, int len)
141 {
142         static int remaining;
143         char ibuf[4];
144         int tag, ret=0;
145         char line[1024];
146
147         if (!io_multiplexing_in || fd != multiplex_in_fd) 
148                 return read_timeout(fd, buf, len);
149
150         while (ret == 0) {
151                 if (remaining) {
152                         len = MIN(len, remaining);
153                         read_loop(fd, buf, len);
154                         remaining -= len;
155                         ret = len;
156                         continue;
157                 }
158
159                 read_loop(fd, ibuf, 4);
160                 tag = IVAL(ibuf, 0);
161
162                 remaining = tag & 0xFFFFFF;
163                 tag = tag >> 24;
164
165                 if (tag == MPLEX_BASE) continue;
166
167                 tag -= MPLEX_BASE;
168
169                 if (tag != FERROR && tag != FINFO) {
170                         rprintf(FERROR,"unexpected tag %d\n", tag);
171                         exit_cleanup(1);
172                 }
173
174                 if (remaining > sizeof(line)-1) {
175                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
176                                 remaining);
177                         exit_cleanup(1);
178                 }
179
180                 read_loop(fd, line, remaining);
181                 line[remaining] = 0;
182
183                 rprintf(tag,"%s", line);
184                 remaining = 0;
185         }
186
187         return ret;
188 }
189
190
191
192 /* This function was added to overcome a deadlock problem when using
193  * ssh.  It looks like we can't allow our receive queue to get full or
194  * ssh will clag up. Uggh.  */
195 static void read_check(int f)
196 {
197         int n = 8192;
198
199         if (f == -1) return;
200
201         if (read_buffer_len == 0) {
202                 read_buffer_p = read_buffer;
203         }
204
205         if (n > MAX_READ_BUFFER/4)
206                 n = MAX_READ_BUFFER/4;
207
208         if (read_buffer_p != read_buffer) {
209                 memmove(read_buffer,read_buffer_p,read_buffer_len);
210                 read_buffer_p = read_buffer;
211         }
212
213         if (n > (read_buffer_size - read_buffer_len)) {
214                 read_buffer_size += n;
215                 read_buffer = (char *)Realloc(read_buffer,read_buffer_size);
216                 if (!read_buffer) out_of_memory("read check");      
217                 read_buffer_p = read_buffer;      
218         }
219
220         n = read_unbuffered(f,read_buffer+read_buffer_len,n);
221         read_buffer_len += n;
222 }
223
224
225 /* do a buffered read from fd. don't return until all N bytes
226    have been read. If all N can't be read then exit with an error */
227 static void readfd(int fd,char *buffer,int N)
228 {
229         int  ret;
230         int total=0;  
231         
232         if (read_buffer_len < N && N < 1024) {
233                 read_check(buffer_f_in);
234         }
235         
236         while (total < N) {
237                 if (read_buffer_len > 0 && buffer_f_in == fd) {
238                         ret = MIN(read_buffer_len,N-total);
239                         memcpy(buffer+total,read_buffer_p,ret);
240                         read_buffer_p += ret;
241                         read_buffer_len -= ret;
242                         total += ret;
243                         continue;
244                 } 
245
246                 io_flush();
247
248                 ret = read_unbuffered(fd,buffer + total,N-total);
249                 total += ret;
250         }
251 }
252
253
254 int32 read_int(int f)
255 {
256         char b[4];
257         readfd(f,b,4);
258         return IVAL(b,0);
259 }
260
261 int64 read_longint(int f)
262 {
263         extern int remote_version;
264         int64 ret;
265         char b[8];
266         ret = read_int(f);
267
268         if ((int32)ret != (int32)0xffffffff) return ret;
269
270 #ifdef NO_INT64
271         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
272         exit_cleanup(1);
273 #else
274         if (remote_version >= 16) {
275                 readfd(f,b,8);
276                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
277         }
278 #endif
279
280         return ret;
281 }
282
283 void read_buf(int f,char *buf,int len)
284 {
285         readfd(f,buf,len);
286 }
287
288 void read_sbuf(int f,char *buf,int len)
289 {
290         read_buf(f,buf,len);
291         buf[len] = 0;
292 }
293
294 unsigned char read_byte(int f)
295 {
296         unsigned char c;
297         read_buf(f,(char *)&c,1);
298         return c;
299 }
300
301
302
303 /* write len bytes to fd, possibly reading from buffer_f_in if set
304    in order to unclog the pipe. don't return until all len
305    bytes have been written */
306 static void writefd_unbuffered(int fd,char *buf,int len)
307 {
308         int total = 0;
309         fd_set w_fds, r_fds;
310         int fd_count, count;
311         struct timeval tv;
312         int reading;
313
314         no_flush++;
315
316         reading = (buffer_f_in != -1 && read_buffer_len < MAX_READ_BUFFER);
317
318         while (total < len) {
319                 FD_ZERO(&w_fds);
320                 FD_ZERO(&r_fds);
321                 FD_SET(fd,&w_fds);
322                 fd_count = fd+1;
323
324                 if (reading) {
325                         FD_SET(buffer_f_in,&r_fds);
326                         if (buffer_f_in > fd) 
327                                 fd_count = buffer_f_in+1;
328                 }
329
330                 tv.tv_sec = io_timeout;
331                 tv.tv_usec = 0;
332
333                 count = select(fd_count,
334                                reading?&r_fds:NULL,
335                                &w_fds,NULL,
336                                io_timeout?&tv:NULL);
337
338                 if (count <= 0) {
339                         check_timeout();
340                         continue;
341                 }
342
343                 if (FD_ISSET(fd, &w_fds)) {
344                         int ret = write(fd,buf+total,len-total);
345
346                         if (ret == -1 && errno == EINTR) {
347                                 continue;
348                         }
349
350                         if (ret <= 0) {
351                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
352                                 exit_cleanup(1);
353                         }
354
355                         total += ret;
356                         stats.total_written += ret;
357
358                         if (io_timeout)
359                                 last_io = time(NULL);
360                         continue;
361                 }
362
363                 if (reading && FD_ISSET(buffer_f_in, &r_fds)) {
364                         read_check(buffer_f_in);
365                 }
366         }
367
368         no_flush--;
369 }
370
371
372 static char *io_buffer;
373 static int io_buffer_count;
374
375 void io_start_buffering(int fd)
376 {
377         if (io_buffer) return;
378         multiplex_out_fd = fd;
379         io_buffer = (char *)malloc(IO_BUFFER_SIZE+4);
380         if (!io_buffer) out_of_memory("writefd");
381         io_buffer_count = 0;
382
383         /* leave room for the multiplex header in case it's needed */
384         io_buffer += 4;
385 }
386
387 void io_flush(void)
388 {
389         int fd = multiplex_out_fd;
390         if (!io_buffer_count || no_flush) return;
391
392         if (io_multiplexing_out) {
393                 SIVAL(io_buffer-4, 0, (MPLEX_BASE<<24) + io_buffer_count);
394                 writefd_unbuffered(fd, io_buffer-4, io_buffer_count+4);
395         } else {
396                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
397         }
398         io_buffer_count = 0;
399 }
400
401 void io_end_buffering(int fd)
402 {
403         io_flush();
404         if (!io_multiplexing_out) {
405                 free(io_buffer-4);
406                 io_buffer = NULL;
407         }
408 }
409
410 static void writefd(int fd,char *buf,int len)
411 {
412         if (!io_buffer) {
413                 writefd_unbuffered(fd, buf, len);
414                 return;
415         }
416
417         while (len) {
418                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
419                 if (n > 0) {
420                         memcpy(io_buffer+io_buffer_count, buf, n);
421                         buf += n;
422                         len -= n;
423                         io_buffer_count += n;
424                 }
425                 
426                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
427         }
428 }
429
430
431 void write_int(int f,int32 x)
432 {
433         char b[4];
434         SIVAL(b,0,x);
435         writefd(f,b,4);
436 }
437
438 void write_longint(int f, int64 x)
439 {
440         extern int remote_version;
441         char b[8];
442
443         if (remote_version < 16 || x <= 0x7FFFFFFF) {
444                 write_int(f, (int)x);
445                 return;
446         }
447
448         write_int(f, -1);
449         SIVAL(b,0,(x&0xFFFFFFFF));
450         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
451
452         writefd(f,b,8);
453 }
454
455 void write_buf(int f,char *buf,int len)
456 {
457         writefd(f,buf,len);
458 }
459
460 /* write a string to the connection */
461 void write_sbuf(int f,char *buf)
462 {
463         write_buf(f, buf, strlen(buf));
464 }
465
466
467 void write_byte(int f,unsigned char c)
468 {
469         write_buf(f,(char *)&c,1);
470 }
471
472 int read_line(int f, char *buf, int maxlen)
473 {
474         eof_error = 0;
475
476         while (maxlen) {
477                 buf[0] = 0;
478                 read_buf(f, buf, 1);
479                 if (buf[0] == 0) return 0;
480                 if (buf[0] == '\n') {
481                         buf[0] = 0;
482                         break;
483                 }
484                 if (buf[0] != '\r') {
485                         buf++;
486                         maxlen--;
487                 }
488         }
489         if (maxlen == 0) {
490                 *buf = 0;
491                 return 0;
492         }
493
494         eof_error = 1;
495
496         return 1;
497 }
498
499
500 void io_printf(int fd, const char *format, ...)
501 {
502         va_list ap;  
503         char buf[1024];
504         int len;
505         
506         va_start(ap, format);
507         len = vslprintf(buf, sizeof(buf)-1, format, ap);
508         va_end(ap);
509
510         if (len < 0) exit_cleanup(1);
511
512         write_sbuf(fd, buf);
513 }
514
515
516 /* setup for multiplexing an error stream with the data stream */
517 void io_start_multiplex_out(int fd)
518 {
519         multiplex_out_fd = fd;
520         io_flush();
521         io_start_buffering(fd);
522         io_multiplexing_out = 1;
523 }
524
525 /* setup for multiplexing an error stream with the data stream */
526 void io_start_multiplex_in(int fd)
527 {
528         multiplex_in_fd = fd;
529         io_flush();
530         if (read_buffer_len) {
531                 fprintf(stderr,"ERROR: data in read buffer at mplx start\n");
532                 exit_cleanup(1);
533         }
534
535         io_multiplexing_in = 1;
536 }
537
538 /* write an message to the error stream */
539 int io_multiplex_write(int f, char *buf, int len)
540 {
541         if (!io_multiplexing_out) return 0;
542
543         io_flush();
544
545         SIVAL(io_buffer-4, 0, ((MPLEX_BASE + f)<<24) + len);
546         memcpy(io_buffer, buf, len);
547
548         writefd_unbuffered(multiplex_out_fd, io_buffer-4, len+4);
549         return 1;
550 }
551
552 void io_close_input(int fd)
553 {
554         buffer_f_in = -1;
555 }