Don't use single-line "if (condition) statement;" idiom.
[rsync/rsync.git] / io.c
CommitLineData
7a24c346 1/* -*- c-file-style: "linux" -*-
d62bcc17
WD
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
880da007
MP
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
d62bcc17 6 *
880da007
MP
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
d62bcc17 11 *
880da007
MP
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
d62bcc17 16 *
880da007
MP
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
720b47f2 21
87ee2481 22/**
87ee2481
MP
23 * @file io.c
24 *
6ed6d7f5 25 * Socket and pipe I/O utilities used in rsync.
87ee2481
MP
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
720b47f2 36
720b47f2
AT
37#include "rsync.h"
38
880da007 39/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
40#define SELECT_TIMEOUT 60
41
8d9dc9f9
AT
42static int io_multiplexing_out;
43static int io_multiplexing_in;
76c21947
WD
44static int multiplex_in_fd = -1;
45static int multiplex_out_fd = -1;
8d9dc9f9 46static time_t last_io;
7a55d06e
MP
47static int no_flush;
48
49extern int bwlimit;
71e58630 50extern size_t bwlimit_writemax;
720b47f2 51extern int verbose;
6ba9279f 52extern int io_timeout;
d17e1dd2
WD
53extern int am_server;
54extern int am_daemon;
55extern int am_sender;
a800434a 56extern struct stats stats;
720b47f2 57
7a55d06e 58
a86179f4 59const char phase_unknown[] = "unknown";
805edf9d 60
98b332ed
MP
61/**
62 * The connection might be dropped at some point; perhaps because the
63 * remote instance crashed. Just giving the offset on the stream is
64 * not very helpful. So instead we try to make io_phase_name point to
65 * something useful.
eca2adb4 66 *
6ed6d7f5 67 * For buffered/multiplexed I/O these names will be somewhat
805edf9d 68 * approximate; perhaps for ease of support we would rather make the
6ed6d7f5 69 * buffer always flush when a single application-level I/O finishes.
805edf9d 70 *
eca2adb4
MP
71 * @todo Perhaps we want some simple stack functionality, but there's
72 * no need to overdo it.
98b332ed 73 **/
805edf9d
MP
74const char *io_write_phase = phase_unknown;
75const char *io_read_phase = phase_unknown;
98b332ed 76
7a55d06e
MP
77/** Ignore EOF errors while reading a module listing if the remote
78 version is 24 or less. */
79int kludge_around_eof = False;
80
d17e1dd2
WD
81int msg_fd_in = -1;
82int msg_fd_out = -1;
7a55d06e 83
56014c8c
WD
84static int io_filesfrom_f_in = -1;
85static int io_filesfrom_f_out = -1;
86static char io_filesfrom_buf[2048];
87static char *io_filesfrom_bp;
88static char io_filesfrom_lastchar;
89static int io_filesfrom_buflen;
720b47f2 90
9dd891bb 91static void read_loop(int fd, char *buf, size_t len);
ff41a59f 92
d17e1dd2
WD
/* Singly-linked queue of int values: redo_list_add() appends at the
 * tail and get_redo_num() removes from the head. */
struct redo_list {
	struct redo_list *next;
	int num;
};

static struct redo_list *redo_list_head;
static struct redo_list *redo_list_tail;

/* Singly-linked queue of pending MSG_* messages.  Each buf holds a
 * 4-byte header followed by the payload; len counts both. */
struct msg_list {
	struct msg_list *next;
	char *buf;
	int len;
};

static struct msg_list *msg_list_head;
static struct msg_list *msg_list_tail;
109
110static void redo_list_add(int num)
111{
112 struct redo_list *rl;
113
114 if (!(rl = new(struct redo_list)))
115 exit_cleanup(RERR_MALLOC);
116 rl->next = NULL;
117 rl->num = num;
118 if (redo_list_tail)
119 redo_list_tail->next = rl;
120 else
121 redo_list_head = rl;
122 redo_list_tail = rl;
123}
124
8d9dc9f9
AT
125static void check_timeout(void)
126{
127 time_t t;
90ba34e2 128
d17e1dd2
WD
129 if (!io_timeout)
130 return;
8d9dc9f9
AT
131
132 if (!last_io) {
133 last_io = time(NULL);
134 return;
135 }
136
137 t = time(NULL);
138
86ffe37f 139 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
0adb99b9 140 if (!am_server && !am_daemon) {
d62bcc17 141 rprintf(FERROR, "io timeout after %d seconds - exiting\n",
0adb99b9
AT
142 (int)(t-last_io));
143 }
65417579 144 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
145 }
146}
147
d17e1dd2
WD
148/** Setup the fd used to receive MSG_* messages. Only needed when
149 * we're the generator because the sender and receiver both use the
6ed6d7f5 150 * multiplexed I/O setup. */
d17e1dd2
WD
151void set_msg_fd_in(int fd)
152{
153 msg_fd_in = fd;
154}
155
156/** Setup the fd used to send our MSG_* messages. Only needed when
157 * we're the receiver because the generator and the sender both use
6ed6d7f5 158 * the multiplexed I/O setup. */
d17e1dd2
WD
159void set_msg_fd_out(int fd)
160{
161 msg_fd_out = fd;
162 set_nonblocking(msg_fd_out);
163}
164
165/* Add a message to the pending MSG_* list. */
166static void msg_list_add(int code, char *buf, int len)
554e0a8d 167{
d17e1dd2
WD
168 struct msg_list *ml;
169
170 if (!(ml = new(struct msg_list)))
171 exit_cleanup(RERR_MALLOC);
172 ml->next = NULL;
173 if (!(ml->buf = new_array(char, len+4)))
174 exit_cleanup(RERR_MALLOC);
175 SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
176 memcpy(ml->buf+4, buf, len);
177 ml->len = len+4;
178 if (msg_list_tail)
179 msg_list_tail->next = ml;
180 else
181 msg_list_head = ml;
182 msg_list_tail = ml;
554e0a8d
AT
183}
184
d17e1dd2
WD
185void send_msg(enum msgcode code, char *buf, int len)
186{
187 msg_list_add(code, buf, len);
188 msg_list_push(NORMAL_FLUSH);
189}
190
191/** Read a message from the MSG_* fd and dispatch it. This is only
192 * called by the generator. */
193static void read_msg_fd(void)
554e0a8d
AT
194{
195 char buf[200];
06ce139f 196 size_t n;
d17e1dd2 197 int fd = msg_fd_in;
ff41a59f
AT
198 int tag, len;
199
d17e1dd2
WD
200 /* Temporarily disable msg_fd_in. This is needed because we
201 * may call a write routine that could try to call us back. */
202 msg_fd_in = -1;
554e0a8d 203
ff41a59f
AT
204 read_loop(fd, buf, 4);
205 tag = IVAL(buf, 0);
206
207 len = tag & 0xFFFFFF;
d17e1dd2 208 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 209
d17e1dd2
WD
210 switch (tag) {
211 case MSG_DONE:
13c7bcbb
WD
212 if (len != 0) {
213 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 214 exit_cleanup(RERR_STREAMIO);
13c7bcbb 215 }
d17e1dd2
WD
216 redo_list_add(-1);
217 break;
218 case MSG_REDO:
13c7bcbb
WD
219 if (len != 4) {
220 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 221 exit_cleanup(RERR_STREAMIO);
13c7bcbb 222 }
d17e1dd2
WD
223 read_loop(fd, buf, 4);
224 redo_list_add(IVAL(buf,0));
225 break;
226 case MSG_INFO:
227 case MSG_ERROR:
228 case MSG_LOG:
229 while (len) {
230 n = len;
231 if (n >= sizeof buf)
232 n = sizeof buf - 1;
233 read_loop(fd, buf, n);
234 rwrite((enum logcode)tag, buf, n);
235 len -= n;
236 }
237 break;
238 default:
13c7bcbb 239 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
d17e1dd2
WD
240 exit_cleanup(RERR_STREAMIO);
241 }
242
243 msg_fd_in = fd;
244}
245
246/* Try to push messages off the list onto the wire. If we leave with more
247 * to do, return 0. On error, return -1. If everything flushed, return 1.
248 * This is only called by the receiver. */
249int msg_list_push(int flush_it_all)
250{
251 static int written = 0;
252 struct timeval tv;
253 fd_set fds;
254
255 if (msg_fd_out < 0)
256 return -1;
257
258 while (msg_list_head) {
259 struct msg_list *ml = msg_list_head;
260 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
261 if (n < 0) {
262 if (errno == EINTR)
263 continue;
264 if (errno != EWOULDBLOCK && errno != EAGAIN)
265 return -1;
266 if (!flush_it_all)
267 return 0;
268 FD_ZERO(&fds);
269 FD_SET(msg_fd_out, &fds);
270 tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
271 tv.tv_usec = 0;
272 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
273 check_timeout();
274 } else if ((written += n) == ml->len) {
275 free(ml->buf);
276 msg_list_head = ml->next;
277 if (!msg_list_head)
278 msg_list_tail = NULL;
279 free(ml);
280 written = 0;
281 }
554e0a8d 282 }
d17e1dd2
WD
283 return 1;
284}
285
286int get_redo_num(void)
287{
288 struct redo_list *next;
289 int num;
290
291 while (!redo_list_head)
292 read_msg_fd();
554e0a8d 293
d17e1dd2
WD
294 num = redo_list_head->num;
295 next = redo_list_head->next;
296 free(redo_list_head);
297 redo_list_head = next;
298 if (!next)
299 redo_list_tail = NULL;
300
301 return num;
554e0a8d
AT
302}
303
56014c8c
WD
304/**
305 * When we're the receiver and we have a local --files-from list of names
306 * that needs to be sent over the socket to the sender, we have to do two
307 * things at the same time: send the sender a list of what files we're
308 * processing and read the incoming file+info list from the sender. We do
309 * this by augmenting the read_timeout() function to copy this data. It
310 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
311 * ready, since it might be a pipe) and then blast it out f_out (when it
312 * is ready to receive more data).
313 */
314void io_set_filesfrom_fds(int f_in, int f_out)
315{
316 io_filesfrom_f_in = f_in;
317 io_filesfrom_f_out = f_out;
318 io_filesfrom_bp = io_filesfrom_buf;
319 io_filesfrom_lastchar = '\0';
320 io_filesfrom_buflen = 0;
321}
720b47f2 322
880da007
MP
323/**
324 * It's almost always an error to get an EOF when we're trying to read
325 * from the network, because the protocol is self-terminating.
326 *
327 * However, there is one unfortunate cases where it is not, which is
328 * rsync <2.4.6 sending a list of modules on a server, since the list
329 * is terminated by closing the socket. So, for the section of the
330 * program where that is a problem (start_socket_client),
331 * kludge_around_eof is True and we just exit.
332 */
3151cbae 333static void whine_about_eof(void)
7a55d06e 334{
7a55d06e 335 if (kludge_around_eof)
3151cbae 336 exit_cleanup(0);
7a55d06e 337 else {
d62bcc17 338 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
3151cbae 339 "(%.0f bytes read so far)\n",
d62bcc17 340 (double)stats.total_read);
3151cbae
WD
341
342 exit_cleanup(RERR_STREAMIO);
7a55d06e
MP
343 }
344}
720b47f2 345
7a55d06e 346
3151cbae 347static void die_from_readerr(int err)
7a55d06e
MP
348{
349 /* this prevents us trying to write errors on a dead socket */
350 io_multiplexing_close();
3151cbae 351
d62bcc17 352 rsyserr(FERROR, err, "read error");
7a55d06e
MP
353 exit_cleanup(RERR_STREAMIO);
354}
355
356
880da007 357/**
6ed6d7f5 358 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
359 * read. If no bytes can be read then exit, never return a number <= 0.
360 *
8886f8d0
MP
361 * TODO: If the remote shell connection fails, then current versions
362 * actually report an "unexpected EOF" error here. Since it's a
363 * fairly common mistake to try to use rsh when ssh is required, we
364 * should trap that: if we fail to read any data at all, we should
365 * give a better explanation. We can tell whether the connection has
366 * started by looking e.g. at whether the remote version is known yet.
c3563c46 367 */
3151cbae 368static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 369{
d62bcc17 370 int n, ret = 0;
4c36ddbe 371
d17e1dd2 372 io_flush(NORMAL_FLUSH);
ea2111d1 373
4c36ddbe 374 while (ret == 0) {
7a55d06e 375 /* until we manage to read *something* */
56014c8c 376 fd_set r_fds, w_fds;
4c36ddbe 377 struct timeval tv;
554e0a8d 378 int fd_count = fd+1;
a57873b7 379 int count;
4c36ddbe 380
56014c8c
WD
381 FD_ZERO(&r_fds);
382 FD_SET(fd, &r_fds);
d17e1dd2
WD
383 if (msg_fd_in >= 0) {
384 FD_SET(msg_fd_in, &r_fds);
385 if (msg_fd_in >= fd_count)
386 fd_count = msg_fd_in+1;
56014c8c 387 }
3309507d 388 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
389 int new_fd;
390 if (io_filesfrom_buflen == 0) {
3309507d 391 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
392 FD_SET(io_filesfrom_f_in, &r_fds);
393 new_fd = io_filesfrom_f_in;
394 } else {
395 io_filesfrom_f_out = -1;
396 new_fd = -1;
397 }
398 } else {
399 FD_ZERO(&w_fds);
400 FD_SET(io_filesfrom_f_out, &w_fds);
401 new_fd = io_filesfrom_f_out;
402 }
d17e1dd2
WD
403 if (new_fd >= fd_count)
404 fd_count = new_fd+1;
554e0a8d
AT
405 }
406
8cd9fd4e 407 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
4c36ddbe
AT
408 tv.tv_usec = 0;
409
554e0a8d
AT
410 errno = 0;
411
56014c8c
WD
412 count = select(fd_count, &r_fds,
413 io_filesfrom_buflen? &w_fds : NULL,
414 NULL, &tv);
a57873b7
AT
415
416 if (count == 0) {
d17e1dd2 417 msg_list_push(NORMAL_FLUSH);
a57873b7
AT
418 check_timeout();
419 }
420
421 if (count <= 0) {
f89b9368 422 if (errno == EBADF)
554e0a8d 423 exit_cleanup(RERR_SOCKETIO);
4c36ddbe
AT
424 continue;
425 }
426
d17e1dd2
WD
427 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
428 read_msg_fd();
554e0a8d 429
3309507d 430 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
431 if (io_filesfrom_buflen) {
432 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
433 int l = write(io_filesfrom_f_out,
434 io_filesfrom_bp,
435 io_filesfrom_buflen);
436 if (l > 0) {
437 if (!(io_filesfrom_buflen -= l))
438 io_filesfrom_bp = io_filesfrom_buf;
439 else
440 io_filesfrom_bp += l;
441 } else {
442 /* XXX should we complain? */
443 io_filesfrom_f_out = -1;
444 }
445 }
3309507d 446 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
447 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
448 int l = read(io_filesfrom_f_in,
449 io_filesfrom_buf,
450 sizeof io_filesfrom_buf);
451 if (l <= 0) {
452 /* Send end-of-file marker */
453 io_filesfrom_buf[0] = '\0';
454 io_filesfrom_buf[1] = '\0';
455 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
456 io_filesfrom_f_in = -1;
457 } else {
458 extern int eol_nulls;
459 if (!eol_nulls) {
460 char *s = io_filesfrom_buf + l;
461 /* Transform CR and/or LF into '\0' */
462 while (s-- > io_filesfrom_buf) {
463 if (*s == '\n' || *s == '\r')
464 *s = '\0';
465 }
466 }
467 if (!io_filesfrom_lastchar) {
468 /* Last buf ended with a '\0', so don't
469 * let this buf start with one. */
470 while (l && !*io_filesfrom_bp)
471 io_filesfrom_bp++, l--;
472 }
473 if (!l)
474 io_filesfrom_bp = io_filesfrom_buf;
475 else {
476 char *f = io_filesfrom_bp;
477 char *t = f;
478 char *eob = f + l;
479 /* Eliminate any multi-'\0' runs. */
480 while (f != eob) {
481 if (!(*t++ = *f++)) {
482 while (f != eob && !*f)
483 f++, l--;
484 }
485 }
486 io_filesfrom_lastchar = f[-1];
487 }
488 io_filesfrom_buflen = l;
489 }
490 }
491 }
492 }
493
f89b9368
WD
494 if (!FD_ISSET(fd, &r_fds))
495 continue;
554e0a8d 496
4c36ddbe
AT
497 n = read(fd, buf, len);
498
8d9dc9f9
AT
499 if (n > 0) {
500 buf += n;
501 len -= n;
4c36ddbe
AT
502 ret += n;
503 if (io_timeout)
504 last_io = time(NULL);
505 continue;
7a55d06e 506 } else if (n == 0) {
3151cbae 507 whine_about_eof();
7a55d06e 508 return -1; /* doesn't return */
3309507d 509 } else if (n < 0) {
d62bcc17
WD
510 if (errno == EINTR || errno == EWOULDBLOCK
511 || errno == EAGAIN)
7a55d06e 512 continue;
3151cbae 513 die_from_readerr(errno);
8d9dc9f9 514 }
4c36ddbe 515 }
8d9dc9f9 516
4c36ddbe
AT
517 return ret;
518}
8d9dc9f9 519
56014c8c
WD
520/**
521 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
522 * characters long).
523 */
524int read_filesfrom_line(int fd, char *fname)
525{
526 char ch, *s, *eob = fname + MAXPATHLEN - 1;
527 int cnt;
528 extern int io_timeout;
529 extern int eol_nulls;
530 extern char *remote_filesfrom_file;
55d5937d 531 int reading_remotely = remote_filesfrom_file != NULL;
56014c8c
WD
532 int nulls = eol_nulls || reading_remotely;
533
534 start:
535 s = fname;
536 while (1) {
537 cnt = read(fd, &ch, 1);
538 if (cnt < 0 && (errno == EWOULDBLOCK
539 || errno == EINTR || errno == EAGAIN)) {
540 struct timeval tv;
541 fd_set fds;
542 FD_ZERO(&fds);
543 FD_SET(fd, &fds);
544 tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
545 tv.tv_usec = 0;
546 if (!select(fd+1, &fds, NULL, NULL, &tv))
547 check_timeout();
548 continue;
549 }
550 if (cnt != 1)
551 break;
552 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
553 /* Skip empty lines if reading locally. */
554 if (!reading_remotely && s == fname)
555 continue;
556 break;
557 }
558 if (s < eob)
559 *s++ = ch;
560 }
561 *s = '\0';
7a55d06e 562
6b45fcf1
WD
563 /* Dump comments. */
564 if (*fname == '#' || *fname == ';')
56014c8c
WD
565 goto start;
566
567 return s - fname;
568}
7a55d06e
MP
569
570
880da007
MP
/**
 * Keep calling read_timeout() until exactly @p len bytes have been
 * stored in @p buf - don't return before then.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	char *end = buf + len;

	while (buf != end)
		buf += read_timeout(fd, buf, end - buf);
}
584
7a55d06e
MP
585
586/**
587 * Read from the file descriptor handling multiplexing - return number
588 * of bytes read.
d62bcc17
WD
589 *
590 * Never returns <= 0.
7a55d06e 591 */
9dd891bb 592static int read_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 593{
6fe25398 594 static size_t remaining;
909ce14f 595 int tag, ret = 0;
8d9dc9f9 596 char line[1024];
76c21947
WD
597 static char *buffer;
598 static size_t bufferIdx = 0;
599 static size_t bufferSz;
8d9dc9f9 600
76c21947 601 if (fd != multiplex_in_fd)
4c36ddbe 602 return read_timeout(fd, buf, len);
8d9dc9f9 603
76c21947
WD
604 if (!io_multiplexing_in && remaining == 0) {
605 if (!buffer) {
606 bufferSz = 2 * IO_BUFFER_SIZE;
607 buffer = new_array(char, bufferSz);
f89b9368
WD
608 if (!buffer)
609 out_of_memory("read_unbuffered");
76c21947
WD
610 }
611 remaining = read_timeout(fd, buffer, bufferSz);
612 bufferIdx = 0;
613 }
614
8d9dc9f9
AT
615 while (ret == 0) {
616 if (remaining) {
617 len = MIN(len, remaining);
76c21947
WD
618 memcpy(buf, buffer + bufferIdx, len);
619 bufferIdx += len;
8d9dc9f9
AT
620 remaining -= len;
621 ret = len;
76c21947 622 break;
8d9dc9f9
AT
623 }
624
909ce14f 625 read_loop(fd, line, 4);
ff41a59f 626 tag = IVAL(line, 0);
679e7657 627
8d9dc9f9 628 remaining = tag & 0xFFFFFF;
d17e1dd2 629 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 630
d17e1dd2
WD
631 switch (tag) {
632 case MSG_DATA:
76c21947
WD
633 if (!buffer || remaining > bufferSz) {
634 buffer = realloc_array(buffer, char, remaining);
f89b9368
WD
635 if (!buffer)
636 out_of_memory("read_unbuffered");
76c21947
WD
637 bufferSz = remaining;
638 }
639 read_loop(fd, buffer, remaining);
640 bufferIdx = 0;
d17e1dd2
WD
641 break;
642 case MSG_INFO:
643 case MSG_ERROR:
644 if (remaining >= sizeof line) {
645 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
646 tag, (long)remaining);
647 exit_cleanup(RERR_STREAMIO);
648 }
649 read_loop(fd, line, remaining);
650 rwrite((enum logcode)tag, line, remaining);
651 remaining = 0;
652 break;
653 default:
909ce14f 654 rprintf(FERROR, "unexpected tag %d\n", tag);
65417579 655 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 656 }
8d9dc9f9
AT
657 }
658
76c21947 659 if (remaining == 0)
d17e1dd2 660 io_flush(NORMAL_FLUSH);
76c21947 661
8d9dc9f9
AT
662 return ret;
663}
664
665
909ce14f 666
880da007
MP
667/**
668 * Do a buffered read from @p fd. Don't return until all @p n bytes
669 * have been read. If all @p n can't be read then exit with an
670 * error.
671 **/
3151cbae 672static void readfd(int fd, char *buffer, size_t N)
720b47f2 673{
6ba9279f 674 int ret;
d62bcc17 675 size_t total = 0;
3151cbae 676
6ba9279f 677 while (total < N) {
3151cbae 678 ret = read_unbuffered(fd, buffer + total, N-total);
6ba9279f 679 total += ret;
7f28dbee 680 }
1b7c47cb
AT
681
682 stats.total_read += total;
720b47f2
AT
683}
684
685
b7922338 686int32 read_int(int f)
720b47f2 687{
4c36ddbe 688 char b[4];
d730b113
AT
689 int32 ret;
690
4c36ddbe 691 readfd(f,b,4);
d730b113 692 ret = IVAL(b,0);
f89b9368
WD
693 if (ret == (int32)0xffffffff)
694 return -1;
d730b113 695 return ret;
720b47f2
AT
696}
697
71c46176 698int64 read_longint(int f)
3a6a366f 699{
71c46176 700 int64 ret;
3a6a366f
AT
701 char b[8];
702 ret = read_int(f);
71c46176 703
f89b9368 704 if ((int32)ret != (int32)0xffffffff)
8de330a3 705 return ret;
71c46176 706
3bee6733 707#ifdef NO_INT64
9486289c 708 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
65417579 709 exit_cleanup(RERR_UNSUPPORTED);
71c46176 710#else
91c4da3f
S
711 readfd(f,b,8);
712 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
71c46176
AT
713#endif
714
3a6a366f
AT
715 return ret;
716}
717
/* Read exactly len bytes from f into buf (see readfd()). */
void read_buf(int f,char *buf,size_t len)
{
	readfd(f, buf, len);
}
722
/* Read exactly len bytes from f into buf and append a terminating
 * '\0', so buf must have room for len+1 bytes. */
void read_sbuf(int f,char *buf,size_t len)
{
	read_buf(f, buf, len);
	buf[len] = '\0';
}
728
182dca5c
AT
/* Read and return a single byte from f. */
unsigned char read_byte(int f)
{
	unsigned char byte;

	read_buf(f, (char *)&byte, 1);
	return byte;
}
720b47f2 735
880da007 736
08571358
MP
737/**
738 * Sleep after writing to limit I/O bandwidth usage.
739 *
740 * @todo Rather than sleeping after each write, it might be better to
741 * use some kind of averaging. The current algorithm seems to always
742 * use a bit less bandwidth than specified, because it doesn't make up
743 * for slow periods. But arguably this is a feature. In addition, we
744 * ought to take the time used to write the data into account.
71e58630
WD
745 *
746 * During some phases of big transfers (file FOO is uptodate) this is
747 * called with a small bytes_written every time. As the kernel has to
748 * round small waits up to guarantee that we actually wait at least the
749 * requested number of microseconds, this can become grossly inaccurate.
750 * We therefore keep track of the bytes we've written over time and only
751 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
752 **/
753static void sleep_for_bwlimit(int bytes_written)
754{
71e58630
WD
755 static struct timeval prior_tv;
756 static long total_written = 0;
757 struct timeval tv, start_tv;
758 long elapsed_usec, sleep_usec;
759
760#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358
MP
761
762 if (!bwlimit)
763 return;
e681e820 764
71e58630
WD
765 total_written += bytes_written;
766
767 gettimeofday(&start_tv, NULL);
768 if (prior_tv.tv_sec) {
769 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
770 + (start_tv.tv_usec - prior_tv.tv_usec);
771 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
772 if (total_written < 0)
773 total_written = 0;
774 }
3151cbae 775
71e58630
WD
776 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
777 if (sleep_usec < ONE_SEC / 10) {
778 prior_tv = start_tv;
779 return;
780 }
08571358 781
71e58630
WD
782 tv.tv_sec = sleep_usec / ONE_SEC;
783 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 784 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
785
786 gettimeofday(&prior_tv, NULL);
787 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
788 + (prior_tv.tv_usec - start_tv.tv_usec);
789 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
790}
791
792
880da007
MP
793/**
794 * Write len bytes to the file descriptor @p fd.
795 *
796 * This function underlies the multiplexing system. The body of the
797 * application never calls this function directly.
798 **/
9dd891bb 799static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 800{
06ce139f 801 size_t total = 0;
8d9dc9f9 802 fd_set w_fds, r_fds;
4c36ddbe 803 int fd_count, count;
8d9dc9f9 804 struct timeval tv;
720b47f2 805
d17e1dd2 806 msg_list_push(NORMAL_FLUSH);
90ba34e2 807
e44f9a12
AT
808 no_flush++;
809
4c36ddbe 810 while (total < len) {
8d9dc9f9 811 FD_ZERO(&w_fds);
8d9dc9f9 812 FD_SET(fd,&w_fds);
554e0a8d 813 fd_count = fd;
4c36ddbe 814
d17e1dd2 815 if (msg_fd_in >= 0) {
56014c8c 816 FD_ZERO(&r_fds);
d17e1dd2 817 FD_SET(msg_fd_in,&r_fds);
d62bcc17 818 if (msg_fd_in > fd_count)
d17e1dd2 819 fd_count = msg_fd_in;
8d9dc9f9
AT
820 }
821
8cd9fd4e 822 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
8d9dc9f9 823 tv.tv_usec = 0;
4c36ddbe 824
554e0a8d 825 errno = 0;
d17e1dd2
WD
826 count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
827 &w_fds, NULL, &tv);
4c36ddbe 828
a57873b7 829 if (count == 0) {
d17e1dd2 830 msg_list_push(NORMAL_FLUSH);
a57873b7
AT
831 check_timeout();
832 }
833
4c36ddbe 834 if (count <= 0) {
f89b9368 835 if (errno == EBADF)
554e0a8d 836 exit_cleanup(RERR_SOCKETIO);
8d9dc9f9
AT
837 continue;
838 }
4c36ddbe 839
d17e1dd2
WD
840 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
841 read_msg_fd();
554e0a8d 842
8d9dc9f9 843 if (FD_ISSET(fd, &w_fds)) {
06ce139f
MP
844 int ret;
845 size_t n = len-total;
71e58630
WD
846 if (bwlimit && n > bwlimit_writemax)
847 n = bwlimit_writemax;
f0359dd0 848 ret = write(fd,buf+total,n);
4c36ddbe 849
3309507d
WD
850 if (ret < 0) {
851 if (errno == EINTR)
852 continue;
853 if (errno == EWOULDBLOCK || errno == EAGAIN) {
854 msleep(1);
855 continue;
856 }
f0359dd0
AT
857 }
858
4c36ddbe 859 if (ret <= 0) {
befbfe61
MP
860 /* Don't try to write errors back
861 * across the stream */
862 io_multiplexing_close();
d62bcc17
WD
863 rsyserr(FERROR, errno,
864 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
f89b9368 865 (long)len, io_write_phase);
65417579 866 exit_cleanup(RERR_STREAMIO);
4c36ddbe
AT
867 }
868
08571358 869 sleep_for_bwlimit(ret);
d62bcc17 870
4c36ddbe 871 total += ret;
a800434a 872
4c36ddbe
AT
873 if (io_timeout)
874 last_io = time(NULL);
8d9dc9f9 875 }
4c36ddbe 876 }
e44f9a12
AT
877
878 no_flush--;
720b47f2
AT
879}
880
8d9dc9f9 881
d6dead6b
AT
882static char *io_buffer;
883static int io_buffer_count;
884
76c21947 885void io_start_buffering_out(int fd)
d6dead6b 886{
f89b9368
WD
887 if (io_buffer)
888 return;
679e7657 889 multiplex_out_fd = fd;
58cadc86 890 io_buffer = new_array(char, IO_BUFFER_SIZE);
f89b9368
WD
891 if (!io_buffer)
892 out_of_memory("writefd");
d6dead6b 893 io_buffer_count = 0;
ff41a59f
AT
894}
895
76c21947
WD
896void io_start_buffering_in(int fd)
897{
898 multiplex_in_fd = fd;
899}
900
880da007
MP
901/**
902 * Write an message to a multiplexed stream. If this fails then rsync
903 * exits.
904 **/
d17e1dd2 905static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
ff41a59f
AT
906{
907 char buffer[4096];
06ce139f 908 size_t n = len;
8d9dc9f9 909
ff41a59f
AT
910 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
911
3151cbae
WD
912 if (n > (sizeof buffer - 4)) {
913 n = sizeof buffer - 4;
ff41a59f
AT
914 }
915
916 memcpy(&buffer[4], buf, n);
917 writefd_unbuffered(fd, buffer, n+4);
918
919 len -= n;
920 buf += n;
921
6d7b6081
AT
922 if (len) {
923 writefd_unbuffered(fd, buf, len);
924 }
d6dead6b
AT
925}
926
ff41a59f 927
d17e1dd2 928void io_flush(int flush_it_all)
d6dead6b 929{
679e7657 930 int fd = multiplex_out_fd;
d62bcc17 931
d17e1dd2 932 msg_list_push(flush_it_all);
90ba34e2 933
d17e1dd2
WD
934 if (!io_buffer_count || no_flush)
935 return;
8d9dc9f9 936
d17e1dd2
WD
937 if (io_multiplexing_out)
938 mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
939 else
4c36ddbe 940 writefd_unbuffered(fd, io_buffer, io_buffer_count);
8d9dc9f9
AT
941 io_buffer_count = 0;
942}
943
0ba48136 944
7b5c3eb0 945void io_end_buffering(void)
8d9dc9f9 946{
d17e1dd2 947 io_flush(NORMAL_FLUSH);
8d9dc9f9 948 if (!io_multiplexing_out) {
ff41a59f 949 free(io_buffer);
8d9dc9f9
AT
950 io_buffer = NULL;
951 }
d6dead6b
AT
952}
953
9dd891bb 954static void writefd(int fd,char *buf,size_t len)
d6dead6b 955{
1b7c47cb
AT
956 stats.total_written += len;
957
d17e1dd2 958 msg_list_push(NORMAL_FLUSH);
90ba34e2 959
554e0a8d 960 if (!io_buffer || fd != multiplex_out_fd) {
4c36ddbe
AT
961 writefd_unbuffered(fd, buf, len);
962 return;
963 }
d6dead6b
AT
964
965 while (len) {
f89b9368 966 int n = MIN((int)len, IO_BUFFER_SIZE-io_buffer_count);
d6dead6b
AT
967 if (n > 0) {
968 memcpy(io_buffer+io_buffer_count, buf, n);
969 buf += n;
970 len -= n;
971 io_buffer_count += n;
972 }
3151cbae 973
d17e1dd2
WD
974 if (io_buffer_count == IO_BUFFER_SIZE)
975 io_flush(NORMAL_FLUSH);
d6dead6b 976 }
d6dead6b 977}
720b47f2
AT
978
979
b7922338 980void write_int(int f,int32 x)
720b47f2 981{
8d9dc9f9
AT
982 char b[4];
983 SIVAL(b,0,x);
4c36ddbe 984 writefd(f,b,4);
720b47f2
AT
985}
986
7a24c346 987
805edf9d
MP
988void write_int_named(int f, int32 x, const char *phase)
989{
990 io_write_phase = phase;
991 write_int(f, x);
992 io_write_phase = phase_unknown;
993}
994
995
7a24c346
MP
996/*
997 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
998 * 64-bit types on this platform.
999 */
71c46176 1000void write_longint(int f, int64 x)
3a6a366f 1001{
3a6a366f 1002 char b[8];
3a6a366f 1003
91c4da3f 1004 if (x <= 0x7FFFFFFF) {
3a6a366f
AT
1005 write_int(f, (int)x);
1006 return;
1007 }
1008
67863f46
S
1009#ifdef NO_INT64
1010 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1011 exit_cleanup(RERR_UNSUPPORTED);
1012#else
8de330a3 1013 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
1014 SIVAL(b,0,(x&0xFFFFFFFF));
1015 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1016
4c36ddbe 1017 writefd(f,b,8);
67863f46 1018#endif
3a6a366f
AT
1019}
1020
/* Write len bytes from buf to f (see writefd()). */
void write_buf(int f,char *buf,size_t len)
{
	writefd(f, buf, len);
}
1025
/** Write the string buf to the connection, without its terminating
 * '\0'. */
static void write_sbuf(int f,char *buf)
{
	size_t len = strlen(buf);

	write_buf(f, buf, len);
}
1031
720b47f2 1032
182dca5c
AT
/* Write the single byte c to f. */
void write_byte(int f,unsigned char c)
{
	write_buf(f, (char *)&c, 1);
}
1037
7a55d06e
MP
1038
1039
/**
 * Read a line of up to @p maxlen characters into @p buf (not counting
 * the trailing null).  Strips the (required) trailing newline and all
 * carriage returns.  @p buf is null-terminated in both cases below
 * where a terminator is written, so it needs room for maxlen+1 bytes.
 *
 * @return 1 if a newline was seen before @p maxlen characters were
 * stored; 0 if a '\0' byte was read or the line did not fit.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *p = buf;
	size_t room = maxlen;

	while (room) {
		*p = 0;
		read_buf(f, p, 1);
		if (*p == 0)
			return 0;
		if (*p == '\n') {
			*p = '\0';
			return 1;
		}
		if (*p == '\r')
			continue;	/* overwritten by the next read */
		p++;
		room--;
	}

	*p = '\0';
	return 0;
}
1064
1065
1066void io_printf(int fd, const char *format, ...)
1067{
d62bcc17 1068 va_list ap;
f0fca04e
AT
1069 char buf[1024];
1070 int len;
3151cbae 1071
f0fca04e 1072 va_start(ap, format);
3151cbae 1073 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1074 va_end(ap);
1075
f89b9368
WD
1076 if (len < 0)
1077 exit_cleanup(RERR_STREAMIO);
f0fca04e
AT
1078
1079 write_sbuf(fd, buf);
1080}
8d9dc9f9
AT
1081
1082
d17e1dd2 1083/** Setup for multiplexing a MSG_* stream with the data stream. */
8d9dc9f9
AT
1084void io_start_multiplex_out(int fd)
1085{
679e7657 1086 multiplex_out_fd = fd;
d17e1dd2 1087 io_flush(NORMAL_FLUSH);
76c21947 1088 io_start_buffering_out(fd);
8d9dc9f9
AT
1089 io_multiplexing_out = 1;
1090}
1091
d17e1dd2 1092/** Setup for multiplexing a MSG_* stream with the data stream. */
8d9dc9f9
AT
1093void io_start_multiplex_in(int fd)
1094{
679e7657 1095 multiplex_in_fd = fd;
d17e1dd2 1096 io_flush(NORMAL_FLUSH);
8d9dc9f9
AT
1097 io_multiplexing_in = 1;
1098}
1099
d17e1dd2
WD
1100/** Write an message to the multiplexed data stream. */
1101int io_multiplex_write(enum msgcode code, char *buf, size_t len)
8d9dc9f9 1102{
f89b9368
WD
1103 if (!io_multiplexing_out)
1104 return 0;
8d9dc9f9 1105
d17e1dd2 1106 io_flush(NORMAL_FLUSH);
1b7c47cb 1107 stats.total_written += (len+4);
ff41a59f 1108 mplex_write(multiplex_out_fd, code, buf, len);
8d9dc9f9
AT
1109 return 1;
1110}
1111
d17e1dd2 1112/** Stop output multiplexing. */
554e0a8d
AT
1113void io_multiplexing_close(void)
1114{
1115 io_multiplexing_out = 0;
1116}
1117