fixed backup_dir bug introduced with recent memory handling patches
[rsync/rsync.git] / io.c
CommitLineData
720b47f2
AT
1/*
2 Copyright (C) Andrew Tridgell 1996
3 Copyright (C) Paul Mackerras 1996
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18*/
19
20/*
08f15335 21 socket and pipe IO utilities used in rsync
720b47f2
AT
22
23 tridge, June 1996
24 */
25#include "rsync.h"
26
8cd9fd4e
AT
/* Seconds each select() call waits when io_timeout is zero. */
#define SELECT_TIMEOUT 60

/* Settings and counters defined outside this file. */
extern int bwlimit;
extern int verbose;
extern int io_timeout;
extern struct stats stats;

/* Multiplexing switches and the descriptors they apply to. */
static int io_multiplexing_out;
static int io_multiplexing_in;
static int multiplex_in_fd;
static int multiplex_out_fd;

/* Time of the last recorded I/O; consulted by check_timeout(). */
static time_t last_io;

/* Nonzero: read_timeout() prints a message when it hits EOF. */
static int eof_error=1;

/* Set by setup_readbuffer(), reset to -1 by io_close_input(). */
static int buffer_f_in = -1;

/* Descriptor carrying tagged error messages; -1 when none is set. */
static int io_error_fd = -1;

static void read_loop(int fd, char *buf, int len);
46
4c36ddbe 47void setup_readbuffer(int f_in)
720b47f2 48{
22d6234e 49 buffer_f_in = f_in;
720b47f2
AT
50}
51
8d9dc9f9
AT
52static void check_timeout(void)
53{
0adb99b9 54 extern int am_server, am_daemon;
8d9dc9f9
AT
55 time_t t;
56
57 if (!io_timeout) return;
58
59 if (!last_io) {
60 last_io = time(NULL);
61 return;
62 }
63
64 t = time(NULL);
65
86ffe37f 66 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
0adb99b9
AT
67 if (!am_server && !am_daemon) {
68 rprintf(FERROR,"io timeout after %d second - exiting\n",
69 (int)(t-last_io));
70 }
65417579 71 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
72 }
73}
74
554e0a8d
AT
75/* setup the fd used to propogate errors */
76void io_set_error_fd(int fd)
77{
78 io_error_fd = fd;
79}
80
ff41a59f 81/* read some data from the error fd and write it to the write log code */
554e0a8d
AT
82static void read_error_fd(void)
83{
84 char buf[200];
85 int n;
86 int fd = io_error_fd;
ff41a59f
AT
87 int tag, len;
88
554e0a8d
AT
89 io_error_fd = -1;
90
ff41a59f
AT
91 read_loop(fd, buf, 4);
92 tag = IVAL(buf, 0);
93
94 len = tag & 0xFFFFFF;
95 tag = tag >> 24;
96 tag -= MPLEX_BASE;
97
98 while (len) {
99 n = len;
100 if (n > (sizeof(buf)-1)) n = sizeof(buf)-1;
101 read_loop(fd, buf, n);
102 rwrite((enum logcode)tag, buf, n);
103 len -= n;
554e0a8d
AT
104 }
105
106 io_error_fd = fd;
107}
108
720b47f2 109
e44f9a12 110static int no_flush;
720b47f2 111
4c36ddbe
AT
112/* read from a socket with IO timeout. return the number of
113 bytes read. If no bytes can be read then exit, never return
114 a number <= 0 */
115static int read_timeout(int fd, char *buf, int len)
8d9dc9f9 116{
4c36ddbe
AT
117 int n, ret=0;
118
ea2111d1
AT
119 io_flush();
120
4c36ddbe
AT
121 while (ret == 0) {
122 fd_set fds;
123 struct timeval tv;
554e0a8d 124 int fd_count = fd+1;
4c36ddbe
AT
125
126 FD_ZERO(&fds);
127 FD_SET(fd, &fds);
554e0a8d
AT
128 if (io_error_fd != -1) {
129 FD_SET(io_error_fd, &fds);
130 if (io_error_fd > fd) fd_count = io_error_fd+1;
131 }
132
8cd9fd4e 133 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
4c36ddbe
AT
134 tv.tv_usec = 0;
135
554e0a8d
AT
136 errno = 0;
137
138 if (select(fd_count, &fds, NULL, NULL, &tv) < 1) {
139 if (errno == EBADF) {
140 exit_cleanup(RERR_SOCKETIO);
141 }
4c36ddbe
AT
142 check_timeout();
143 continue;
144 }
145
554e0a8d
AT
146 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
147 read_error_fd();
148 }
149
150 if (!FD_ISSET(fd, &fds)) continue;
151
4c36ddbe
AT
152 n = read(fd, buf, len);
153
8d9dc9f9
AT
154 if (n > 0) {
155 buf += n;
156 len -= n;
4c36ddbe
AT
157 ret += n;
158 if (io_timeout)
159 last_io = time(NULL);
160 continue;
8d9dc9f9 161 }
4c36ddbe
AT
162
163 if (n == -1 && errno == EINTR) {
164 continue;
165 }
166
f0359dd0
AT
167 if (n == -1 &&
168 (errno == EWOULDBLOCK || errno == EAGAIN)) {
169 continue;
170 }
171
5d1e1dcf 172
8d9dc9f9 173 if (n == 0) {
528bfcd7 174 if (eof_error) {
ca8e9694 175 rprintf(FERROR,"unexpected EOF in read_timeout\n");
528bfcd7 176 }
65417579 177 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 178 }
8d9dc9f9 179
5d1e1dcf 180 /* this prevents us trying to write errors on a dead socket */
554e0a8d 181 io_multiplexing_close();
5d1e1dcf 182
4c36ddbe 183 rprintf(FERROR,"read error: %s\n", strerror(errno));
65417579 184 exit_cleanup(RERR_STREAMIO);
4c36ddbe 185 }
8d9dc9f9 186
4c36ddbe
AT
187 return ret;
188}
8d9dc9f9 189
4c36ddbe
AT
/* Keep calling read_timeout() until exactly len bytes have been stored
   in buf. */
static void read_loop(int fd, char *buf, int len)
{
	int got;

	for (; len != 0; buf += got, len -= got)
		got = read_timeout(fd, buf, len);
}
201
cad2bba7 202/* read from the file descriptor handling multiplexing -
4c36ddbe
AT
203 return number of bytes read
204 never return <= 0 */
8d9dc9f9
AT
205static int read_unbuffered(int fd, char *buf, int len)
206{
207 static int remaining;
8d9dc9f9
AT
208 int tag, ret=0;
209 char line[1024];
210
679e7657 211 if (!io_multiplexing_in || fd != multiplex_in_fd)
4c36ddbe 212 return read_timeout(fd, buf, len);
8d9dc9f9
AT
213
214 while (ret == 0) {
215 if (remaining) {
216 len = MIN(len, remaining);
217 read_loop(fd, buf, len);
218 remaining -= len;
219 ret = len;
220 continue;
221 }
222
ff41a59f
AT
223 read_loop(fd, line, 4);
224 tag = IVAL(line, 0);
679e7657 225
8d9dc9f9
AT
226 remaining = tag & 0xFFFFFF;
227 tag = tag >> 24;
228
229 if (tag == MPLEX_BASE) continue;
230
231 tag -= MPLEX_BASE;
232
233 if (tag != FERROR && tag != FINFO) {
234 rprintf(FERROR,"unexpected tag %d\n", tag);
65417579 235 exit_cleanup(RERR_STREAMIO);
8d9dc9f9
AT
236 }
237
238 if (remaining > sizeof(line)-1) {
239 rprintf(FERROR,"multiplexing overflow %d\n\n",
240 remaining);
65417579 241 exit_cleanup(RERR_STREAMIO);
8d9dc9f9
AT
242 }
243
244 read_loop(fd, line, remaining);
245 line[remaining] = 0;
246
ff41a59f 247 rprintf((enum logcode)tag,"%s", line);
8d9dc9f9
AT
248 remaining = 0;
249 }
250
251 return ret;
252}
253
254
4c36ddbe
AT
255/* do a buffered read from fd. don't return until all N bytes
256 have been read. If all N can't be read then exit with an error */
257static void readfd(int fd,char *buffer,int N)
720b47f2 258{
6ba9279f
AT
259 int ret;
260 int total=0;
6ba9279f 261
6ba9279f 262 while (total < N) {
8d9dc9f9
AT
263 io_flush();
264
4c36ddbe 265 ret = read_unbuffered(fd,buffer + total,N-total);
6ba9279f 266 total += ret;
7f28dbee 267 }
1b7c47cb
AT
268
269 stats.total_read += total;
720b47f2
AT
270}
271
272
b7922338 273int32 read_int(int f)
720b47f2 274{
4c36ddbe 275 char b[4];
d730b113
AT
276 int32 ret;
277
4c36ddbe 278 readfd(f,b,4);
d730b113
AT
279 ret = IVAL(b,0);
280 if (ret == (int32)0xffffffff) return -1;
281 return ret;
720b47f2
AT
282}
283
71c46176 284int64 read_longint(int f)
3a6a366f
AT
285{
286 extern int remote_version;
71c46176 287 int64 ret;
3a6a366f
AT
288 char b[8];
289 ret = read_int(f);
71c46176 290
8de330a3
AT
291 if ((int32)ret != (int32)0xffffffff) {
292 return ret;
293 }
71c46176 294
3bee6733 295#ifdef NO_INT64
9486289c 296 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
65417579 297 exit_cleanup(RERR_UNSUPPORTED);
71c46176
AT
298#else
299 if (remote_version >= 16) {
4c36ddbe 300 readfd(f,b,8);
71c46176 301 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
3a6a366f 302 }
71c46176
AT
303#endif
304
3a6a366f
AT
305 return ret;
306}
307
720b47f2
AT
/* Public wrapper: read exactly len bytes from f into buf. */
void read_buf(int f,char *buf,int len)
{
	readfd(f, buf, len);
}
312
575f2fca
AT
/* Read len bytes from f into buf and NUL-terminate them.  buf must
   have room for len+1 bytes. */
void read_sbuf(int f,char *buf,int len)
{
	char *end = buf + len;

	read_buf(f, buf, len);
	*end = 0;
}
318
182dca5c
AT
/* Read and return a single byte from f. */
unsigned char read_byte(int f)
{
	unsigned char value;

	read_buf(f, (char *)&value, 1);
	return value;
}
720b47f2 325
7bec6a5c 326
7bec6a5c 327
4c36ddbe
AT
328/* write len bytes to fd, possibly reading from buffer_f_in if set
329 in order to unclog the pipe. don't return until all len
330 bytes have been written */
331static void writefd_unbuffered(int fd,char *buf,int len)
720b47f2 332{
8d9dc9f9
AT
333 int total = 0;
334 fd_set w_fds, r_fds;
4c36ddbe 335 int fd_count, count;
8d9dc9f9 336 struct timeval tv;
720b47f2 337
e44f9a12
AT
338 no_flush++;
339
4c36ddbe 340 while (total < len) {
8d9dc9f9
AT
341 FD_ZERO(&w_fds);
342 FD_ZERO(&r_fds);
343 FD_SET(fd,&w_fds);
554e0a8d 344 fd_count = fd;
4c36ddbe 345
554e0a8d
AT
346 if (io_error_fd != -1) {
347 FD_SET(io_error_fd,&r_fds);
348 if (io_error_fd > fd_count)
349 fd_count = io_error_fd;
8d9dc9f9
AT
350 }
351
8cd9fd4e 352 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
8d9dc9f9 353 tv.tv_usec = 0;
4c36ddbe 354
554e0a8d
AT
355 errno = 0;
356
357 count = select(fd_count+1,
08f15335 358 io_error_fd != -1?&r_fds:NULL,
4c36ddbe 359 &w_fds,NULL,
8cd9fd4e 360 &tv);
4c36ddbe
AT
361
362 if (count <= 0) {
554e0a8d
AT
363 if (errno == EBADF) {
364 exit_cleanup(RERR_SOCKETIO);
365 }
8d9dc9f9
AT
366 check_timeout();
367 continue;
368 }
4c36ddbe 369
554e0a8d
AT
370 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
371 read_error_fd();
372 }
373
8d9dc9f9 374 if (FD_ISSET(fd, &w_fds)) {
07b7c86c
AT
375 int ret, n = len-total;
376
f0359dd0 377 ret = write(fd,buf+total,n);
4c36ddbe
AT
378
379 if (ret == -1 && errno == EINTR) {
380 continue;
381 }
382
f0359dd0
AT
383 if (ret == -1 &&
384 (errno == EWOULDBLOCK || errno == EAGAIN)) {
385 continue;
386 }
387
4c36ddbe
AT
388 if (ret <= 0) {
389 rprintf(FERROR,"erroring writing %d bytes - exiting\n", len);
65417579 390 exit_cleanup(RERR_STREAMIO);
4c36ddbe
AT
391 }
392
ef5d23eb
DD
393 /* Sleep after writing to limit I/O bandwidth */
394 if (bwlimit)
395 {
396 tv.tv_sec = 0;
397 tv.tv_usec = ret * 1000 / bwlimit;
398 while (tv.tv_usec > 1000000)
399 {
400 tv.tv_sec++;
401 tv.tv_usec -= 1000000;
402 }
403 select(0, NULL, NULL, NULL, &tv);
404 }
405
4c36ddbe 406 total += ret;
a800434a 407
4c36ddbe
AT
408 if (io_timeout)
409 last_io = time(NULL);
8d9dc9f9 410 }
4c36ddbe 411 }
e44f9a12
AT
412
413 no_flush--;
720b47f2
AT
414}
415
8d9dc9f9 416
d6dead6b
AT
417static char *io_buffer;
418static int io_buffer_count;
419
420void io_start_buffering(int fd)
421{
8d9dc9f9 422 if (io_buffer) return;
679e7657 423 multiplex_out_fd = fd;
ff41a59f 424 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
d6dead6b
AT
425 if (!io_buffer) out_of_memory("writefd");
426 io_buffer_count = 0;
ff41a59f
AT
427}
428
429/* write an message to a multiplexed stream. If this fails then rsync
430 exits */
431static void mplex_write(int fd, enum logcode code, char *buf, int len)
432{
433 char buffer[4096];
434 int n = len;
8d9dc9f9 435
ff41a59f
AT
436 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
437
6d7b6081
AT
438 if (n > (sizeof(buffer)-4)) {
439 n = sizeof(buffer)-4;
ff41a59f
AT
440 }
441
442 memcpy(&buffer[4], buf, n);
443 writefd_unbuffered(fd, buffer, n+4);
444
445 len -= n;
446 buf += n;
447
6d7b6081
AT
448 if (len) {
449 writefd_unbuffered(fd, buf, len);
450 }
d6dead6b
AT
451}
452
ff41a59f 453
8d9dc9f9 454void io_flush(void)
d6dead6b 455{
679e7657 456 int fd = multiplex_out_fd;
e44f9a12 457 if (!io_buffer_count || no_flush) return;
8d9dc9f9
AT
458
459 if (io_multiplexing_out) {
0f3203c3 460 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
8d9dc9f9 461 } else {
4c36ddbe 462 writefd_unbuffered(fd, io_buffer, io_buffer_count);
d6dead6b 463 }
8d9dc9f9
AT
464 io_buffer_count = 0;
465}
466
467void io_end_buffering(int fd)
468{
469 io_flush();
470 if (!io_multiplexing_out) {
ff41a59f 471 free(io_buffer);
8d9dc9f9
AT
472 io_buffer = NULL;
473 }
d6dead6b
AT
474}
475
4c36ddbe 476static void writefd(int fd,char *buf,int len)
d6dead6b 477{
1b7c47cb
AT
478 stats.total_written += len;
479
554e0a8d 480 if (!io_buffer || fd != multiplex_out_fd) {
4c36ddbe
AT
481 writefd_unbuffered(fd, buf, len);
482 return;
483 }
d6dead6b
AT
484
485 while (len) {
486 int n = MIN(len, IO_BUFFER_SIZE-io_buffer_count);
487 if (n > 0) {
488 memcpy(io_buffer+io_buffer_count, buf, n);
489 buf += n;
490 len -= n;
491 io_buffer_count += n;
492 }
493
8d9dc9f9 494 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
d6dead6b 495 }
d6dead6b 496}
720b47f2
AT
497
498
b7922338 499void write_int(int f,int32 x)
720b47f2 500{
8d9dc9f9
AT
501 char b[4];
502 SIVAL(b,0,x);
4c36ddbe 503 writefd(f,b,4);
720b47f2
AT
504}
505
71c46176 506void write_longint(int f, int64 x)
3a6a366f
AT
507{
508 extern int remote_version;
509 char b[8];
3a6a366f
AT
510
511 if (remote_version < 16 || x <= 0x7FFFFFFF) {
512 write_int(f, (int)x);
513 return;
514 }
515
8de330a3 516 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
517 SIVAL(b,0,(x&0xFFFFFFFF));
518 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
519
4c36ddbe 520 writefd(f,b,8);
3a6a366f
AT
521}
522
720b47f2
AT
/* Public wrapper: write len bytes from buf to f. */
void write_buf(int f,char *buf,int len)
{
	writefd(f, buf, len);
}
527
/* Write the NUL-terminated string buf to f; the terminator itself is
   not sent. */
static void write_sbuf(int f,char *buf)
{
	int length = strlen(buf);

	write_buf(f, buf, length);
}
533
720b47f2 534
182dca5c
AT
/* Write the single byte c to f. */
void write_byte(int f,unsigned char c)
{
	char *one = (char *)&c;

	write_buf(f, one, 1);
}
539
f0fca04e
AT
540int read_line(int f, char *buf, int maxlen)
541{
528bfcd7
AT
542 eof_error = 0;
543
f0fca04e 544 while (maxlen) {
528bfcd7 545 buf[0] = 0;
f0fca04e 546 read_buf(f, buf, 1);
528bfcd7 547 if (buf[0] == 0) return 0;
f0fca04e
AT
548 if (buf[0] == '\n') {
549 buf[0] = 0;
550 break;
551 }
552 if (buf[0] != '\r') {
553 buf++;
554 maxlen--;
555 }
556 }
557 if (maxlen == 0) {
558 *buf = 0;
559 return 0;
560 }
528bfcd7
AT
561
562 eof_error = 1;
563
f0fca04e
AT
564 return 1;
565}
566
567
568void io_printf(int fd, const char *format, ...)
569{
570 va_list ap;
571 char buf[1024];
572 int len;
573
574 va_start(ap, format);
37f9805d 575 len = vslprintf(buf, sizeof(buf), format, ap);
f0fca04e
AT
576 va_end(ap);
577
65417579 578 if (len < 0) exit_cleanup(RERR_STREAMIO);
f0fca04e
AT
579
580 write_sbuf(fd, buf);
581}
8d9dc9f9
AT
582
583
584/* setup for multiplexing an error stream with the data stream */
585void io_start_multiplex_out(int fd)
586{
679e7657
AT
587 multiplex_out_fd = fd;
588 io_flush();
8d9dc9f9
AT
589 io_start_buffering(fd);
590 io_multiplexing_out = 1;
591}
592
593/* setup for multiplexing an error stream with the data stream */
594void io_start_multiplex_in(int fd)
595{
679e7657
AT
596 multiplex_in_fd = fd;
597 io_flush();
8d9dc9f9
AT
598 io_multiplexing_in = 1;
599}
600
554e0a8d 601/* write an message to the multiplexed error stream */
ff41a59f 602int io_multiplex_write(enum logcode code, char *buf, int len)
8d9dc9f9
AT
603{
604 if (!io_multiplexing_out) return 0;
605
606 io_flush();
1b7c47cb 607 stats.total_written += (len+4);
ff41a59f 608 mplex_write(multiplex_out_fd, code, buf, len);
8d9dc9f9
AT
609 return 1;
610}
611
554e0a8d 612/* write a message to the special error fd */
ff41a59f 613int io_error_write(int f, enum logcode code, char *buf, int len)
554e0a8d
AT
614{
615 if (f == -1) return 0;
ff41a59f 616 mplex_write(f, code, buf, len);
554e0a8d
AT
617 return 1;
618}
619
620/* stop output multiplexing */
621void io_multiplexing_close(void)
622{
623 io_multiplexing_out = 0;
624}
625
8d9dc9f9
AT
626void io_close_input(int fd)
627{
628 buffer_f_in = -1;
629}
8b35435f 630