Mention changes to the patches dir.
[rsync/rsync.git] / io.c
CommitLineData
7a24c346 1/* -*- c-file-style: "linux" -*-
d62bcc17
WD
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
880da007
MP
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
d62bcc17 6 *
880da007
MP
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
d62bcc17 11 *
880da007
MP
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
d62bcc17 16 *
880da007
MP
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
720b47f2 21
87ee2481 22/**
87ee2481
MP
23 * @file io.c
24 *
6ed6d7f5 25 * Socket and pipe I/O utilities used in rsync.
87ee2481
MP
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
720b47f2 36
720b47f2
AT
37#include "rsync.h"
38
880da007 39/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
40#define SELECT_TIMEOUT 60
41
8d9dc9f9
AT
42static int io_multiplexing_out;
43static int io_multiplexing_in;
76c21947
WD
44static int multiplex_in_fd = -1;
45static int multiplex_out_fd = -1;
8d9dc9f9 46static time_t last_io;
7a55d06e
MP
47static int no_flush;
48
49extern int bwlimit;
71e58630 50extern size_t bwlimit_writemax;
720b47f2 51extern int verbose;
6ba9279f 52extern int io_timeout;
d17e1dd2
WD
53extern int am_server;
54extern int am_daemon;
55extern int am_sender;
a800434a 56extern struct stats stats;
720b47f2 57
7a55d06e 58
a86179f4 59const char phase_unknown[] = "unknown";
805edf9d 60
98b332ed
MP
61/**
62 * The connection might be dropped at some point; perhaps because the
63 * remote instance crashed. Just giving the offset on the stream is
64 * not very helpful. So instead we try to make io_phase_name point to
65 * something useful.
eca2adb4 66 *
6ed6d7f5 67 * For buffered/multiplexed I/O these names will be somewhat
805edf9d 68 * approximate; perhaps for ease of support we would rather make the
6ed6d7f5 69 * buffer always flush when a single application-level I/O finishes.
805edf9d 70 *
eca2adb4
MP
71 * @todo Perhaps we want some simple stack functionality, but there's
72 * no need to overdo it.
98b332ed 73 **/
805edf9d
MP
74const char *io_write_phase = phase_unknown;
75const char *io_read_phase = phase_unknown;
98b332ed 76
7a55d06e
MP
77/** Ignore EOF errors while reading a module listing if the remote
78 version is 24 or less. */
79int kludge_around_eof = False;
80
d17e1dd2
WD
81int msg_fd_in = -1;
82int msg_fd_out = -1;
7a55d06e 83
56014c8c
WD
84static int io_filesfrom_f_in = -1;
85static int io_filesfrom_f_out = -1;
86static char io_filesfrom_buf[2048];
87static char *io_filesfrom_bp;
88static char io_filesfrom_lastchar;
89static int io_filesfrom_buflen;
720b47f2 90
static void read_loop(int fd, char *buf, size_t len);

/* FIFO of values received via MSG_REDO (or -1 for MSG_DONE). */
struct redo_list {
	struct redo_list *next;
	int num;
};

/* FIFO of pre-encoded MSG_* packets (4-byte header + payload)
 * waiting to be written to msg_fd_out. */
struct msg_list {
	struct msg_list *next;
	char *buf;
	int len;
};

static struct redo_list *redo_list_head, *redo_list_tail;
static struct msg_list *msg_list_head, *msg_list_tail;
109
110static void redo_list_add(int num)
111{
112 struct redo_list *rl;
113
114 if (!(rl = new(struct redo_list)))
115 exit_cleanup(RERR_MALLOC);
116 rl->next = NULL;
117 rl->num = num;
118 if (redo_list_tail)
119 redo_list_tail->next = rl;
120 else
121 redo_list_head = rl;
122 redo_list_tail = rl;
123}
124
8d9dc9f9
AT
125static void check_timeout(void)
126{
127 time_t t;
90ba34e2 128
d17e1dd2
WD
129 if (!io_timeout)
130 return;
8d9dc9f9
AT
131
132 if (!last_io) {
133 last_io = time(NULL);
134 return;
135 }
136
137 t = time(NULL);
138
86ffe37f 139 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
0adb99b9 140 if (!am_server && !am_daemon) {
d62bcc17 141 rprintf(FERROR, "io timeout after %d seconds - exiting\n",
0adb99b9
AT
142 (int)(t-last_io));
143 }
65417579 144 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
145 }
146}
147
d17e1dd2
WD
148/** Setup the fd used to receive MSG_* messages. Only needed when
149 * we're the generator because the sender and receiver both use the
6ed6d7f5 150 * multiplexed I/O setup. */
d17e1dd2
WD
151void set_msg_fd_in(int fd)
152{
153 msg_fd_in = fd;
154}
155
156/** Setup the fd used to send our MSG_* messages. Only needed when
157 * we're the receiver because the generator and the sender both use
6ed6d7f5 158 * the multiplexed I/O setup. */
d17e1dd2
WD
159void set_msg_fd_out(int fd)
160{
161 msg_fd_out = fd;
162 set_nonblocking(msg_fd_out);
163}
164
165/* Add a message to the pending MSG_* list. */
166static void msg_list_add(int code, char *buf, int len)
554e0a8d 167{
d17e1dd2
WD
168 struct msg_list *ml;
169
170 if (!(ml = new(struct msg_list)))
171 exit_cleanup(RERR_MALLOC);
172 ml->next = NULL;
173 if (!(ml->buf = new_array(char, len+4)))
174 exit_cleanup(RERR_MALLOC);
175 SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
176 memcpy(ml->buf+4, buf, len);
177 ml->len = len+4;
178 if (msg_list_tail)
179 msg_list_tail->next = ml;
180 else
181 msg_list_head = ml;
182 msg_list_tail = ml;
554e0a8d
AT
183}
184
d17e1dd2
WD
185void send_msg(enum msgcode code, char *buf, int len)
186{
187 msg_list_add(code, buf, len);
188 msg_list_push(NORMAL_FLUSH);
189}
190
191/** Read a message from the MSG_* fd and dispatch it. This is only
192 * called by the generator. */
193static void read_msg_fd(void)
554e0a8d
AT
194{
195 char buf[200];
06ce139f 196 size_t n;
d17e1dd2 197 int fd = msg_fd_in;
ff41a59f
AT
198 int tag, len;
199
d17e1dd2
WD
200 /* Temporarily disable msg_fd_in. This is needed because we
201 * may call a write routine that could try to call us back. */
202 msg_fd_in = -1;
554e0a8d 203
ff41a59f
AT
204 read_loop(fd, buf, 4);
205 tag = IVAL(buf, 0);
206
207 len = tag & 0xFFFFFF;
d17e1dd2 208 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 209
d17e1dd2
WD
210 switch (tag) {
211 case MSG_DONE:
13c7bcbb
WD
212 if (len != 0) {
213 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 214 exit_cleanup(RERR_STREAMIO);
13c7bcbb 215 }
d17e1dd2
WD
216 redo_list_add(-1);
217 break;
218 case MSG_REDO:
13c7bcbb
WD
219 if (len != 4) {
220 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 221 exit_cleanup(RERR_STREAMIO);
13c7bcbb 222 }
d17e1dd2
WD
223 read_loop(fd, buf, 4);
224 redo_list_add(IVAL(buf,0));
225 break;
226 case MSG_INFO:
227 case MSG_ERROR:
228 case MSG_LOG:
229 while (len) {
230 n = len;
231 if (n >= sizeof buf)
232 n = sizeof buf - 1;
233 read_loop(fd, buf, n);
234 rwrite((enum logcode)tag, buf, n);
235 len -= n;
236 }
237 break;
238 default:
13c7bcbb 239 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
d17e1dd2
WD
240 exit_cleanup(RERR_STREAMIO);
241 }
242
243 msg_fd_in = fd;
244}
245
246/* Try to push messages off the list onto the wire. If we leave with more
247 * to do, return 0. On error, return -1. If everything flushed, return 1.
248 * This is only called by the receiver. */
249int msg_list_push(int flush_it_all)
250{
251 static int written = 0;
252 struct timeval tv;
253 fd_set fds;
254
255 if (msg_fd_out < 0)
256 return -1;
257
258 while (msg_list_head) {
259 struct msg_list *ml = msg_list_head;
260 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
261 if (n < 0) {
262 if (errno == EINTR)
263 continue;
264 if (errno != EWOULDBLOCK && errno != EAGAIN)
265 return -1;
266 if (!flush_it_all)
267 return 0;
268 FD_ZERO(&fds);
269 FD_SET(msg_fd_out, &fds);
270 tv.tv_sec = io_timeout ? io_timeout : SELECT_TIMEOUT;
271 tv.tv_usec = 0;
272 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
273 check_timeout();
274 } else if ((written += n) == ml->len) {
275 free(ml->buf);
276 msg_list_head = ml->next;
277 if (!msg_list_head)
278 msg_list_tail = NULL;
279 free(ml);
280 written = 0;
281 }
554e0a8d 282 }
d17e1dd2
WD
283 return 1;
284}
285
286int get_redo_num(void)
287{
288 struct redo_list *next;
289 int num;
290
291 while (!redo_list_head)
292 read_msg_fd();
554e0a8d 293
d17e1dd2
WD
294 num = redo_list_head->num;
295 next = redo_list_head->next;
296 free(redo_list_head);
297 redo_list_head = next;
298 if (!next)
299 redo_list_tail = NULL;
300
301 return num;
554e0a8d
AT
302}
303
56014c8c
WD
304/**
305 * When we're the receiver and we have a local --files-from list of names
306 * that needs to be sent over the socket to the sender, we have to do two
307 * things at the same time: send the sender a list of what files we're
308 * processing and read the incoming file+info list from the sender. We do
309 * this by augmenting the read_timeout() function to copy this data. It
310 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
311 * ready, since it might be a pipe) and then blast it out f_out (when it
312 * is ready to receive more data).
313 */
314void io_set_filesfrom_fds(int f_in, int f_out)
315{
316 io_filesfrom_f_in = f_in;
317 io_filesfrom_f_out = f_out;
318 io_filesfrom_bp = io_filesfrom_buf;
319 io_filesfrom_lastchar = '\0';
320 io_filesfrom_buflen = 0;
321}
720b47f2 322
880da007
MP
323/**
324 * It's almost always an error to get an EOF when we're trying to read
325 * from the network, because the protocol is self-terminating.
326 *
327 * However, there is one unfortunate cases where it is not, which is
328 * rsync <2.4.6 sending a list of modules on a server, since the list
329 * is terminated by closing the socket. So, for the section of the
330 * program where that is a problem (start_socket_client),
331 * kludge_around_eof is True and we just exit.
332 */
3151cbae 333static void whine_about_eof(void)
7a55d06e 334{
7a55d06e 335 if (kludge_around_eof)
3151cbae 336 exit_cleanup(0);
7a55d06e 337 else {
d62bcc17 338 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
3151cbae 339 "(%.0f bytes read so far)\n",
d62bcc17 340 (double)stats.total_read);
3151cbae
WD
341
342 exit_cleanup(RERR_STREAMIO);
7a55d06e
MP
343 }
344}
720b47f2 345
7a55d06e 346
3151cbae 347static void die_from_readerr(int err)
7a55d06e
MP
348{
349 /* this prevents us trying to write errors on a dead socket */
350 io_multiplexing_close();
3151cbae 351
d62bcc17 352 rsyserr(FERROR, err, "read error");
7a55d06e
MP
353 exit_cleanup(RERR_STREAMIO);
354}
355
356
880da007 357/**
6ed6d7f5 358 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
359 * read. If no bytes can be read then exit, never return a number <= 0.
360 *
8886f8d0
MP
361 * TODO: If the remote shell connection fails, then current versions
362 * actually report an "unexpected EOF" error here. Since it's a
363 * fairly common mistake to try to use rsh when ssh is required, we
364 * should trap that: if we fail to read any data at all, we should
365 * give a better explanation. We can tell whether the connection has
366 * started by looking e.g. at whether the remote version is known yet.
c3563c46 367 */
3151cbae 368static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 369{
d62bcc17 370 int n, ret = 0;
4c36ddbe 371
d17e1dd2 372 io_flush(NORMAL_FLUSH);
ea2111d1 373
4c36ddbe 374 while (ret == 0) {
7a55d06e 375 /* until we manage to read *something* */
56014c8c 376 fd_set r_fds, w_fds;
4c36ddbe 377 struct timeval tv;
554e0a8d 378 int fd_count = fd+1;
a57873b7 379 int count;
4c36ddbe 380
56014c8c
WD
381 FD_ZERO(&r_fds);
382 FD_SET(fd, &r_fds);
d17e1dd2
WD
383 if (msg_fd_in >= 0) {
384 FD_SET(msg_fd_in, &r_fds);
385 if (msg_fd_in >= fd_count)
386 fd_count = msg_fd_in+1;
56014c8c 387 }
3309507d 388 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
389 int new_fd;
390 if (io_filesfrom_buflen == 0) {
3309507d 391 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
392 FD_SET(io_filesfrom_f_in, &r_fds);
393 new_fd = io_filesfrom_f_in;
394 } else {
395 io_filesfrom_f_out = -1;
396 new_fd = -1;
397 }
398 } else {
399 FD_ZERO(&w_fds);
400 FD_SET(io_filesfrom_f_out, &w_fds);
401 new_fd = io_filesfrom_f_out;
402 }
d17e1dd2
WD
403 if (new_fd >= fd_count)
404 fd_count = new_fd+1;
554e0a8d
AT
405 }
406
8cd9fd4e 407 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
4c36ddbe
AT
408 tv.tv_usec = 0;
409
554e0a8d
AT
410 errno = 0;
411
56014c8c
WD
412 count = select(fd_count, &r_fds,
413 io_filesfrom_buflen? &w_fds : NULL,
414 NULL, &tv);
a57873b7
AT
415
416 if (count == 0) {
d17e1dd2 417 msg_list_push(NORMAL_FLUSH);
a57873b7
AT
418 check_timeout();
419 }
420
421 if (count <= 0) {
554e0a8d
AT
422 if (errno == EBADF) {
423 exit_cleanup(RERR_SOCKETIO);
424 }
4c36ddbe
AT
425 continue;
426 }
427
d17e1dd2
WD
428 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
429 read_msg_fd();
554e0a8d 430
3309507d 431 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
432 if (io_filesfrom_buflen) {
433 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
434 int l = write(io_filesfrom_f_out,
435 io_filesfrom_bp,
436 io_filesfrom_buflen);
437 if (l > 0) {
438 if (!(io_filesfrom_buflen -= l))
439 io_filesfrom_bp = io_filesfrom_buf;
440 else
441 io_filesfrom_bp += l;
442 } else {
443 /* XXX should we complain? */
444 io_filesfrom_f_out = -1;
445 }
446 }
3309507d 447 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
448 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
449 int l = read(io_filesfrom_f_in,
450 io_filesfrom_buf,
451 sizeof io_filesfrom_buf);
452 if (l <= 0) {
453 /* Send end-of-file marker */
454 io_filesfrom_buf[0] = '\0';
455 io_filesfrom_buf[1] = '\0';
456 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
457 io_filesfrom_f_in = -1;
458 } else {
459 extern int eol_nulls;
460 if (!eol_nulls) {
461 char *s = io_filesfrom_buf + l;
462 /* Transform CR and/or LF into '\0' */
463 while (s-- > io_filesfrom_buf) {
464 if (*s == '\n' || *s == '\r')
465 *s = '\0';
466 }
467 }
468 if (!io_filesfrom_lastchar) {
469 /* Last buf ended with a '\0', so don't
470 * let this buf start with one. */
471 while (l && !*io_filesfrom_bp)
472 io_filesfrom_bp++, l--;
473 }
474 if (!l)
475 io_filesfrom_bp = io_filesfrom_buf;
476 else {
477 char *f = io_filesfrom_bp;
478 char *t = f;
479 char *eob = f + l;
480 /* Eliminate any multi-'\0' runs. */
481 while (f != eob) {
482 if (!(*t++ = *f++)) {
483 while (f != eob && !*f)
484 f++, l--;
485 }
486 }
487 io_filesfrom_lastchar = f[-1];
488 }
489 io_filesfrom_buflen = l;
490 }
491 }
492 }
493 }
494
495 if (!FD_ISSET(fd, &r_fds)) continue;
554e0a8d 496
4c36ddbe
AT
497 n = read(fd, buf, len);
498
8d9dc9f9
AT
499 if (n > 0) {
500 buf += n;
501 len -= n;
4c36ddbe
AT
502 ret += n;
503 if (io_timeout)
504 last_io = time(NULL);
505 continue;
7a55d06e 506 } else if (n == 0) {
3151cbae 507 whine_about_eof();
7a55d06e 508 return -1; /* doesn't return */
3309507d 509 } else if (n < 0) {
d62bcc17
WD
510 if (errno == EINTR || errno == EWOULDBLOCK
511 || errno == EAGAIN)
7a55d06e 512 continue;
3151cbae 513 die_from_readerr(errno);
8d9dc9f9 514 }
4c36ddbe 515 }
8d9dc9f9 516
4c36ddbe
AT
517 return ret;
518}
8d9dc9f9 519
56014c8c
WD
520/**
521 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
522 * characters long).
523 */
524int read_filesfrom_line(int fd, char *fname)
525{
526 char ch, *s, *eob = fname + MAXPATHLEN - 1;
527 int cnt;
528 extern int io_timeout;
529 extern int eol_nulls;
530 extern char *remote_filesfrom_file;
55d5937d 531 int reading_remotely = remote_filesfrom_file != NULL;
56014c8c
WD
532 int nulls = eol_nulls || reading_remotely;
533
534 start:
535 s = fname;
536 while (1) {
537 cnt = read(fd, &ch, 1);
538 if (cnt < 0 && (errno == EWOULDBLOCK
539 || errno == EINTR || errno == EAGAIN)) {
540 struct timeval tv;
541 fd_set fds;
542 FD_ZERO(&fds);
543 FD_SET(fd, &fds);
544 tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
545 tv.tv_usec = 0;
546 if (!select(fd+1, &fds, NULL, NULL, &tv))
547 check_timeout();
548 continue;
549 }
550 if (cnt != 1)
551 break;
552 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
553 /* Skip empty lines if reading locally. */
554 if (!reading_remotely && s == fname)
555 continue;
556 break;
557 }
558 if (s < eob)
559 *s++ = ch;
560 }
561 *s = '\0';
7a55d06e 562
6b45fcf1
WD
563 /* Dump comments. */
564 if (*fname == '#' || *fname == ';')
56014c8c
WD
565 goto start;
566
567 return s - fname;
568}
7a55d06e
MP
569
570
880da007
MP
/**
 * Read exactly @p len bytes into @p buf by calling read_timeout() until
 * done. read_timeout() exits rather than returning <= 0, so this either
 * completes or the process ends.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	size_t got = 0;

	while (got < len)
		got += read_timeout(fd, buf + got, len - got);
}
584
7a55d06e
MP
585
586/**
587 * Read from the file descriptor handling multiplexing - return number
588 * of bytes read.
d62bcc17
WD
589 *
590 * Never returns <= 0.
7a55d06e 591 */
9dd891bb 592static int read_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 593{
6fe25398 594 static size_t remaining;
909ce14f 595 int tag, ret = 0;
8d9dc9f9 596 char line[1024];
76c21947
WD
597 static char *buffer;
598 static size_t bufferIdx = 0;
599 static size_t bufferSz;
8d9dc9f9 600
76c21947 601 if (fd != multiplex_in_fd)
4c36ddbe 602 return read_timeout(fd, buf, len);
8d9dc9f9 603
76c21947
WD
604 if (!io_multiplexing_in && remaining == 0) {
605 if (!buffer) {
606 bufferSz = 2 * IO_BUFFER_SIZE;
607 buffer = new_array(char, bufferSz);
608 if (!buffer) out_of_memory("read_unbuffered");
609 }
610 remaining = read_timeout(fd, buffer, bufferSz);
611 bufferIdx = 0;
612 }
613
8d9dc9f9
AT
614 while (ret == 0) {
615 if (remaining) {
616 len = MIN(len, remaining);
76c21947
WD
617 memcpy(buf, buffer + bufferIdx, len);
618 bufferIdx += len;
8d9dc9f9
AT
619 remaining -= len;
620 ret = len;
76c21947 621 break;
8d9dc9f9
AT
622 }
623
909ce14f 624 read_loop(fd, line, 4);
ff41a59f 625 tag = IVAL(line, 0);
679e7657 626
8d9dc9f9 627 remaining = tag & 0xFFFFFF;
d17e1dd2 628 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 629
d17e1dd2
WD
630 switch (tag) {
631 case MSG_DATA:
76c21947
WD
632 if (!buffer || remaining > bufferSz) {
633 buffer = realloc_array(buffer, char, remaining);
634 if (!buffer) out_of_memory("read_unbuffered");
635 bufferSz = remaining;
636 }
637 read_loop(fd, buffer, remaining);
638 bufferIdx = 0;
d17e1dd2
WD
639 break;
640 case MSG_INFO:
641 case MSG_ERROR:
642 if (remaining >= sizeof line) {
643 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
644 tag, (long)remaining);
645 exit_cleanup(RERR_STREAMIO);
646 }
647 read_loop(fd, line, remaining);
648 rwrite((enum logcode)tag, line, remaining);
649 remaining = 0;
650 break;
651 default:
909ce14f 652 rprintf(FERROR, "unexpected tag %d\n", tag);
65417579 653 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 654 }
8d9dc9f9
AT
655 }
656
76c21947 657 if (remaining == 0)
d17e1dd2 658 io_flush(NORMAL_FLUSH);
76c21947 659
8d9dc9f9
AT
660 return ret;
661}
662
663
909ce14f 664
880da007
MP
665/**
666 * Do a buffered read from @p fd. Don't return until all @p n bytes
667 * have been read. If all @p n can't be read then exit with an
668 * error.
669 **/
3151cbae 670static void readfd(int fd, char *buffer, size_t N)
720b47f2 671{
6ba9279f 672 int ret;
d62bcc17 673 size_t total = 0;
3151cbae 674
6ba9279f 675 while (total < N) {
3151cbae 676 ret = read_unbuffered(fd, buffer + total, N-total);
6ba9279f 677 total += ret;
7f28dbee 678 }
1b7c47cb
AT
679
680 stats.total_read += total;
720b47f2
AT
681}
682
683
b7922338 684int32 read_int(int f)
720b47f2 685{
4c36ddbe 686 char b[4];
d730b113
AT
687 int32 ret;
688
4c36ddbe 689 readfd(f,b,4);
d730b113
AT
690 ret = IVAL(b,0);
691 if (ret == (int32)0xffffffff) return -1;
692 return ret;
720b47f2
AT
693}
694
71c46176 695int64 read_longint(int f)
3a6a366f 696{
71c46176 697 int64 ret;
3a6a366f
AT
698 char b[8];
699 ret = read_int(f);
71c46176 700
8de330a3
AT
701 if ((int32)ret != (int32)0xffffffff) {
702 return ret;
703 }
71c46176 704
3bee6733 705#ifdef NO_INT64
9486289c 706 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
65417579 707 exit_cleanup(RERR_UNSUPPORTED);
71c46176 708#else
91c4da3f
S
709 readfd(f,b,8);
710 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
71c46176
AT
711#endif
712
3a6a366f
AT
713 return ret;
714}
715
/** Read exactly @p len bytes from @p f into @p buf (exits on failure). */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
720
/** Read @p len bytes into @p buf and NUL-terminate them. @p buf must
 *  therefore have room for @p len + 1 bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	read_buf(f, buf, len);
	buf[len] = '\0';
}
726
182dca5c
AT
/** Read a single byte from @p f. */
unsigned char read_byte(int f)
{
	unsigned char byte;

	read_buf(f, (char *)&byte, 1);
	return byte;
}
720b47f2 733
880da007 734
08571358
MP
735/**
736 * Sleep after writing to limit I/O bandwidth usage.
737 *
738 * @todo Rather than sleeping after each write, it might be better to
739 * use some kind of averaging. The current algorithm seems to always
740 * use a bit less bandwidth than specified, because it doesn't make up
741 * for slow periods. But arguably this is a feature. In addition, we
742 * ought to take the time used to write the data into account.
71e58630
WD
743 *
744 * During some phases of big transfers (file FOO is uptodate) this is
745 * called with a small bytes_written every time. As the kernel has to
746 * round small waits up to guarantee that we actually wait at least the
747 * requested number of microseconds, this can become grossly inaccurate.
748 * We therefore keep track of the bytes we've written over time and only
749 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
750 **/
751static void sleep_for_bwlimit(int bytes_written)
752{
71e58630
WD
753 static struct timeval prior_tv;
754 static long total_written = 0;
755 struct timeval tv, start_tv;
756 long elapsed_usec, sleep_usec;
757
758#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358
MP
759
760 if (!bwlimit)
761 return;
e681e820 762
71e58630
WD
763 total_written += bytes_written;
764
765 gettimeofday(&start_tv, NULL);
766 if (prior_tv.tv_sec) {
767 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
768 + (start_tv.tv_usec - prior_tv.tv_usec);
769 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
770 if (total_written < 0)
771 total_written = 0;
772 }
3151cbae 773
71e58630
WD
774 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
775 if (sleep_usec < ONE_SEC / 10) {
776 prior_tv = start_tv;
777 return;
778 }
08571358 779
71e58630
WD
780 tv.tv_sec = sleep_usec / ONE_SEC;
781 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 782 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
783
784 gettimeofday(&prior_tv, NULL);
785 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
786 + (prior_tv.tv_usec - start_tv.tv_usec);
787 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
788}
789
790
880da007
MP
791/**
792 * Write len bytes to the file descriptor @p fd.
793 *
794 * This function underlies the multiplexing system. The body of the
795 * application never calls this function directly.
796 **/
9dd891bb 797static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 798{
06ce139f 799 size_t total = 0;
8d9dc9f9 800 fd_set w_fds, r_fds;
4c36ddbe 801 int fd_count, count;
8d9dc9f9 802 struct timeval tv;
720b47f2 803
d17e1dd2 804 msg_list_push(NORMAL_FLUSH);
90ba34e2 805
e44f9a12
AT
806 no_flush++;
807
4c36ddbe 808 while (total < len) {
8d9dc9f9 809 FD_ZERO(&w_fds);
8d9dc9f9 810 FD_SET(fd,&w_fds);
554e0a8d 811 fd_count = fd;
4c36ddbe 812
d17e1dd2 813 if (msg_fd_in >= 0) {
56014c8c 814 FD_ZERO(&r_fds);
d17e1dd2 815 FD_SET(msg_fd_in,&r_fds);
d62bcc17 816 if (msg_fd_in > fd_count)
d17e1dd2 817 fd_count = msg_fd_in;
8d9dc9f9
AT
818 }
819
8cd9fd4e 820 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
8d9dc9f9 821 tv.tv_usec = 0;
4c36ddbe 822
554e0a8d 823 errno = 0;
d17e1dd2
WD
824 count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
825 &w_fds, NULL, &tv);
4c36ddbe 826
a57873b7 827 if (count == 0) {
d17e1dd2 828 msg_list_push(NORMAL_FLUSH);
a57873b7
AT
829 check_timeout();
830 }
831
4c36ddbe 832 if (count <= 0) {
554e0a8d
AT
833 if (errno == EBADF) {
834 exit_cleanup(RERR_SOCKETIO);
835 }
8d9dc9f9
AT
836 continue;
837 }
4c36ddbe 838
d17e1dd2
WD
839 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
840 read_msg_fd();
554e0a8d 841
8d9dc9f9 842 if (FD_ISSET(fd, &w_fds)) {
06ce139f
MP
843 int ret;
844 size_t n = len-total;
71e58630
WD
845 if (bwlimit && n > bwlimit_writemax)
846 n = bwlimit_writemax;
f0359dd0 847 ret = write(fd,buf+total,n);
4c36ddbe 848
3309507d
WD
849 if (ret < 0) {
850 if (errno == EINTR)
851 continue;
852 if (errno == EWOULDBLOCK || errno == EAGAIN) {
853 msleep(1);
854 continue;
855 }
f0359dd0
AT
856 }
857
4c36ddbe 858 if (ret <= 0) {
befbfe61
MP
859 /* Don't try to write errors back
860 * across the stream */
861 io_multiplexing_close();
d62bcc17
WD
862 rsyserr(FERROR, errno,
863 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
864 (long) len, io_write_phase);
65417579 865 exit_cleanup(RERR_STREAMIO);
4c36ddbe
AT
866 }
867
08571358 868 sleep_for_bwlimit(ret);
d62bcc17 869
4c36ddbe 870 total += ret;
a800434a 871
4c36ddbe
AT
872 if (io_timeout)
873 last_io = time(NULL);
8d9dc9f9 874 }
4c36ddbe 875 }
e44f9a12
AT
876
877 no_flush--;
720b47f2
AT
878}
879
8d9dc9f9 880
d6dead6b
AT
/* Output buffer (IO_BUFFER_SIZE bytes) for multiplex_out_fd and its fill level. */
static char *io_buffer;
static int io_buffer_count;
883
76c21947 884void io_start_buffering_out(int fd)
d6dead6b 885{
8d9dc9f9 886 if (io_buffer) return;
679e7657 887 multiplex_out_fd = fd;
58cadc86 888 io_buffer = new_array(char, IO_BUFFER_SIZE);
d6dead6b
AT
889 if (!io_buffer) out_of_memory("writefd");
890 io_buffer_count = 0;
ff41a59f
AT
891}
892
76c21947
WD
893void io_start_buffering_in(int fd)
894{
895 multiplex_in_fd = fd;
896}
897
880da007
MP
898/**
899 * Write an message to a multiplexed stream. If this fails then rsync
900 * exits.
901 **/
d17e1dd2 902static void mplex_write(int fd, enum msgcode code, char *buf, size_t len)
ff41a59f
AT
903{
904 char buffer[4096];
06ce139f 905 size_t n = len;
8d9dc9f9 906
ff41a59f
AT
907 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
908
3151cbae
WD
909 if (n > (sizeof buffer - 4)) {
910 n = sizeof buffer - 4;
ff41a59f
AT
911 }
912
913 memcpy(&buffer[4], buf, n);
914 writefd_unbuffered(fd, buffer, n+4);
915
916 len -= n;
917 buf += n;
918
6d7b6081
AT
919 if (len) {
920 writefd_unbuffered(fd, buf, len);
921 }
d6dead6b
AT
922}
923
ff41a59f 924
d17e1dd2 925void io_flush(int flush_it_all)
d6dead6b 926{
679e7657 927 int fd = multiplex_out_fd;
d62bcc17 928
d17e1dd2 929 msg_list_push(flush_it_all);
90ba34e2 930
d17e1dd2
WD
931 if (!io_buffer_count || no_flush)
932 return;
8d9dc9f9 933
d17e1dd2
WD
934 if (io_multiplexing_out)
935 mplex_write(fd, MSG_DATA, io_buffer, io_buffer_count);
936 else
4c36ddbe 937 writefd_unbuffered(fd, io_buffer, io_buffer_count);
8d9dc9f9
AT
938 io_buffer_count = 0;
939}
940
0ba48136 941
7b5c3eb0 942void io_end_buffering(void)
8d9dc9f9 943{
d17e1dd2 944 io_flush(NORMAL_FLUSH);
8d9dc9f9 945 if (!io_multiplexing_out) {
ff41a59f 946 free(io_buffer);
8d9dc9f9
AT
947 io_buffer = NULL;
948 }
d6dead6b
AT
949}
950
9dd891bb 951static void writefd(int fd,char *buf,size_t len)
d6dead6b 952{
1b7c47cb
AT
953 stats.total_written += len;
954
d17e1dd2 955 msg_list_push(NORMAL_FLUSH);
90ba34e2 956
554e0a8d 957 if (!io_buffer || fd != multiplex_out_fd) {
4c36ddbe
AT
958 writefd_unbuffered(fd, buf, len);
959 return;
960 }
d6dead6b
AT
961
962 while (len) {
7b5c3eb0 963 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
d6dead6b
AT
964 if (n > 0) {
965 memcpy(io_buffer+io_buffer_count, buf, n);
966 buf += n;
967 len -= n;
968 io_buffer_count += n;
969 }
3151cbae 970
d17e1dd2
WD
971 if (io_buffer_count == IO_BUFFER_SIZE)
972 io_flush(NORMAL_FLUSH);
d6dead6b 973 }
d6dead6b 974}
720b47f2
AT
975
976
b7922338 977void write_int(int f,int32 x)
720b47f2 978{
8d9dc9f9
AT
979 char b[4];
980 SIVAL(b,0,x);
4c36ddbe 981 writefd(f,b,4);
720b47f2
AT
982}
983
7a24c346 984
805edf9d
MP
985void write_int_named(int f, int32 x, const char *phase)
986{
987 io_write_phase = phase;
988 write_int(f, x);
989 io_write_phase = phase_unknown;
990}
991
992
7a24c346
MP
993/*
994 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
995 * 64-bit types on this platform.
996 */
71c46176 997void write_longint(int f, int64 x)
3a6a366f 998{
3a6a366f 999 char b[8];
3a6a366f 1000
91c4da3f 1001 if (x <= 0x7FFFFFFF) {
3a6a366f
AT
1002 write_int(f, (int)x);
1003 return;
1004 }
1005
67863f46
S
1006#ifdef NO_INT64
1007 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
1008 exit_cleanup(RERR_UNSUPPORTED);
1009#else
8de330a3 1010 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
1011 SIVAL(b,0,(x&0xFFFFFFFF));
1012 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1013
4c36ddbe 1014 writefd(f,b,8);
67863f46 1015#endif
3a6a366f
AT
1016}
1017
/** Write @p len bytes of @p buf to @p f (buffered/multiplexed as configured). */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
1022
/** Write the NUL-terminated string @p buf (without its NUL) to @p f. */
static void write_sbuf(int f, char *buf)
{
	size_t n = strlen(buf);

	write_buf(f, buf, n);
}
1028
720b47f2 1029
182dca5c
AT
/** Write the single byte @p c to @p f. */
void write_byte(int f, unsigned char c)
{
	char *p = (char *)&c;

	write_buf(f, p, 1);
}
1034
7a55d06e
MP
1035
1036
/**
 * Read a line of up to @p maxlen characters into @p buf, not counting the
 * trailing '\0' (so @p buf needs @p maxlen + 1 bytes). The terminating
 * newline is consumed and stripped, and all carriage returns are dropped.
 *
 * @return 1 for success; 0 if a '\0' byte is read or the line does not
 * fit, in which case the rest of the line is left unread.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	for (; maxlen; ) {
		buf[0] = 0;
		read_buf(f, buf, 1);
		if (buf[0] == '\n')
			break;
		if (buf[0] == 0)
			return 0;
		if (buf[0] == '\r')
			continue;	/* dropped: the next byte overwrites it */
		buf++;
		maxlen--;
	}
	*buf = '\0';
	return maxlen > 0;
}
1061
1062
1063void io_printf(int fd, const char *format, ...)
1064{
d62bcc17 1065 va_list ap;
f0fca04e
AT
1066 char buf[1024];
1067 int len;
3151cbae 1068
f0fca04e 1069 va_start(ap, format);
3151cbae 1070 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1071 va_end(ap);
1072
65417579 1073 if (len < 0) exit_cleanup(RERR_STREAMIO);
f0fca04e
AT
1074
1075 write_sbuf(fd, buf);
1076}
8d9dc9f9
AT
1077
1078
d17e1dd2 1079/** Setup for multiplexing a MSG_* stream with the data stream. */
8d9dc9f9
AT
1080void io_start_multiplex_out(int fd)
1081{
679e7657 1082 multiplex_out_fd = fd;
d17e1dd2 1083 io_flush(NORMAL_FLUSH);
76c21947 1084 io_start_buffering_out(fd);
8d9dc9f9
AT
1085 io_multiplexing_out = 1;
1086}
1087
d17e1dd2 1088/** Setup for multiplexing a MSG_* stream with the data stream. */
8d9dc9f9
AT
1089void io_start_multiplex_in(int fd)
1090{
679e7657 1091 multiplex_in_fd = fd;
d17e1dd2 1092 io_flush(NORMAL_FLUSH);
8d9dc9f9
AT
1093 io_multiplexing_in = 1;
1094}
1095
d17e1dd2
WD
1096/** Write an message to the multiplexed data stream. */
1097int io_multiplex_write(enum msgcode code, char *buf, size_t len)
8d9dc9f9
AT
1098{
1099 if (!io_multiplexing_out) return 0;
1100
d17e1dd2 1101 io_flush(NORMAL_FLUSH);
1b7c47cb 1102 stats.total_written += (len+4);
ff41a59f 1103 mplex_write(multiplex_out_fd, code, buf, len);
8d9dc9f9
AT
1104 return 1;
1105}
1106
d17e1dd2 1107/** Stop output multiplexing. */
554e0a8d
AT
1108void io_multiplexing_close(void)
1109{
1110 io_multiplexing_out = 0;
1111}
1112