added msleep() function
[rsync/rsync.git] / io.c
... / ...
CommitLineData
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
21 socket and pipe IO utilities used in rsync
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
27/* if no timeout is specified then use a 60 second select timeout */
28#define SELECT_TIMEOUT 60
29
30extern int bwlimit;
31
32static int io_multiplexing_out;
33static int io_multiplexing_in;
34static int multiplex_in_fd;
35static int multiplex_out_fd;
36static time_t last_io;
37static int eof_error=1;
38extern int verbose;
39extern int io_timeout;
40extern struct stats stats;
41
42static int buffer_f_in = -1;
43static int io_error_fd = -1;
44
45static void read_loop(int fd, char *buf, int len);
46
47void setup_readbuffer(int f_in)
48{
49 buffer_f_in = f_in;
50}
51
52static void check_timeout(void)
53{
54 extern int am_server, am_daemon;
55 time_t t;
56
57 if (!io_timeout) return;
58
59 if (!last_io) {
60 last_io = time(NULL);
61 return;
62 }
63
64 t = time(NULL);
65
66 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
67 if (!am_server && !am_daemon) {
68 rprintf(FERROR,"io timeout after %d second - exiting\n",
69 (int)(t-last_io));
70 }
71 exit_cleanup(RERR_TIMEOUT);
72 }
73}
74
75/* setup the fd used to propogate errors */
76void io_set_error_fd(int fd)
77{
78 io_error_fd = fd;
79}
80
81/* read some data from the error fd and write it to the write log code */
82static void read_error_fd(void)
83{
84 char buf[200];
85 int n;
86 int fd = io_error_fd;
87 int tag, len;
88
89 io_error_fd = -1;
90
91 read_loop(fd, buf, 4);
92 tag = IVAL(buf, 0);
93
94 len = tag & 0xFFFFFF;
95 tag = tag >> 24;
96 tag -= MPLEX_BASE;
97
98 while (len) {
99 n = len;
100 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
101 read_loop(fd, buf, n);
102 rwrite((enum logcode)tag, buf, n);
103 len -= n;
104 }
105
106 io_error_fd = fd;
107}
108
109
110static int no_flush;
111
112/* read from a socket with IO timeout. return the number of
113 bytes read. If no bytes can be read then exit, never return
114 a number <= 0 */
115static int read_timeout(int fd, char *buf, int len)
116{
117 int n, ret=0;
118
119 io_flush();
120
121 while (ret == 0) {
122 fd_set fds;
123 struct timeval tv;
124 int fd_count = fd+1;
125
126 FD_ZERO(&fds);
127 FD_SET(fd, &fds);
128 if (io_error_fd != -1) {
129 FD_SET(io_error_fd, &fds);
130 if (io_error_fd > fd) fd_count = io_error_fd+1;
131 }
132
133 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
134 tv.tv_usec = 0;
135
136 errno = 0;
137
138 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
139 if (errno == EBADF) {
140 exit_cleanup(RERR_SOCKETIO);
141 }
142 check_timeout();
143 continue;
144 }
145
146 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
147 read_error_fd();
148 }
149
150 if (!FD_ISSET(fd, &fds)) continue;
151
152 n = read(fd, buf, len);
153
154 if (n > 0) {
155 buf += n;
156 len -= n;
157 ret += n;
158 if (io_timeout)
159 last_io = time(NULL);
160 continue;
161 }
162
163 if (n == -1 && errno == EINTR) {
164 continue;
165 }
166
167 if (n == -1 &&
168 (errno == EWOULDBLOCK || errno == EAGAIN)) {
169 continue;
170 }
171
172
173 if (n == 0) {
174 if (eof_error) {
175 rprintf(FERROR,"unexpected EOF in read_timeout\n");
176 }
177 exit_cleanup(RERR_STREAMIO);
178 }
179
180 /* this prevents us trying to write errors on a dead socket */
181 io_multiplexing_close();
182
183 rprintf(FERROR,"read error: %s\n", strerror(errno));
184 exit_cleanup(RERR_STREAMIO);
185 }
186
187 return ret;
188}
189
190/* continue trying to read len bytes - don't return until len
191 has been read */
192static void read_loop(int fd, char *buf, int len)
193{
194 while (len) {
195 int n = read_timeout(fd, buf, len);
196
197 buf += n;
198 len -= n;
199 }
200}
201
202/* read from the file descriptor handling multiplexing -
203 return number of bytes read
204 never return <= 0 */
205static int read_unbuffered(int fd, char *buf, int len)
206{
207 static int remaining;
208 int tag, ret=0;
209 char line[1024];
210
211 if (!io_multiplexing_in || fd != multiplex_in_fd)
212 return read_timeout(fd, buf, len);
213
214 while (ret == 0) {
215 if (remaining) {
216 len = MIN(len, remaining);
217 read_loop(fd, buf, len);
218 remaining -= len;
219 ret = len;
220 continue;
221 }
222
223 read_loop(fd, line, 4);
224 tag = IVAL(line, 0);
225
226 remaining = tag & 0xFFFFFF;
227 tag = tag >> 24;
228
229 if (tag == MPLEX_BASE) continue;
230
231 tag -= MPLEX_BASE;
232
233 if (tag != FERROR && tag != FINFO) {
234 rprintf(FERROR,"unexpected tag %d\n", tag);
235 exit_cleanup(RERR_STREAMIO);
236 }
237
238 if (remaining > sizeof(line)-1) {
239 rprintf(FERROR,"multiplexing overflow %d\n\n",
240 remaining);
241 exit_cleanup(RERR_STREAMIO);
242 }
243
244 read_loop(fd, line, remaining);
245 line[remaining] = 0;
246
247 rprintf((enum logcode)tag,"%s", line);
248 remaining = 0;
249 }
250
251 return ret;
252}
253
254
255/* do a buffered read from fd. don't return until all N bytes
256 have been read. If all N can't be read then exit with an error */
257static void readfd(int fd,char *buffer,int N)
258{
259 int ret;
260 int total=0;
261
262 while (total < N) {
263 io_flush();
264
265 ret = read_unbuffered(fd,buffer + total,N-total);
266 total += ret;
267 }
268
269 stats.total_read += total;
270}
271
272
273int32 read_int(int f)
274{
275 char b[4];
276 int32 ret;
277
278 readfd(f,b,4);
279 ret = IVAL(b,0);
280 if (ret == (int32)0xffffffff) return -1;
281 return ret;
282}
283
284int64 read_longint(int f)
285{
286 extern int remote_version;
287 int64 ret;
288 char b[8];
289 ret = read_int(f);
290
291 if ((int32)ret != (int32)0xffffffff) {
292 return ret;
293 }
294
295#ifdef NO_INT64
296 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
297 exit_cleanup(RERR_UNSUPPORTED);
298#else
299 if (remote_version >= 16) {
300 readfd(f,b,8);
301 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
302 }
303#endif
304
305 return ret;
306}
307
308void read_buf(int f,char *buf,int len)
309{
310 readfd(f,buf,len);
311}
312
313void read_sbuf(int f,char *buf,int len)
314{
315 read_buf(f,buf,len);
316 buf[len] = 0;
317}
318
319unsigned char read_byte(int f)
320{
321 unsigned char c;
322 read_buf(f,(char *)&c,1);
323 return c;
324}
325
326
327
328/* write len bytes to fd, possibly reading from buffer_f_in if set
329 in order to unclog the pipe. don't return until all len
330 bytes have been written */
331static void writefd_unbuffered(int fd,char *buf,int len)
332{
333 int total = 0;
334 fd_set w_fds, r_fds;
335 int fd_count, count;
336 struct timeval tv;
337
338 no_flush++;
339
340 while (total < len) {
341 FD_ZERO(&w_fds);
342 FD_ZERO(&r_fds);
343 FD_SET(fd,&w_fds);
344 fd_count = fd;
345
346 if (io_error_fd != -1) {
347 FD_SET(io_error_fd,&r_fds);
348 if (io_error_fd > fd_count)
349 fd_count = io_error_fd;
350 }
351
352 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
353 tv.tv_usec = 0;
354
355 errno = 0;
356
357 count = select(fd_count+1,
358 io_error_fd != -1?&r_fds:NULL,
359 &w_fds,NULL,
360 &tv);
361
362 if (count <= 0) {
363 if (errno == EBADF) {
364 exit_cleanup(RERR_SOCKETIO);
365 }
366 check_timeout();
367 continue;
368 }
369
370 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
371 read_error_fd();
372 }
373
374 if (FD_ISSET(fd, &w_fds)) {
375 int ret, n = len-total;
376
377 ret = write(fd,buf+total,n);
378
379 if (ret == -1 && errno == EINTR) {
380 continue;
381 }
382
383 if (ret == -1 &&
384 (errno == EWOULDBLOCK || errno == EAGAIN)) {
385 continue;
386 }
387
388 if (ret <= 0) {
389 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
390 exit_cleanup(RERR_STREAMIO);
391 }
392
393 /* Sleep after writing to limit I/O bandwidth */
394 if (bwlimit)
395 {
396 tv.tv_sec = 0;
397 tv.tv_usec = ret * 1000 / bwlimit;
398 while (tv.tv_usec > 1000000)
399 {
400 tv.tv_sec++;
401 tv.tv_usec -= 1000000;
402 }
403 select(0, NULL, NULL, NULL, &tv);
404 }
405
406 total += ret;
407
408 if (io_timeout)
409 last_io = time(NULL);
410 }
411 }
412
413 no_flush--;
414}
415
416
417static char *io_buffer;
418static int io_buffer_count;
419
420void io_start_buffering(int fd)
421{
422 if (io_buffer) return;
423 multiplex_out_fd = fd;
424 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
425 if (!io_buffer) out_of_memory("writefd");
426 io_buffer_count = 0;
427}
428
429/* write an message to a multiplexed stream. If this fails then rsync
430 exits */
431static void mplex_write(int fd, enum logcode code, char *buf, int len)
432{
433 char buffer[4096];
434 int n = len;
435
436 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
437
438 if (n > (sizeof(buffer)-4)) {
439 n = sizeof(buffer)-4;
440 }
441
442 memcpy(&buffer[4], buf, n);
443 writefd_unbuffered(fd, buffer, n+4);
444
445 len -= n;
446 buf += n;
447
448 if (len) {
449 writefd_unbuffered(fd, buf, len);
450 }
451}
452
453
454void io_flush(void)
455{
456 int fd = multiplex_out_fd;
457 if (!io_buffer_count || no_flush) return;
458
459 if (io_multiplexing_out) {
460 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
461 } else {
462 writefd_unbuffered(fd, io_buffer, io_buffer_count);
463 }
464 io_buffer_count = 0;
465}
466
467void io_end_buffering(int fd)
468{
469 io_flush();
470 if (!io_multiplexing_out) {
471 free(io_buffer);
472 io_buffer = NULL;
473 }
474}
475
476static void writefd(int fd,char *buf,int len)
477{
478 stats.total_written += len;
479
480 if (!io_buffer || fd != multiplex_out_fd) {
481 writefd_unbuffered(fd, buf, len);
482 return;
483 }
484
485 while (len) {
486 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
487 if (n > 0) {
488 memcpy(io_buffer+io_buffer_count, buf, n);
489 buf += n;
490 len -= n;
491 io_buffer_count += n;
492 }
493
494 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
495 }
496}
497
498
499void write_int(int f,int32 x)
500{
501 char b[4];
502 SIVAL(b,0,x);
503 writefd(f,b,4);
504}
505
506void write_longint(int f, int64 x)
507{
508 extern int remote_version;
509 char b[8];
510
511 if (remote_version < 16 || x <= 0x7FFFFFFF) {
512 write_int(f, (int)x);
513 return;
514 }
515
516 write_int(f, (int32)0xFFFFFFFF);
517 SIVAL(b,0,(x&0xFFFFFFFF));
518 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
519
520 writefd(f,b,8);
521}
522
523void write_buf(int f,char *buf,int len)
524{
525 writefd(f,buf,len);
526}
527
528/* write a string to the connection */
529static void write_sbuf(int f,char *buf)
530{
531 write_buf(f, buf, strlen(buf));
532}
533
534
535void write_byte(int f,unsigned char c)
536{
537 write_buf(f,(char *)&c,1);
538}
539
540int read_line(int f, char *buf, int maxlen)
541{
542 eof_error = 0;
543
544 while (maxlen) {
545 buf[0] = 0;
546 read_buf(f, buf, 1);
547 if (buf[0] == 0) return 0;
548 if (buf[0] == '\n') {
549 buf[0] = 0;
550 break;
551 }
552 if (buf[0] != '\r') {
553 buf++;
554 maxlen--;
555 }
556 }
557 if (maxlen == 0) {
558 *buf = 0;
559 return 0;
560 }
561
562 eof_error = 1;
563
564 return 1;
565}
566
567
568void io_printf(int fd, const char *format, ...)
569{
570 va_list ap;
571 char buf[1024];
572 int len;
573
574 va_start(ap, format);
575 len = vslprintf(buf, sizeof(buf), format, ap);
576 va_end(ap);
577
578 if (len < 0) exit_cleanup(RERR_STREAMIO);
579
580 write_sbuf(fd, buf);
581}
582
583
584/* setup for multiplexing an error stream with the data stream */
585void io_start_multiplex_out(int fd)
586{
587 multiplex_out_fd = fd;
588 io_flush();
589 io_start_buffering(fd);
590 io_multiplexing_out = 1;
591}
592
593/* setup for multiplexing an error stream with the data stream */
594void io_start_multiplex_in(int fd)
595{
596 multiplex_in_fd = fd;
597 io_flush();
598 io_multiplexing_in = 1;
599}
600
601/* write an message to the multiplexed error stream */
602int io_multiplex_write(enum logcode code, char *buf, int len)
603{
604 if (!io_multiplexing_out) return 0;
605
606 io_flush();
607 stats.total_written += (len+4);
608 mplex_write(multiplex_out_fd, code, buf, len);
609 return 1;
610}
611
612/* write a message to the special error fd */
613int io_error_write(int f, enum logcode code, char *buf, int len)
614{
615 if (f == -1) return 0;
616 mplex_write(f, code, buf, len);
617 return 1;
618}
619
620/* stop output multiplexing */
621void io_multiplexing_close(void)
622{
623 io_multiplexing_out = 0;
624}
625
626void io_close_input(int fd)
627{
628 buffer_f_in = -1;
629}
630