One more inplace code tweak.
[rsync/rsync.git] / io.c
CommitLineData
7a24c346 1/* -*- c-file-style: "linux" -*-
d62bcc17
WD
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
880da007
MP
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
d62bcc17 6 *
880da007
MP
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
d62bcc17 11 *
880da007
MP
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
d62bcc17 16 *
880da007
MP
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
720b47f2 21
87ee2481 22/**
87ee2481
MP
23 * @file io.c
24 *
6ed6d7f5 25 * Socket and pipe I/O utilities used in rsync.
87ee2481
MP
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
720b47f2 36
720b47f2
AT
37#include "rsync.h"
38
880da007 39/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
40#define SELECT_TIMEOUT 60
41
7a55d06e 42extern int bwlimit;
71e58630 43extern size_t bwlimit_writemax;
720b47f2 44extern int verbose;
6ba9279f 45extern int io_timeout;
d17e1dd2
WD
46extern int am_server;
47extern int am_daemon;
48extern int am_sender;
e626b29e 49extern int eol_nulls;
b9f592fb
WD
50extern int checksum_seed;
51extern int protocol_version;
e626b29e 52extern char *remote_filesfrom_file;
a800434a 53extern struct stats stats;
720b47f2 54
a86179f4 55const char phase_unknown[] = "unknown";
e626b29e 56int select_timeout = SELECT_TIMEOUT;
b9f592fb 57int batch_fd = -1;
b0ad5429 58int batch_gen_fd = -1;
805edf9d 59
98b332ed
MP
60/**
61 * The connection might be dropped at some point; perhaps because the
62 * remote instance crashed. Just giving the offset on the stream is
63 * not very helpful. So instead we try to make io_phase_name point to
64 * something useful.
eca2adb4 65 *
6ed6d7f5 66 * For buffered/multiplexed I/O these names will be somewhat
805edf9d 67 * approximate; perhaps for ease of support we would rather make the
6ed6d7f5 68 * buffer always flush when a single application-level I/O finishes.
805edf9d 69 *
eca2adb4
MP
70 * @todo Perhaps we want some simple stack functionality, but there's
71 * no need to overdo it.
98b332ed 72 **/
805edf9d
MP
73const char *io_write_phase = phase_unknown;
74const char *io_read_phase = phase_unknown;
98b332ed 75
7a55d06e
MP
76/** Ignore EOF errors while reading a module listing if the remote
77 version is 24 or less. */
78int kludge_around_eof = False;
79
d17e1dd2
WD
80int msg_fd_in = -1;
81int msg_fd_out = -1;
7a55d06e 82
1f75bb10
WD
83static int io_multiplexing_out;
84static int io_multiplexing_in;
85static int sock_f_in = -1;
86static int sock_f_out = -1;
87static time_t last_io;
88static int no_flush;
89
b9f592fb
WD
90static int write_batch_monitor_in = -1;
91static int write_batch_monitor_out = -1;
92
56014c8c
WD
93static int io_filesfrom_f_in = -1;
94static int io_filesfrom_f_out = -1;
95static char io_filesfrom_buf[2048];
96static char *io_filesfrom_bp;
97static char io_filesfrom_lastchar;
98static int io_filesfrom_buflen;
720b47f2 99
9dd891bb 100static void read_loop(int fd, char *buf, size_t len);
ff41a59f 101
d17e1dd2
WD
102struct redo_list {
103 struct redo_list *next;
104 int num;
105};
106
107static struct redo_list *redo_list_head;
108static struct redo_list *redo_list_tail;
109
110struct msg_list {
111 struct msg_list *next;
112 char *buf;
113 int len;
114};
115
116static struct msg_list *msg_list_head;
117static struct msg_list *msg_list_tail;
118
119static void redo_list_add(int num)
120{
121 struct redo_list *rl;
122
123 if (!(rl = new(struct redo_list)))
124 exit_cleanup(RERR_MALLOC);
125 rl->next = NULL;
126 rl->num = num;
127 if (redo_list_tail)
128 redo_list_tail->next = rl;
129 else
130 redo_list_head = rl;
131 redo_list_tail = rl;
132}
133
8d9dc9f9
AT
134static void check_timeout(void)
135{
136 time_t t;
90ba34e2 137
d17e1dd2
WD
138 if (!io_timeout)
139 return;
8d9dc9f9
AT
140
141 if (!last_io) {
142 last_io = time(NULL);
143 return;
144 }
145
146 t = time(NULL);
147
1f75bb10 148 if (t - last_io >= io_timeout) {
0adb99b9 149 if (!am_server && !am_daemon) {
d62bcc17 150 rprintf(FERROR, "io timeout after %d seconds - exiting\n",
0adb99b9
AT
151 (int)(t-last_io));
152 }
65417579 153 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
154 }
155}
156
1f75bb10
WD
157/* Note the fds used for the main socket (which might really be a pipe
158 * for a local transfer, but we can ignore that). */
159void io_set_sock_fds(int f_in, int f_out)
160{
161 sock_f_in = f_in;
162 sock_f_out = f_out;
163}
164
d17e1dd2
WD
165/** Setup the fd used to receive MSG_* messages. Only needed when
166 * we're the generator because the sender and receiver both use the
6ed6d7f5 167 * multiplexed I/O setup. */
d17e1dd2
WD
168void set_msg_fd_in(int fd)
169{
170 msg_fd_in = fd;
171}
172
173/** Setup the fd used to send our MSG_* messages. Only needed when
174 * we're the receiver because the generator and the sender both use
6ed6d7f5 175 * the multiplexed I/O setup. */
d17e1dd2
WD
176void set_msg_fd_out(int fd)
177{
178 msg_fd_out = fd;
179 set_nonblocking(msg_fd_out);
180}
181
182/* Add a message to the pending MSG_* list. */
183static void msg_list_add(int code, char *buf, int len)
554e0a8d 184{
d17e1dd2
WD
185 struct msg_list *ml;
186
187 if (!(ml = new(struct msg_list)))
188 exit_cleanup(RERR_MALLOC);
189 ml->next = NULL;
190 if (!(ml->buf = new_array(char, len+4)))
191 exit_cleanup(RERR_MALLOC);
192 SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
193 memcpy(ml->buf+4, buf, len);
194 ml->len = len+4;
195 if (msg_list_tail)
196 msg_list_tail->next = ml;
197 else
198 msg_list_head = ml;
199 msg_list_tail = ml;
554e0a8d
AT
200}
201
d17e1dd2
WD
202void send_msg(enum msgcode code, char *buf, int len)
203{
204 msg_list_add(code, buf, len);
205 msg_list_push(NORMAL_FLUSH);
206}
207
208/** Read a message from the MSG_* fd and dispatch it. This is only
209 * called by the generator. */
210static void read_msg_fd(void)
554e0a8d 211{
f9c6b3e7 212 char buf[2048];
06ce139f 213 size_t n;
d17e1dd2 214 int fd = msg_fd_in;
ff41a59f
AT
215 int tag, len;
216
00bdf899
WD
217 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
218 * to this routine from read_timeout() and writefd_unbuffered(). */
d17e1dd2 219 msg_fd_in = -1;
554e0a8d 220
ff41a59f
AT
221 read_loop(fd, buf, 4);
222 tag = IVAL(buf, 0);
223
224 len = tag & 0xFFFFFF;
d17e1dd2 225 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 226
d17e1dd2
WD
227 switch (tag) {
228 case MSG_DONE:
13c7bcbb
WD
229 if (len != 0) {
230 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 231 exit_cleanup(RERR_STREAMIO);
13c7bcbb 232 }
d17e1dd2
WD
233 redo_list_add(-1);
234 break;
235 case MSG_REDO:
13c7bcbb
WD
236 if (len != 4) {
237 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 238 exit_cleanup(RERR_STREAMIO);
13c7bcbb 239 }
d17e1dd2
WD
240 read_loop(fd, buf, 4);
241 redo_list_add(IVAL(buf,0));
242 break;
243 case MSG_INFO:
244 case MSG_ERROR:
245 case MSG_LOG:
246 while (len) {
247 n = len;
248 if (n >= sizeof buf)
249 n = sizeof buf - 1;
250 read_loop(fd, buf, n);
251 rwrite((enum logcode)tag, buf, n);
252 len -= n;
253 }
254 break;
255 default:
13c7bcbb 256 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
d17e1dd2
WD
257 exit_cleanup(RERR_STREAMIO);
258 }
259
260 msg_fd_in = fd;
261}
262
263/* Try to push messages off the list onto the wire. If we leave with more
264 * to do, return 0. On error, return -1. If everything flushed, return 1.
f9c6b3e7 265 * This is only active in the receiver. */
d17e1dd2
WD
266int msg_list_push(int flush_it_all)
267{
268 static int written = 0;
269 struct timeval tv;
270 fd_set fds;
271
272 if (msg_fd_out < 0)
273 return -1;
274
275 while (msg_list_head) {
276 struct msg_list *ml = msg_list_head;
277 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
278 if (n < 0) {
279 if (errno == EINTR)
280 continue;
281 if (errno != EWOULDBLOCK && errno != EAGAIN)
282 return -1;
283 if (!flush_it_all)
284 return 0;
285 FD_ZERO(&fds);
286 FD_SET(msg_fd_out, &fds);
e626b29e 287 tv.tv_sec = select_timeout;
d17e1dd2
WD
288 tv.tv_usec = 0;
289 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
290 check_timeout();
291 } else if ((written += n) == ml->len) {
292 free(ml->buf);
293 msg_list_head = ml->next;
294 if (!msg_list_head)
295 msg_list_tail = NULL;
296 free(ml);
297 written = 0;
298 }
554e0a8d 299 }
d17e1dd2
WD
300 return 1;
301}
302
303int get_redo_num(void)
304{
305 struct redo_list *next;
306 int num;
307
308 while (!redo_list_head)
309 read_msg_fd();
554e0a8d 310
d17e1dd2
WD
311 num = redo_list_head->num;
312 next = redo_list_head->next;
313 free(redo_list_head);
314 redo_list_head = next;
315 if (!next)
316 redo_list_tail = NULL;
317
318 return num;
554e0a8d
AT
319}
320
56014c8c
WD
321/**
322 * When we're the receiver and we have a local --files-from list of names
323 * that needs to be sent over the socket to the sender, we have to do two
324 * things at the same time: send the sender a list of what files we're
325 * processing and read the incoming file+info list from the sender. We do
326 * this by augmenting the read_timeout() function to copy this data. It
327 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
328 * ready, since it might be a pipe) and then blast it out f_out (when it
329 * is ready to receive more data).
330 */
331void io_set_filesfrom_fds(int f_in, int f_out)
332{
333 io_filesfrom_f_in = f_in;
334 io_filesfrom_f_out = f_out;
335 io_filesfrom_bp = io_filesfrom_buf;
336 io_filesfrom_lastchar = '\0';
337 io_filesfrom_buflen = 0;
338}
720b47f2 339
880da007
MP
340/**
341 * It's almost always an error to get an EOF when we're trying to read
342 * from the network, because the protocol is self-terminating.
343 *
344 * However, there is one unfortunate cases where it is not, which is
345 * rsync <2.4.6 sending a list of modules on a server, since the list
346 * is terminated by closing the socket. So, for the section of the
347 * program where that is a problem (start_socket_client),
348 * kludge_around_eof is True and we just exit.
349 */
1f75bb10 350static void whine_about_eof(int fd)
7a55d06e 351{
1f75bb10 352 if (kludge_around_eof && fd == sock_f_in)
3151cbae 353 exit_cleanup(0);
3151cbae 354
00bdf899
WD
355 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
356 "(%.0f bytes read so far)\n",
357 (double)stats.total_read);
358
359 exit_cleanup(RERR_STREAMIO);
7a55d06e 360}
720b47f2 361
7a55d06e 362
880da007 363/**
6ed6d7f5 364 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
365 * read. If no bytes can be read then exit, never return a number <= 0.
366 *
8886f8d0
MP
367 * TODO: If the remote shell connection fails, then current versions
368 * actually report an "unexpected EOF" error here. Since it's a
369 * fairly common mistake to try to use rsh when ssh is required, we
370 * should trap that: if we fail to read any data at all, we should
371 * give a better explanation. We can tell whether the connection has
372 * started by looking e.g. at whether the remote version is known yet.
c3563c46 373 */
3151cbae 374static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 375{
d62bcc17 376 int n, ret = 0;
4c36ddbe 377
d17e1dd2 378 io_flush(NORMAL_FLUSH);
ea2111d1 379
4c36ddbe 380 while (ret == 0) {
7a55d06e 381 /* until we manage to read *something* */
56014c8c 382 fd_set r_fds, w_fds;
4c36ddbe 383 struct timeval tv;
1ea087a7 384 int maxfd = fd;
a57873b7 385 int count;
4c36ddbe 386
56014c8c 387 FD_ZERO(&r_fds);
a7026ba9 388 FD_ZERO(&w_fds);
56014c8c 389 FD_SET(fd, &r_fds);
d17e1dd2
WD
390 if (msg_fd_in >= 0) {
391 FD_SET(msg_fd_in, &r_fds);
1ea087a7
WD
392 if (msg_fd_in > maxfd)
393 maxfd = msg_fd_in;
4033b80b
WD
394 } else if (msg_list_head) {
395 FD_SET(msg_fd_out, &w_fds);
1ea087a7
WD
396 if (msg_fd_out > maxfd)
397 maxfd = msg_fd_out;
56014c8c 398 }
3309507d 399 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
400 int new_fd;
401 if (io_filesfrom_buflen == 0) {
3309507d 402 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
403 FD_SET(io_filesfrom_f_in, &r_fds);
404 new_fd = io_filesfrom_f_in;
405 } else {
406 io_filesfrom_f_out = -1;
407 new_fd = -1;
408 }
409 } else {
56014c8c
WD
410 FD_SET(io_filesfrom_f_out, &w_fds);
411 new_fd = io_filesfrom_f_out;
412 }
1ea087a7
WD
413 if (new_fd > maxfd)
414 maxfd = new_fd;
554e0a8d
AT
415 }
416
e626b29e 417 tv.tv_sec = select_timeout;
4c36ddbe
AT
418 tv.tv_usec = 0;
419
554e0a8d
AT
420 errno = 0;
421
a7026ba9 422 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
a57873b7 423
a57873b7 424 if (count <= 0) {
f89b9368 425 if (errno == EBADF)
554e0a8d 426 exit_cleanup(RERR_SOCKETIO);
bd717af8 427 check_timeout();
4c36ddbe
AT
428 continue;
429 }
430
d17e1dd2
WD
431 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
432 read_msg_fd();
4033b80b
WD
433 else if (msg_list_head && FD_ISSET(msg_fd_out, &w_fds))
434 msg_list_push(NORMAL_FLUSH);
554e0a8d 435
3309507d 436 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
437 if (io_filesfrom_buflen) {
438 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
439 int l = write(io_filesfrom_f_out,
440 io_filesfrom_bp,
441 io_filesfrom_buflen);
442 if (l > 0) {
443 if (!(io_filesfrom_buflen -= l))
444 io_filesfrom_bp = io_filesfrom_buf;
445 else
446 io_filesfrom_bp += l;
447 } else {
448 /* XXX should we complain? */
449 io_filesfrom_f_out = -1;
450 }
451 }
3309507d 452 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
453 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
454 int l = read(io_filesfrom_f_in,
455 io_filesfrom_buf,
456 sizeof io_filesfrom_buf);
457 if (l <= 0) {
458 /* Send end-of-file marker */
459 io_filesfrom_buf[0] = '\0';
460 io_filesfrom_buf[1] = '\0';
461 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
462 io_filesfrom_f_in = -1;
463 } else {
56014c8c
WD
464 if (!eol_nulls) {
465 char *s = io_filesfrom_buf + l;
466 /* Transform CR and/or LF into '\0' */
467 while (s-- > io_filesfrom_buf) {
468 if (*s == '\n' || *s == '\r')
469 *s = '\0';
470 }
471 }
472 if (!io_filesfrom_lastchar) {
473 /* Last buf ended with a '\0', so don't
474 * let this buf start with one. */
475 while (l && !*io_filesfrom_bp)
476 io_filesfrom_bp++, l--;
477 }
478 if (!l)
479 io_filesfrom_bp = io_filesfrom_buf;
480 else {
481 char *f = io_filesfrom_bp;
482 char *t = f;
483 char *eob = f + l;
484 /* Eliminate any multi-'\0' runs. */
485 while (f != eob) {
486 if (!(*t++ = *f++)) {
487 while (f != eob && !*f)
488 f++, l--;
489 }
490 }
491 io_filesfrom_lastchar = f[-1];
492 }
493 io_filesfrom_buflen = l;
494 }
495 }
496 }
497 }
498
f89b9368
WD
499 if (!FD_ISSET(fd, &r_fds))
500 continue;
554e0a8d 501
4c36ddbe
AT
502 n = read(fd, buf, len);
503
1ea087a7
WD
504 if (n <= 0) {
505 if (n == 0)
1f75bb10 506 whine_about_eof(fd); /* Doesn't return. */
d62bcc17
WD
507 if (errno == EINTR || errno == EWOULDBLOCK
508 || errno == EAGAIN)
7a55d06e 509 continue;
1f75bb10
WD
510
511 /* Don't write errors on a dead socket. */
512 if (fd == sock_f_in)
513 io_multiplexing_close();
514 rsyserr(FERROR, errno, "read error");
515 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 516 }
00bdf899
WD
517
518 buf += n;
519 len -= n;
520 ret += n;
1f75bb10
WD
521
522 if (io_timeout && fd == sock_f_in)
00bdf899 523 last_io = time(NULL);
4c36ddbe 524 }
8d9dc9f9 525
4c36ddbe
AT
526 return ret;
527}
8d9dc9f9 528
56014c8c
WD
529/**
530 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
531 * characters long).
532 */
533int read_filesfrom_line(int fd, char *fname)
534{
535 char ch, *s, *eob = fname + MAXPATHLEN - 1;
536 int cnt;
55d5937d 537 int reading_remotely = remote_filesfrom_file != NULL;
56014c8c
WD
538 int nulls = eol_nulls || reading_remotely;
539
540 start:
541 s = fname;
542 while (1) {
543 cnt = read(fd, &ch, 1);
544 if (cnt < 0 && (errno == EWOULDBLOCK
545 || errno == EINTR || errno == EAGAIN)) {
546 struct timeval tv;
547 fd_set fds;
548 FD_ZERO(&fds);
549 FD_SET(fd, &fds);
e626b29e 550 tv.tv_sec = select_timeout;
56014c8c
WD
551 tv.tv_usec = 0;
552 if (!select(fd+1, &fds, NULL, NULL, &tv))
553 check_timeout();
554 continue;
555 }
556 if (cnt != 1)
557 break;
558 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
559 /* Skip empty lines if reading locally. */
560 if (!reading_remotely && s == fname)
561 continue;
562 break;
563 }
564 if (s < eob)
565 *s++ = ch;
566 }
567 *s = '\0';
7a55d06e 568
6b45fcf1
WD
569 /* Dump comments. */
570 if (*fname == '#' || *fname == ';')
56014c8c
WD
571 goto start;
572
573 return s - fname;
574}
7a55d06e
MP
575
576
1f75bb10
WD
577static char *iobuf_out;
578static int iobuf_out_cnt;
579
580void io_start_buffering_out(void)
581{
582 if (iobuf_out)
583 return;
584 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
585 out_of_memory("io_start_buffering_out");
586 iobuf_out_cnt = 0;
587}
588
589
590static char *iobuf_in;
591static size_t iobuf_in_siz;
592
593void io_start_buffering_in(void)
594{
595 if (iobuf_in)
596 return;
597 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
598 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
599 out_of_memory("io_start_buffering_in");
600}
601
602
603void io_end_buffering(void)
604{
605 io_flush(NORMAL_FLUSH);
606 if (!io_multiplexing_out) {
607 free(iobuf_out);
608 iobuf_out = NULL;
609 }
610}
611
612
880da007
MP
/**
 * Keep calling read_timeout() until exactly len bytes have been stored
 * in buf; does not return before that.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	int got;

	for ( ; len; buf += got, len -= got)
		got = read_timeout(fd, buf, len);
}
626
7a55d06e
MP
627
628/**
629 * Read from the file descriptor handling multiplexing - return number
630 * of bytes read.
d62bcc17
WD
631 *
632 * Never returns <= 0.
7a55d06e 633 */
c399d22a 634static int readfd_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 635{
6fe25398 636 static size_t remaining;
1f75bb10 637 static size_t iobuf_in_ndx;
909ce14f 638 int tag, ret = 0;
8d9dc9f9
AT
639 char line[1024];
640
1f75bb10 641 if (!iobuf_in || fd != sock_f_in)
4c36ddbe 642 return read_timeout(fd, buf, len);
8d9dc9f9 643
76c21947 644 if (!io_multiplexing_in && remaining == 0) {
1f75bb10
WD
645 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
646 iobuf_in_ndx = 0;
76c21947
WD
647 }
648
8d9dc9f9
AT
649 while (ret == 0) {
650 if (remaining) {
651 len = MIN(len, remaining);
1f75bb10
WD
652 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
653 iobuf_in_ndx += len;
8d9dc9f9
AT
654 remaining -= len;
655 ret = len;
76c21947 656 break;
8d9dc9f9
AT
657 }
658
909ce14f 659 read_loop(fd, line, 4);
ff41a59f 660 tag = IVAL(line, 0);
679e7657 661
8d9dc9f9 662 remaining = tag & 0xFFFFFF;
d17e1dd2 663 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 664
d17e1dd2
WD
665 switch (tag) {
666 case MSG_DATA:
1f75bb10
WD
667 if (remaining > iobuf_in_siz) {
668 if (!(iobuf_in = realloc_array(iobuf_in, char,
669 remaining)))
c399d22a 670 out_of_memory("readfd_unbuffered");
1f75bb10 671 iobuf_in_siz = remaining;
76c21947 672 }
1f75bb10
WD
673 read_loop(fd, iobuf_in, remaining);
674 iobuf_in_ndx = 0;
d17e1dd2
WD
675 break;
676 case MSG_INFO:
677 case MSG_ERROR:
678 if (remaining >= sizeof line) {
679 rprintf(FERROR, "multiplexing overflow %d:%ld\n\n",
680 tag, (long)remaining);
681 exit_cleanup(RERR_STREAMIO);
682 }
683 read_loop(fd, line, remaining);
684 rwrite((enum logcode)tag, line, remaining);
685 remaining = 0;
686 break;
687 default:
909ce14f 688 rprintf(FERROR, "unexpected tag %d\n", tag);
65417579 689 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 690 }
8d9dc9f9
AT
691 }
692
76c21947 693 if (remaining == 0)
d17e1dd2 694 io_flush(NORMAL_FLUSH);
76c21947 695
8d9dc9f9
AT
696 return ret;
697}
698
699
909ce14f 700
880da007
MP
701/**
702 * Do a buffered read from @p fd. Don't return until all @p n bytes
703 * have been read. If all @p n can't be read then exit with an
704 * error.
705 **/
3151cbae 706static void readfd(int fd, char *buffer, size_t N)
720b47f2 707{
6ba9279f 708 int ret;
d62bcc17 709 size_t total = 0;
3151cbae 710
6ba9279f 711 while (total < N) {
c399d22a 712 ret = readfd_unbuffered(fd, buffer + total, N-total);
6ba9279f 713 total += ret;
7f28dbee 714 }
1b7c47cb 715
b9f592fb
WD
716 if (fd == write_batch_monitor_in) {
717 if ((size_t)write(batch_fd, buffer, total) != total)
718 exit_cleanup(RERR_FILEIO);
719 }
1f75bb10
WD
720
721 if (fd == sock_f_in)
722 stats.total_read += total;
720b47f2
AT
723}
724
725
b7922338 726int32 read_int(int f)
720b47f2 727{
4c36ddbe 728 char b[4];
d730b113
AT
729 int32 ret;
730
4c36ddbe 731 readfd(f,b,4);
d730b113 732 ret = IVAL(b,0);
f89b9368
WD
733 if (ret == (int32)0xffffffff)
734 return -1;
d730b113 735 return ret;
720b47f2
AT
736}
737
71c46176 738int64 read_longint(int f)
3a6a366f 739{
71c46176 740 int64 ret;
3a6a366f
AT
741 char b[8];
742 ret = read_int(f);
71c46176 743
f89b9368 744 if ((int32)ret != (int32)0xffffffff)
8de330a3 745 return ret;
71c46176 746
28deecca
WD
747#ifdef INT64_IS_OFF_T
748 if (sizeof (int64) < 8) {
749 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
750 exit_cleanup(RERR_UNSUPPORTED);
751 }
752#endif
91c4da3f
S
753 readfd(f,b,8);
754 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
71c46176 755
3a6a366f
AT
756 return ret;
757}
758
/* Read exactly len bytes from f into buf. */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
763
/* Read exactly len bytes from f into buf and add a terminating '\0',
 * so buf must have room for len + 1 bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	buf[len] = '\0';
}
769
182dca5c
AT
/* Read and return a single byte from f. */
unsigned char read_byte(int f)
{
	unsigned char byte;

	readfd(f, (char *)&byte, 1);

	return byte;
}
720b47f2 776
880da007 777
08571358
MP
778/**
779 * Sleep after writing to limit I/O bandwidth usage.
780 *
781 * @todo Rather than sleeping after each write, it might be better to
782 * use some kind of averaging. The current algorithm seems to always
783 * use a bit less bandwidth than specified, because it doesn't make up
784 * for slow periods. But arguably this is a feature. In addition, we
785 * ought to take the time used to write the data into account.
71e58630
WD
786 *
787 * During some phases of big transfers (file FOO is uptodate) this is
788 * called with a small bytes_written every time. As the kernel has to
789 * round small waits up to guarantee that we actually wait at least the
790 * requested number of microseconds, this can become grossly inaccurate.
791 * We therefore keep track of the bytes we've written over time and only
792 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
793 **/
794static void sleep_for_bwlimit(int bytes_written)
795{
71e58630
WD
796 static struct timeval prior_tv;
797 static long total_written = 0;
798 struct timeval tv, start_tv;
799 long elapsed_usec, sleep_usec;
800
801#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358
MP
802
803 if (!bwlimit)
804 return;
e681e820 805
71e58630
WD
806 total_written += bytes_written;
807
808 gettimeofday(&start_tv, NULL);
809 if (prior_tv.tv_sec) {
810 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
811 + (start_tv.tv_usec - prior_tv.tv_usec);
812 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
813 if (total_written < 0)
814 total_written = 0;
815 }
3151cbae 816
71e58630
WD
817 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
818 if (sleep_usec < ONE_SEC / 10) {
819 prior_tv = start_tv;
820 return;
821 }
08571358 822
71e58630
WD
823 tv.tv_sec = sleep_usec / ONE_SEC;
824 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 825 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
826
827 gettimeofday(&prior_tv, NULL);
828 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
829 + (prior_tv.tv_usec - start_tv.tv_usec);
830 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
831}
832
833
1f75bb10
WD
834/* Write len bytes to the file descriptor fd, looping as necessary to get
835 * the job done and also (in the generator) reading any data on msg_fd_in
836 * (to avoid deadlock).
880da007
MP
837 *
838 * This function underlies the multiplexing system. The body of the
1f75bb10 839 * application never calls this function directly. */
9dd891bb 840static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 841{
1ea087a7 842 size_t n, total = 0;
8d9dc9f9 843 fd_set w_fds, r_fds;
1ea087a7 844 int maxfd, count, ret;
8d9dc9f9 845 struct timeval tv;
720b47f2 846
e44f9a12
AT
847 no_flush++;
848
4c36ddbe 849 while (total < len) {
8d9dc9f9 850 FD_ZERO(&w_fds);
8d9dc9f9 851 FD_SET(fd,&w_fds);
1ea087a7 852 maxfd = fd;
4c36ddbe 853
d17e1dd2 854 if (msg_fd_in >= 0) {
56014c8c 855 FD_ZERO(&r_fds);
d17e1dd2 856 FD_SET(msg_fd_in,&r_fds);
1ea087a7
WD
857 if (msg_fd_in > maxfd)
858 maxfd = msg_fd_in;
8d9dc9f9
AT
859 }
860
e626b29e 861 tv.tv_sec = select_timeout;
8d9dc9f9 862 tv.tv_usec = 0;
4c36ddbe 863
554e0a8d 864 errno = 0;
1ea087a7 865 count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
d17e1dd2 866 &w_fds, NULL, &tv);
4c36ddbe
AT
867
868 if (count <= 0) {
1ea087a7 869 if (count < 0 && errno == EBADF)
554e0a8d 870 exit_cleanup(RERR_SOCKETIO);
bd717af8 871 check_timeout();
8d9dc9f9
AT
872 continue;
873 }
4c36ddbe 874
d17e1dd2
WD
875 if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
876 read_msg_fd();
554e0a8d 877
1ea087a7
WD
878 if (!FD_ISSET(fd, &w_fds))
879 continue;
4c36ddbe 880
1ea087a7
WD
881 n = len - total;
882 if (bwlimit && n > bwlimit_writemax)
883 n = bwlimit_writemax;
884 ret = write(fd, buf + total, n);
885
886 if (ret <= 0) {
3309507d
WD
887 if (ret < 0) {
888 if (errno == EINTR)
889 continue;
890 if (errno == EWOULDBLOCK || errno == EAGAIN) {
891 msleep(1);
892 continue;
893 }
f0359dd0
AT
894 }
895
1ea087a7 896 /* Don't try to write errors back across the stream. */
1f75bb10
WD
897 if (fd == sock_f_out)
898 io_multiplexing_close();
1ea087a7
WD
899 rsyserr(FERROR, errno,
900 "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
901 (long)len, io_write_phase);
902 exit_cleanup(RERR_STREAMIO);
903 }
4c36ddbe 904
1ea087a7 905 total += ret;
a800434a 906
1f75bb10
WD
907 if (fd == sock_f_out) {
908 if (io_timeout)
909 last_io = time(NULL);
910 sleep_for_bwlimit(ret);
911 }
4c36ddbe 912 }
e44f9a12
AT
913
914 no_flush--;
720b47f2
AT
915}
916
8d9dc9f9 917
880da007
MP
918/**
919 * Write an message to a multiplexed stream. If this fails then rsync
920 * exits.
921 **/
1f75bb10 922static void mplex_write(enum msgcode code, char *buf, size_t len)
ff41a59f
AT
923{
924 char buffer[4096];
06ce139f 925 size_t n = len;
8d9dc9f9 926
ff41a59f
AT
927 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
928
c399d22a 929 if (n > sizeof buffer - 4)
3151cbae 930 n = sizeof buffer - 4;
ff41a59f
AT
931
932 memcpy(&buffer[4], buf, n);
1f75bb10 933 writefd_unbuffered(sock_f_out, buffer, n+4);
ff41a59f
AT
934
935 len -= n;
936 buf += n;
937
1ea087a7 938 if (len)
1f75bb10 939 writefd_unbuffered(sock_f_out, buf, len);
d6dead6b
AT
940}
941
ff41a59f 942
d17e1dd2 943void io_flush(int flush_it_all)
d6dead6b 944{
d17e1dd2 945 msg_list_push(flush_it_all);
90ba34e2 946
1f75bb10 947 if (!iobuf_out_cnt || no_flush)
d17e1dd2 948 return;
8d9dc9f9 949
d17e1dd2 950 if (io_multiplexing_out)
1f75bb10 951 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
d17e1dd2 952 else
1f75bb10
WD
953 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
954 iobuf_out_cnt = 0;
8d9dc9f9
AT
955}
956
0ba48136 957
9dd891bb 958static void writefd(int fd,char *buf,size_t len)
d6dead6b 959{
4033b80b
WD
960 if (fd == msg_fd_out) {
961 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
962 exit_cleanup(RERR_PROTOCOL);
963 }
90ba34e2 964
1f75bb10
WD
965 if (fd == sock_f_out)
966 stats.total_written += len;
967
b9f592fb
WD
968 if (fd == write_batch_monitor_out) {
969 if ((size_t)write(batch_fd, buf, len) != len)
970 exit_cleanup(RERR_FILEIO);
971 }
972
1f75bb10 973 if (!iobuf_out || fd != sock_f_out) {
4c36ddbe
AT
974 writefd_unbuffered(fd, buf, len);
975 return;
976 }
d6dead6b
AT
977
978 while (len) {
1f75bb10 979 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
d6dead6b 980 if (n > 0) {
1f75bb10 981 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
d6dead6b
AT
982 buf += n;
983 len -= n;
1f75bb10 984 iobuf_out_cnt += n;
d6dead6b 985 }
3151cbae 986
1f75bb10 987 if (iobuf_out_cnt == IO_BUFFER_SIZE)
d17e1dd2 988 io_flush(NORMAL_FLUSH);
d6dead6b 989 }
d6dead6b 990}
720b47f2
AT
991
992
b7922338 993void write_int(int f,int32 x)
720b47f2 994{
8d9dc9f9
AT
995 char b[4];
996 SIVAL(b,0,x);
4c36ddbe 997 writefd(f,b,4);
720b47f2
AT
998}
999
7a24c346 1000
805edf9d
MP
1001void write_int_named(int f, int32 x, const char *phase)
1002{
1003 io_write_phase = phase;
1004 write_int(f, x);
1005 io_write_phase = phase_unknown;
1006}
1007
1008
7a24c346
MP
1009/*
1010 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1011 * 64-bit types on this platform.
1012 */
71c46176 1013void write_longint(int f, int64 x)
3a6a366f 1014{
3a6a366f 1015 char b[8];
3a6a366f 1016
91c4da3f 1017 if (x <= 0x7FFFFFFF) {
3a6a366f
AT
1018 write_int(f, (int)x);
1019 return;
1020 }
1021
28deecca
WD
1022#ifdef INT64_IS_OFF_T
1023 if (sizeof (int64) < 8) {
1024 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1025 exit_cleanup(RERR_UNSUPPORTED);
1026 }
1027#endif
1028
8de330a3 1029 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
1030 SIVAL(b,0,(x&0xFFFFFFFF));
1031 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1032
4c36ddbe 1033 writefd(f,b,8);
3a6a366f
AT
1034}
1035
/* Write len bytes from buf to f. */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
1040
/** Write a '\0'-terminated string to the connection (the terminator
 * itself is not sent). */
void write_sbuf(int f, char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}
1046
182dca5c
AT
/* Write the single byte c to f. */
void write_byte(int f, unsigned char c)
{
	char *p = (char *)&c;

	writefd(f, p, 1);
}
1051
7a55d06e
MP
1052
1053
/**
 * Read a line of up to @p maxlen characters into @p buf (not counting
 * the trailing null).  Strips the (required) trailing newline and all
 * carriage returns.  Reading a '\0' byte ends the call at once with a
 * return of 0.
 *
 * @return 1 for success; 0 for I/O error or truncation.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *p = buf;
	size_t room = maxlen;

	while (room != 0) {
		/* Read one byte into the next free slot. */
		*p = 0;
		read_buf(f, p, 1);

		if (*p == 0)
			return 0;
		if (*p == '\n')
			break;
		if (*p == '\r')
			continue;	/* dropped: overwritten by the next byte */

		p++;
		room--;
	}

	*p = '\0';
	return room > 0;
}
1078
1079
1080void io_printf(int fd, const char *format, ...)
1081{
d62bcc17 1082 va_list ap;
f0fca04e
AT
1083 char buf[1024];
1084 int len;
3151cbae 1085
f0fca04e 1086 va_start(ap, format);
3151cbae 1087 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1088 va_end(ap);
1089
f89b9368
WD
1090 if (len < 0)
1091 exit_cleanup(RERR_STREAMIO);
f0fca04e
AT
1092
1093 write_sbuf(fd, buf);
1094}
8d9dc9f9
AT
1095
1096
d17e1dd2 1097/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1098void io_start_multiplex_out(void)
8d9dc9f9 1099{
d17e1dd2 1100 io_flush(NORMAL_FLUSH);
1f75bb10 1101 io_start_buffering_out();
8d9dc9f9
AT
1102 io_multiplexing_out = 1;
1103}
1104
d17e1dd2 1105/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1106void io_start_multiplex_in(void)
8d9dc9f9 1107{
d17e1dd2 1108 io_flush(NORMAL_FLUSH);
1f75bb10 1109 io_start_buffering_in();
8d9dc9f9
AT
1110 io_multiplexing_in = 1;
1111}
1112
d17e1dd2
WD
1113/** Write an message to the multiplexed data stream. */
1114int io_multiplex_write(enum msgcode code, char *buf, size_t len)
8d9dc9f9 1115{
f89b9368
WD
1116 if (!io_multiplexing_out)
1117 return 0;
8d9dc9f9 1118
d17e1dd2 1119 io_flush(NORMAL_FLUSH);
1b7c47cb 1120 stats.total_written += (len+4);
1f75bb10 1121 mplex_write(code, buf, len);
8d9dc9f9
AT
1122 return 1;
1123}
1124
d17e1dd2 1125/** Stop output multiplexing. */
554e0a8d
AT
1126void io_multiplexing_close(void)
1127{
1128 io_multiplexing_out = 0;
1129}
1130
b9f592fb
WD
1131void start_write_batch(int fd)
1132{
741d6544
WD
1133 write_stream_flags(batch_fd);
1134
b9f592fb
WD
1135 /* Some communication has already taken place, but we don't
1136 * enable batch writing until here so that we can write a
1137 * canonical record of the communication even though the
1138 * actual communication so far depends on whether a daemon
1139 * is involved. */
1140 write_int(batch_fd, protocol_version);
1141 write_int(batch_fd, checksum_seed);
b9f592fb
WD
1142
1143 if (am_sender)
1144 write_batch_monitor_out = fd;
1145 else
1146 write_batch_monitor_in = fd;
1147}
1148
1149void stop_write_batch(void)
1150{
1151 write_batch_monitor_out = -1;
1152 write_batch_monitor_in = -1;
1153}