Add --bwlimit option contributed by Matthew Demicco and Jamie Gritton.
[rsync/rsync.git] / io.c
1 /* 
2    Copyright (C) Andrew Tridgell 1996
3    Copyright (C) Paul Mackerras 1996
4    
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9    
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 */
19
20 /*
21   socket and pipe IO utilities used in rsync 
22
23   tridge, June 1996
24   */
25 #include "rsync.h"
26
27 /* if no timeout is specified then use a 60 second select timeout */
28 #define SELECT_TIMEOUT 60
29
30 extern int bwlimit;
31
32 static int io_multiplexing_out;
33 static int io_multiplexing_in;
34 static int multiplex_in_fd;
35 static int multiplex_out_fd;
36 static time_t last_io;
37 static int eof_error=1;
38 extern int verbose;
39 extern int io_timeout;
40 extern struct stats stats;
41
42 static int buffer_f_in = -1;
43 static int io_error_fd = -1;
44
45 static void read_loop(int fd, char *buf, int len);
46
47 void setup_readbuffer(int f_in)
48 {
49         buffer_f_in = f_in;
50 }
51
52 static void check_timeout(void)
53 {
54         extern int am_server, am_daemon;
55         time_t t;
56         
57         if (!io_timeout) return;
58
59         if (!last_io) {
60                 last_io = time(NULL);
61                 return;
62         }
63
64         t = time(NULL);
65
66         if (last_io && io_timeout && (t-last_io) >= io_timeout) {
67                 if (!am_server && !am_daemon) {
68                         rprintf(FERROR,"io timeout after %d second - exiting\n", 
69                                 (int)(t-last_io));
70                 }
71                 exit_cleanup(RERR_TIMEOUT);
72         }
73 }
74
75 /* setup the fd used to propogate errors */
76 void io_set_error_fd(int fd)
77 {
78         io_error_fd = fd;
79 }
80
81 /* read some data from the error fd and write it to the write log code */
82 static void read_error_fd(void)
83 {
84         char buf[200];
85         int n;
86         int fd = io_error_fd;
87         int tag, len;
88
89         io_error_fd = -1;
90
91         read_loop(fd, buf, 4);
92         tag = IVAL(buf, 0);
93
94         len = tag & 0xFFFFFF;
95         tag = tag >> 24;
96         tag -= MPLEX_BASE;
97
98         while (len) {
99                 n = len;
100                 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
101                 read_loop(fd, buf, n);
102                 rwrite((enum logcode)tag, buf, n);
103                 len -= n;
104         }
105
106         io_error_fd = fd;
107 }
108
109
110 static int no_flush;
111
112 /* read from a socket with IO timeout. return the number of
113    bytes read. If no bytes can be read then exit, never return
114    a number <= 0 */
115 static int read_timeout(int fd, char *buf, int len)
116 {
117         int n, ret=0;
118
119         io_flush();
120
121         while (ret == 0) {
122                 fd_set fds;
123                 struct timeval tv;
124                 int fd_count = fd+1;
125
126                 FD_ZERO(&fds);
127                 FD_SET(fd, &fds);
128                 if (io_error_fd != -1) {
129                         FD_SET(io_error_fd, &fds);
130                         if (io_error_fd > fd) fd_count = io_error_fd+1;
131                 }
132
133                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
134                 tv.tv_usec = 0;
135
136                 errno = 0;
137
138                 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
139                         if (errno == EBADF) {
140                                 exit_cleanup(RERR_SOCKETIO);
141                         }
142                         check_timeout();
143                         continue;
144                 }
145
146                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
147                         read_error_fd();
148                 }
149
150                 if (!FD_ISSET(fd, &fds)) continue;
151
152                 n = read(fd, buf, len);
153
154                 if (n > 0) {
155                         buf += n;
156                         len -= n;
157                         ret += n;
158                         if (io_timeout)
159                                 last_io = time(NULL);
160                         continue;
161                 }
162
163                 if (n == -1 && errno == EINTR) {
164                         continue;
165                 }
166
167                 if (n == -1 && 
168                     (errno == EWOULDBLOCK || errno == EAGAIN)) {
169                         continue;
170                 }
171
172
173                 if (n == 0) {
174                         if (eof_error) {
175                                 rprintf(FERROR,"unexpected EOF in read_timeout\n");
176                         }
177                         exit_cleanup(RERR_STREAMIO);
178                 }
179
180                 /* this prevents us trying to write errors on a dead socket */
181                 io_multiplexing_close();
182
183                 rprintf(FERROR,"read error: %s\n", strerror(errno));
184                 exit_cleanup(RERR_STREAMIO);
185         }
186
187         return ret;
188 }
189
190 /* continue trying to read len bytes - don't return until len
191    has been read */
192 static void read_loop(int fd, char *buf, int len)
193 {
194         while (len) {
195                 int n = read_timeout(fd, buf, len);
196
197                 buf += n;
198                 len -= n;
199         }
200 }
201
202 /* read from the file descriptor handling multiplexing - 
203    return number of bytes read
204    never return <= 0 */
205 static int read_unbuffered(int fd, char *buf, int len)
206 {
207         static int remaining;
208         int tag, ret=0;
209         char line[1024];
210
211         if (!io_multiplexing_in || fd != multiplex_in_fd) 
212                 return read_timeout(fd, buf, len);
213
214         while (ret == 0) {
215                 if (remaining) {
216                         len = MIN(len, remaining);
217                         read_loop(fd, buf, len);
218                         remaining -= len;
219                         ret = len;
220                         continue;
221                 }
222
223                 read_loop(fd, line, 4);
224                 tag = IVAL(line, 0);
225
226                 remaining = tag & 0xFFFFFF;
227                 tag = tag >> 24;
228
229                 if (tag == MPLEX_BASE) continue;
230
231                 tag -= MPLEX_BASE;
232
233                 if (tag != FERROR && tag != FINFO) {
234                         rprintf(FERROR,"unexpected tag %d\n", tag);
235                         exit_cleanup(RERR_STREAMIO);
236                 }
237
238                 if (remaining > sizeof(line)-1) {
239                         rprintf(FERROR,"multiplexing overflow %d\n\n", 
240                                 remaining);
241                         exit_cleanup(RERR_STREAMIO);
242                 }
243
244                 read_loop(fd, line, remaining);
245                 line[remaining] = 0;
246
247                 rprintf((enum logcode)tag,"%s", line);
248                 remaining = 0;
249         }
250
251         return ret;
252 }
253
254
255 /* do a buffered read from fd. don't return until all N bytes
256    have been read. If all N can't be read then exit with an error */
257 static void readfd(int fd,char *buffer,int N)
258 {
259         int  ret;
260         int total=0;  
261         
262         while (total < N) {
263                 io_flush();
264
265                 ret = read_unbuffered(fd,buffer + total,N-total);
266                 total += ret;
267         }
268
269         stats.total_read += total;
270 }
271
272
273 int32 read_int(int f)
274 {
275         char b[4];
276         int32 ret;
277
278         readfd(f,b,4);
279         ret = IVAL(b,0);
280         if (ret == (int32)0xffffffff) return -1;
281         return ret;
282 }
283
284 int64 read_longint(int f)
285 {
286         extern int remote_version;
287         int64 ret;
288         char b[8];
289         ret = read_int(f);
290
291         if ((int32)ret != (int32)0xffffffff) {
292                 return ret;
293         }
294
295 #ifdef NO_INT64
296         rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
297         exit_cleanup(RERR_UNSUPPORTED);
298 #else
299         if (remote_version >= 16) {
300                 readfd(f,b,8);
301                 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
302         }
303 #endif
304
305         return ret;
306 }
307
308 void read_buf(int f,char *buf,int len)
309 {
310         readfd(f,buf,len);
311 }
312
313 void read_sbuf(int f,char *buf,int len)
314 {
315         read_buf(f,buf,len);
316         buf[len] = 0;
317 }
318
319 unsigned char read_byte(int f)
320 {
321         unsigned char c;
322         read_buf(f,(char *)&c,1);
323         return c;
324 }
325
326
327
328 /* write len bytes to fd, possibly reading from buffer_f_in if set
329    in order to unclog the pipe. don't return until all len
330    bytes have been written */
331 static void writefd_unbuffered(int fd,char *buf,int len)
332 {
333         int total = 0;
334         fd_set w_fds, r_fds;
335         int fd_count, count;
336         struct timeval tv;
337
338         no_flush++;
339
340         while (total < len) {
341                 FD_ZERO(&w_fds);
342                 FD_ZERO(&r_fds);
343                 FD_SET(fd,&w_fds);
344                 fd_count = fd;
345
346                 if (io_error_fd != -1) {
347                         FD_SET(io_error_fd,&r_fds);
348                         if (io_error_fd > fd_count) 
349                                 fd_count = io_error_fd;
350                 }
351
352                 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
353                 tv.tv_usec = 0;
354
355                 errno = 0;
356
357                 count = select(fd_count+1,
358                                io_error_fd != -1?&r_fds:NULL,
359                                &w_fds,NULL,
360                                &tv);
361
362                 if (count <= 0) {
363                         if (errno == EBADF) {
364                                 exit_cleanup(RERR_SOCKETIO);
365                         }
366                         check_timeout();
367                         continue;
368                 }
369
370                 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
371                         read_error_fd();
372                 }
373
374                 if (FD_ISSET(fd, &w_fds)) {
375                         int ret, n = len-total;
376                         
377                         ret = write(fd,buf+total,n);
378
379                         if (ret == -1 && errno == EINTR) {
380                                 continue;
381                         }
382
383                         if (ret == -1 && 
384                             (errno == EWOULDBLOCK || errno == EAGAIN)) {
385                                 continue;
386                         }
387
388                         if (ret <= 0) {
389                                 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
390                                 exit_cleanup(RERR_STREAMIO);
391                         }
392
393                         /* Sleep after writing to limit I/O bandwidth */
394                         if (bwlimit)
395                         {
396                             tv.tv_sec = 0;
397                             tv.tv_usec = ret * 1000 / bwlimit;
398                             while (tv.tv_usec > 1000000)
399                             {
400                                 tv.tv_sec++;
401                                 tv.tv_usec -= 1000000;
402                             }
403                             select(0, NULL, NULL, NULL, &tv);
404                         }
405  
406                         total += ret;
407
408                         if (io_timeout)
409                                 last_io = time(NULL);
410                 }
411         }
412
413         no_flush--;
414 }
415
416
417 static char *io_buffer;
418 static int io_buffer_count;
419
420 void io_start_buffering(int fd)
421 {
422         if (io_buffer) return;
423         multiplex_out_fd = fd;
424         io_buffer = (char *)malloc(IO_BUFFER_SIZE);
425         if (!io_buffer) out_of_memory("writefd");
426         io_buffer_count = 0;
427 }
428
429 /* write an message to a multiplexed stream. If this fails then rsync
430    exits */
431 static void mplex_write(int fd, enum logcode code, char *buf, int len)
432 {
433         char buffer[4096];
434         int n = len;
435
436         SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
437
438         if (n > (sizeof(buffer)-4)) {
439                 n = sizeof(buffer)-4;
440         }
441
442         memcpy(&buffer[4], buf, n);
443         writefd_unbuffered(fd, buffer, n+4);
444
445         len -= n;
446         buf += n;
447
448         if (len) {
449                 writefd_unbuffered(fd, buf, len);
450         }
451 }
452
453
454 void io_flush(void)
455 {
456         int fd = multiplex_out_fd;
457         if (!io_buffer_count || no_flush) return;
458
459         if (io_multiplexing_out) {
460                 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
461         } else {
462                 writefd_unbuffered(fd, io_buffer, io_buffer_count);
463         }
464         io_buffer_count = 0;
465 }
466
467 void io_end_buffering(int fd)
468 {
469         io_flush();
470         if (!io_multiplexing_out) {
471                 free(io_buffer);
472                 io_buffer = NULL;
473         }
474 }
475
476 static void writefd(int fd,char *buf,int len)
477 {
478         stats.total_written += len;
479
480         if (!io_buffer || fd != multiplex_out_fd) {
481                 writefd_unbuffered(fd, buf, len);
482                 return;
483         }
484
485         while (len) {
486                 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
487                 if (n > 0) {
488                         memcpy(io_buffer+io_buffer_count, buf, n);
489                         buf += n;
490                         len -= n;
491                         io_buffer_count += n;
492                 }
493                 
494                 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
495         }
496 }
497
498
499 void write_int(int f,int32 x)
500 {
501         char b[4];
502         SIVAL(b,0,x);
503         writefd(f,b,4);
504 }
505
506 void write_longint(int f, int64 x)
507 {
508         extern int remote_version;
509         char b[8];
510
511         if (remote_version < 16 || x <= 0x7FFFFFFF) {
512                 write_int(f, (int)x);
513                 return;
514         }
515
516         write_int(f, (int32)0xFFFFFFFF);
517         SIVAL(b,0,(x&0xFFFFFFFF));
518         SIVAL(b,4,((x>>32)&0xFFFFFFFF));
519
520         writefd(f,b,8);
521 }
522
523 void write_buf(int f,char *buf,int len)
524 {
525         writefd(f,buf,len);
526 }
527
528 /* write a string to the connection */
529 static void write_sbuf(int f,char *buf)
530 {
531         write_buf(f, buf, strlen(buf));
532 }
533
534
535 void write_byte(int f,unsigned char c)
536 {
537         write_buf(f,(char *)&c,1);
538 }
539
540 int read_line(int f, char *buf, int maxlen)
541 {
542         eof_error = 0;
543
544         while (maxlen) {
545                 buf[0] = 0;
546                 read_buf(f, buf, 1);
547                 if (buf[0] == 0) return 0;
548                 if (buf[0] == '\n') {
549                         buf[0] = 0;
550                         break;
551                 }
552                 if (buf[0] != '\r') {
553                         buf++;
554                         maxlen--;
555                 }
556         }
557         if (maxlen == 0) {
558                 *buf = 0;
559                 return 0;
560         }
561
562         eof_error = 1;
563
564         return 1;
565 }
566
567
568 void io_printf(int fd, const char *format, ...)
569 {
570         va_list ap;  
571         char buf[1024];
572         int len;
573         
574         va_start(ap, format);
575         len = vslprintf(buf, sizeof(buf), format, ap);
576         va_end(ap);
577
578         if (len < 0) exit_cleanup(RERR_STREAMIO);
579
580         write_sbuf(fd, buf);
581 }
582
583
584 /* setup for multiplexing an error stream with the data stream */
585 void io_start_multiplex_out(int fd)
586 {
587         multiplex_out_fd = fd;
588         io_flush();
589         io_start_buffering(fd);
590         io_multiplexing_out = 1;
591 }
592
593 /* setup for multiplexing an error stream with the data stream */
594 void io_start_multiplex_in(int fd)
595 {
596         multiplex_in_fd = fd;
597         io_flush();
598         io_multiplexing_in = 1;
599 }
600
601 /* write an message to the multiplexed error stream */
602 int io_multiplex_write(enum logcode code, char *buf, int len)
603 {
604         if (!io_multiplexing_out) return 0;
605
606         io_flush();
607         stats.total_written += (len+4);
608         mplex_write(multiplex_out_fd, code, buf, len);
609         return 1;
610 }
611
612 /* write a message to the special error fd */
613 int io_error_write(int f, enum logcode code, char *buf, int len)
614 {
615         if (f == -1) return 0;
616         mplex_write(f, code, buf, len);
617         return 1;
618 }
619
620 /* stop output multiplexing */
621 void io_multiplexing_close(void)
622 {
623         io_multiplexing_out = 0;
624 }
625
626 void io_close_input(int fd)
627 {
628         buffer_f_in = -1;
629 }
630