Turn an FSOCKERR into a normal FERROR if it's not going to our
[rsync/rsync.git] / io.c
CommitLineData
7a24c346 1/* -*- c-file-style: "linux" -*-
d62bcc17
WD
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
880da007
MP
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
d62bcc17 6 *
880da007
MP
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
d62bcc17 11 *
880da007
MP
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
d62bcc17 16 *
880da007
MP
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
720b47f2 21
87ee2481 22/**
87ee2481
MP
23 * @file io.c
24 *
6ed6d7f5 25 * Socket and pipe I/O utilities used in rsync.
87ee2481
MP
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
720b47f2 36
720b47f2
AT
37#include "rsync.h"
38
880da007 39/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
40#define SELECT_TIMEOUT 60
41
7a55d06e 42extern int bwlimit;
71e58630 43extern size_t bwlimit_writemax;
720b47f2 44extern int verbose;
6ba9279f 45extern int io_timeout;
cdf236aa 46extern int allowed_lull;
d17e1dd2
WD
47extern int am_server;
48extern int am_daemon;
49extern int am_sender;
98f8c9a5 50extern int am_generator;
e626b29e 51extern int eol_nulls;
3b0a30eb 52extern int read_batch;
188fed95 53extern int csum_length;
b9f592fb
WD
54extern int checksum_seed;
55extern int protocol_version;
cdf236aa
WD
56extern int remove_sent_files;
57extern int preserve_hard_links;
d19320fd 58extern char *filesfrom_host;
a800434a 59extern struct stats stats;
cdf236aa 60extern struct file_list *the_file_list;
720b47f2 61
a86179f4 62const char phase_unknown[] = "unknown";
9e2409ab 63int ignore_timeout = 0;
b9f592fb 64int batch_fd = -1;
b0ad5429 65int batch_gen_fd = -1;
805edf9d 66
98b332ed
MP
67/**
68 * The connection might be dropped at some point; perhaps because the
69 * remote instance crashed. Just giving the offset on the stream is
70 * not very helpful. So instead we try to make io_phase_name point to
71 * something useful.
eca2adb4 72 *
6ed6d7f5 73 * For buffered/multiplexed I/O these names will be somewhat
805edf9d 74 * approximate; perhaps for ease of support we would rather make the
6ed6d7f5 75 * buffer always flush when a single application-level I/O finishes.
805edf9d 76 *
eca2adb4
MP
77 * @todo Perhaps we want some simple stack functionality, but there's
78 * no need to overdo it.
98b332ed 79 **/
805edf9d
MP
80const char *io_write_phase = phase_unknown;
81const char *io_read_phase = phase_unknown;
98b332ed 82
cb2e1f18 83/* Ignore an EOF error if non-zero. See whine_about_eof(). */
574c2500 84int kluge_around_eof = 0;
7a55d06e 85
d17e1dd2
WD
86int msg_fd_in = -1;
87int msg_fd_out = -1;
cdf236aa
WD
88int sock_f_in = -1;
89int sock_f_out = -1;
7a55d06e 90
1f75bb10
WD
91static int io_multiplexing_out;
92static int io_multiplexing_in;
3b0a30eb
WD
93static time_t last_io_in;
94static time_t last_io_out;
1f75bb10
WD
95static int no_flush;
96
b9f592fb
WD
97static int write_batch_monitor_in = -1;
98static int write_batch_monitor_out = -1;
99
56014c8c
WD
100static int io_filesfrom_f_in = -1;
101static int io_filesfrom_f_out = -1;
102static char io_filesfrom_buf[2048];
103static char *io_filesfrom_bp;
104static char io_filesfrom_lastchar;
105static int io_filesfrom_buflen;
af436313 106static size_t contiguous_write_len = 0;
3b0a30eb 107static int select_timeout = SELECT_TIMEOUT;
720b47f2 108
9dd891bb 109static void read_loop(int fd, char *buf, size_t len);
ff41a59f 110
c6816b94
WD
/* One queued file-list index. */
struct flist_ndx_item {
	struct flist_ndx_item *next;
	int ndx;
};

/* Singly linked FIFO of file-list indexes: flist_ndx_push() appends at
 * the tail and flist_ndx_pop() removes from the head. */
struct flist_ndx_list {
	struct flist_ndx_item *head;
	struct flist_ndx_item *tail;
};

/* Queues filled by read_msg_fd(): indexes to redo and indexes whose
 * hard-link handling is pending. */
static struct flist_ndx_list redo_list;
static struct flist_ndx_list hlink_list;

/* One pending MSG_* packet: buf holds the 4-byte header plus payload and
 * len is the total number of bytes in buf. */
struct msg_list_item {
	struct msg_list_item *next;
	char *buf;
	int len;
};

/* Singly linked FIFO of pending MSG_* packets. */
struct msg_list {
	struct msg_list_item *head;
	struct msg_list_item *tail;
};

static struct msg_list msg_list;
d17e1dd2 133
c6816b94 134static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
d17e1dd2 135{
c6816b94
WD
136 struct flist_ndx_item *item;
137
138 if (!(item = new(struct flist_ndx_item)))
139 out_of_memory("flist_ndx_push");
140 item->next = NULL;
141 item->ndx = ndx;
142 if (lp->tail)
143 lp->tail->next = item;
d17e1dd2 144 else
c6816b94
WD
145 lp->head = item;
146 lp->tail = item;
147}
148
149static int flist_ndx_pop(struct flist_ndx_list *lp)
150{
151 struct flist_ndx_item *next;
152 int ndx;
153
154 if (!lp->head)
155 return -1;
156
157 ndx = lp->head->ndx;
158 next = lp->head->next;
159 free(lp->head);
160 lp->head = next;
161 if (!next)
162 lp->tail = NULL;
163
164 return ndx;
d17e1dd2
WD
165}
166
8d9dc9f9
AT
167static void check_timeout(void)
168{
169 time_t t;
90ba34e2 170
9e2409ab 171 if (!io_timeout || ignore_timeout)
d17e1dd2 172 return;
8d9dc9f9 173
3b0a30eb
WD
174 if (!last_io_in) {
175 last_io_in = time(NULL);
8d9dc9f9
AT
176 return;
177 }
178
179 t = time(NULL);
180
3b0a30eb 181 if (t - last_io_in >= io_timeout) {
0adb99b9 182 if (!am_server && !am_daemon) {
4ccfd96c 183 rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
3b0a30eb 184 (int)(t-last_io_in));
0adb99b9 185 }
65417579 186 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
187 }
188}
189
1f75bb10
WD
190/* Note the fds used for the main socket (which might really be a pipe
191 * for a local transfer, but we can ignore that). */
192void io_set_sock_fds(int f_in, int f_out)
193{
194 sock_f_in = f_in;
195 sock_f_out = f_out;
196}
197
3b0a30eb
WD
198void set_io_timeout(int secs)
199{
200 io_timeout = secs;
201
202 if (!io_timeout || io_timeout > SELECT_TIMEOUT)
203 select_timeout = SELECT_TIMEOUT;
204 else
205 select_timeout = io_timeout;
206
207 allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
208}
209
98f8c9a5
WD
210/* Setup the fd used to receive MSG_* messages. Only needed during the
211 * early stages of being a local sender (up through the sending of the
212 * file list) or when we're the generator (to fetch the messages from
213 * the receiver). */
d17e1dd2
WD
214void set_msg_fd_in(int fd)
215{
216 msg_fd_in = fd;
217}
218
98f8c9a5
WD
219/* Setup the fd used to send our MSG_* messages. Only needed when
220 * we're the receiver (to send our messages to the generator). */
d17e1dd2
WD
221void set_msg_fd_out(int fd)
222{
223 msg_fd_out = fd;
224 set_nonblocking(msg_fd_out);
225}
226
227/* Add a message to the pending MSG_* list. */
228static void msg_list_add(int code, char *buf, int len)
554e0a8d 229{
08c88178 230 struct msg_list_item *ml;
d17e1dd2 231
08c88178 232 if (!(ml = new(struct msg_list_item)))
c6816b94 233 out_of_memory("msg_list_add");
d17e1dd2
WD
234 ml->next = NULL;
235 if (!(ml->buf = new_array(char, len+4)))
c6816b94 236 out_of_memory("msg_list_add");
d17e1dd2
WD
237 SIVAL(ml->buf, 0, ((code+MPLEX_BASE)<<24) | len);
238 memcpy(ml->buf+4, buf, len);
239 ml->len = len+4;
08c88178
WD
240 if (msg_list.tail)
241 msg_list.tail->next = ml;
d17e1dd2 242 else
08c88178
WD
243 msg_list.head = ml;
244 msg_list.tail = ml;
554e0a8d
AT
245}
246
98f8c9a5
WD
247/* Read a message from the MSG_* fd and handle it. This is called either
248 * during the early stages of being a local sender (up through the sending
249 * of the file list) or when we're the generator (to fetch the messages
250 * from the receiver). */
d17e1dd2 251static void read_msg_fd(void)
554e0a8d 252{
f9c6b3e7 253 char buf[2048];
06ce139f 254 size_t n;
d17e1dd2 255 int fd = msg_fd_in;
ff41a59f
AT
256 int tag, len;
257
00bdf899 258 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
af436313 259 * to this routine from writefd_unbuffered(). */
d17e1dd2 260 msg_fd_in = -1;
554e0a8d 261
ff41a59f
AT
262 read_loop(fd, buf, 4);
263 tag = IVAL(buf, 0);
264
265 len = tag & 0xFFFFFF;
d17e1dd2 266 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 267
d17e1dd2
WD
268 switch (tag) {
269 case MSG_DONE:
98f8c9a5 270 if (len != 0 || !am_generator) {
13c7bcbb 271 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 272 exit_cleanup(RERR_STREAMIO);
13c7bcbb 273 }
c6816b94 274 flist_ndx_push(&redo_list, -1);
d17e1dd2
WD
275 break;
276 case MSG_REDO:
98f8c9a5 277 if (len != 4 || !am_generator) {
13c7bcbb 278 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 279 exit_cleanup(RERR_STREAMIO);
13c7bcbb 280 }
d17e1dd2 281 read_loop(fd, buf, 4);
c6816b94 282 flist_ndx_push(&redo_list, IVAL(buf,0));
d17e1dd2 283 break;
0d67e00a
WD
284 case MSG_DELETED:
285 if (len >= (int)sizeof buf || !am_generator) {
286 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
287 exit_cleanup(RERR_STREAMIO);
288 }
289 read_loop(fd, buf, len);
290 io_multiplex_write(MSG_DELETED, buf, len);
291 break;
9981c27e
WD
292 case MSG_SUCCESS:
293 if (len != 4 || !am_generator) {
294 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
295 exit_cleanup(RERR_STREAMIO);
296 }
297 read_loop(fd, buf, len);
cdf236aa
WD
298 if (remove_sent_files)
299 io_multiplex_write(MSG_SUCCESS, buf, len);
300 if (preserve_hard_links)
301 flist_ndx_push(&hlink_list, IVAL(buf,0));
9981c27e 302 break;
d17e1dd2
WD
303 case MSG_INFO:
304 case MSG_ERROR:
305 case MSG_LOG:
306 while (len) {
307 n = len;
308 if (n >= sizeof buf)
309 n = sizeof buf - 1;
310 read_loop(fd, buf, n);
311 rwrite((enum logcode)tag, buf, n);
312 len -= n;
313 }
314 break;
315 default:
13c7bcbb 316 rprintf(FERROR, "unknown message %d:%d\n", tag, len);
d17e1dd2
WD
317 exit_cleanup(RERR_STREAMIO);
318 }
319
320 msg_fd_in = fd;
321}
322
323/* Try to push messages off the list onto the wire. If we leave with more
324 * to do, return 0. On error, return -1. If everything flushed, return 1.
f9c6b3e7 325 * This is only active in the receiver. */
37f35d89 326static int msg_list_flush(int flush_it_all)
d17e1dd2
WD
327{
328 static int written = 0;
329 struct timeval tv;
330 fd_set fds;
331
332 if (msg_fd_out < 0)
333 return -1;
334
08c88178
WD
335 while (msg_list.head) {
336 struct msg_list_item *ml = msg_list.head;
d17e1dd2
WD
337 int n = write(msg_fd_out, ml->buf + written, ml->len - written);
338 if (n < 0) {
339 if (errno == EINTR)
340 continue;
341 if (errno != EWOULDBLOCK && errno != EAGAIN)
342 return -1;
343 if (!flush_it_all)
344 return 0;
345 FD_ZERO(&fds);
346 FD_SET(msg_fd_out, &fds);
e626b29e 347 tv.tv_sec = select_timeout;
d17e1dd2
WD
348 tv.tv_usec = 0;
349 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
350 check_timeout();
351 } else if ((written += n) == ml->len) {
352 free(ml->buf);
08c88178
WD
353 msg_list.head = ml->next;
354 if (!msg_list.head)
355 msg_list.tail = NULL;
d17e1dd2
WD
356 free(ml);
357 written = 0;
358 }
554e0a8d 359 }
d17e1dd2
WD
360 return 1;
361}
362
37f35d89
WD
363void send_msg(enum msgcode code, char *buf, int len)
364{
365 if (msg_fd_out < 0) {
366 io_multiplex_write(code, buf, len);
367 return;
368 }
369 msg_list_add(code, buf, len);
370 msg_list_flush(NORMAL_FLUSH);
371}
372
cdf236aa 373int get_redo_num(int itemizing, enum logcode code)
d17e1dd2 374{
cdf236aa
WD
375 while (1) {
376 if (hlink_list.head)
377 check_for_finished_hlinks(itemizing, code);
378 if (redo_list.head)
379 break;
d17e1dd2 380 read_msg_fd();
c6816b94 381 }
554e0a8d 382
c6816b94 383 return flist_ndx_pop(&redo_list);
554e0a8d
AT
384}
385
cdf236aa
WD
386int get_hlink_num(void)
387{
388 return flist_ndx_pop(&hlink_list);
389}
390
56014c8c
WD
391/**
392 * When we're the receiver and we have a local --files-from list of names
393 * that needs to be sent over the socket to the sender, we have to do two
394 * things at the same time: send the sender a list of what files we're
395 * processing and read the incoming file+info list from the sender. We do
396 * this by augmenting the read_timeout() function to copy this data. It
397 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
398 * ready, since it might be a pipe) and then blast it out f_out (when it
399 * is ready to receive more data).
400 */
401void io_set_filesfrom_fds(int f_in, int f_out)
402{
403 io_filesfrom_f_in = f_in;
404 io_filesfrom_f_out = f_out;
405 io_filesfrom_bp = io_filesfrom_buf;
406 io_filesfrom_lastchar = '\0';
407 io_filesfrom_buflen = 0;
408}
720b47f2 409
cb2e1f18
WD
410/* It's almost always an error to get an EOF when we're trying to read from the
411 * network, because the protocol is (for the most part) self-terminating.
880da007 412 *
cb2e1f18
WD
413 * There is one case for the receiver when it is at the end of the transfer
414 * (hanging around reading any keep-alive packets that might come its way): if
415 * the sender dies before the generator's kill-signal comes through, we can end
416 * up here needing to loop until the kill-signal arrives. In this situation,
417 * kluge_around_eof will be < 0.
418 *
419 * There is another case for older protocol versions (< 24) where the module
420 * listing was not terminated, so we must ignore an EOF error in that case and
421 * exit. In this situation, kluge_around_eof will be > 0. */
1f75bb10 422static void whine_about_eof(int fd)
7a55d06e 423{
574c2500 424 if (kluge_around_eof && fd == sock_f_in) {
55bb7fff 425 int i;
574c2500
WD
426 if (kluge_around_eof > 0)
427 exit_cleanup(0);
55bb7fff
WD
428 /* If we're still here after 10 seconds, exit with an error. */
429 for (i = 10*1000/20; i--; )
574c2500
WD
430 msleep(20);
431 }
3151cbae 432
00bdf899 433 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
dca68b0a
WD
434 "(%.0f bytes received so far) [%s]\n",
435 (double)stats.total_read, who_am_i());
00bdf899
WD
436
437 exit_cleanup(RERR_STREAMIO);
7a55d06e 438}
720b47f2 439
880da007 440/**
6ed6d7f5 441 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
442 * read. If no bytes can be read then exit, never return a number <= 0.
443 *
8886f8d0
MP
444 * TODO: If the remote shell connection fails, then current versions
445 * actually report an "unexpected EOF" error here. Since it's a
446 * fairly common mistake to try to use rsh when ssh is required, we
447 * should trap that: if we fail to read any data at all, we should
448 * give a better explanation. We can tell whether the connection has
449 * started by looking e.g. at whether the remote version is known yet.
c3563c46 450 */
3151cbae 451static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 452{
d8aeda1e 453 int n, cnt = 0;
4c36ddbe 454
d17e1dd2 455 io_flush(NORMAL_FLUSH);
ea2111d1 456
d8aeda1e 457 while (cnt == 0) {
7a55d06e 458 /* until we manage to read *something* */
56014c8c 459 fd_set r_fds, w_fds;
4c36ddbe 460 struct timeval tv;
1ea087a7 461 int maxfd = fd;
a57873b7 462 int count;
4c36ddbe 463
56014c8c 464 FD_ZERO(&r_fds);
a7026ba9 465 FD_ZERO(&w_fds);
56014c8c 466 FD_SET(fd, &r_fds);
08c88178 467 if (msg_list.head) {
4033b80b 468 FD_SET(msg_fd_out, &w_fds);
1ea087a7
WD
469 if (msg_fd_out > maxfd)
470 maxfd = msg_fd_out;
56014c8c 471 }
3309507d 472 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
473 int new_fd;
474 if (io_filesfrom_buflen == 0) {
3309507d 475 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
476 FD_SET(io_filesfrom_f_in, &r_fds);
477 new_fd = io_filesfrom_f_in;
478 } else {
479 io_filesfrom_f_out = -1;
480 new_fd = -1;
481 }
482 } else {
56014c8c
WD
483 FD_SET(io_filesfrom_f_out, &w_fds);
484 new_fd = io_filesfrom_f_out;
485 }
1ea087a7
WD
486 if (new_fd > maxfd)
487 maxfd = new_fd;
554e0a8d
AT
488 }
489
e626b29e 490 tv.tv_sec = select_timeout;
4c36ddbe
AT
491 tv.tv_usec = 0;
492
554e0a8d
AT
493 errno = 0;
494
a7026ba9 495 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
a57873b7 496
a57873b7 497 if (count <= 0) {
f89b9368 498 if (errno == EBADF)
554e0a8d 499 exit_cleanup(RERR_SOCKETIO);
bd717af8 500 check_timeout();
4c36ddbe
AT
501 continue;
502 }
503
08c88178 504 if (msg_list.head && FD_ISSET(msg_fd_out, &w_fds))
37f35d89 505 msg_list_flush(NORMAL_FLUSH);
554e0a8d 506
3309507d 507 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
508 if (io_filesfrom_buflen) {
509 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
510 int l = write(io_filesfrom_f_out,
511 io_filesfrom_bp,
512 io_filesfrom_buflen);
513 if (l > 0) {
514 if (!(io_filesfrom_buflen -= l))
515 io_filesfrom_bp = io_filesfrom_buf;
516 else
517 io_filesfrom_bp += l;
518 } else {
519 /* XXX should we complain? */
520 io_filesfrom_f_out = -1;
521 }
522 }
3309507d 523 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
524 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
525 int l = read(io_filesfrom_f_in,
526 io_filesfrom_buf,
527 sizeof io_filesfrom_buf);
528 if (l <= 0) {
529 /* Send end-of-file marker */
530 io_filesfrom_buf[0] = '\0';
531 io_filesfrom_buf[1] = '\0';
532 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
533 io_filesfrom_f_in = -1;
534 } else {
56014c8c
WD
535 if (!eol_nulls) {
536 char *s = io_filesfrom_buf + l;
537 /* Transform CR and/or LF into '\0' */
538 while (s-- > io_filesfrom_buf) {
539 if (*s == '\n' || *s == '\r')
540 *s = '\0';
541 }
542 }
543 if (!io_filesfrom_lastchar) {
544 /* Last buf ended with a '\0', so don't
545 * let this buf start with one. */
546 while (l && !*io_filesfrom_bp)
547 io_filesfrom_bp++, l--;
548 }
549 if (!l)
550 io_filesfrom_bp = io_filesfrom_buf;
551 else {
552 char *f = io_filesfrom_bp;
553 char *t = f;
554 char *eob = f + l;
555 /* Eliminate any multi-'\0' runs. */
556 while (f != eob) {
557 if (!(*t++ = *f++)) {
558 while (f != eob && !*f)
559 f++, l--;
560 }
561 }
562 io_filesfrom_lastchar = f[-1];
563 }
564 io_filesfrom_buflen = l;
565 }
566 }
567 }
568 }
569
f89b9368
WD
570 if (!FD_ISSET(fd, &r_fds))
571 continue;
554e0a8d 572
4c36ddbe
AT
573 n = read(fd, buf, len);
574
1ea087a7
WD
575 if (n <= 0) {
576 if (n == 0)
1f75bb10 577 whine_about_eof(fd); /* Doesn't return. */
d62bcc17
WD
578 if (errno == EINTR || errno == EWOULDBLOCK
579 || errno == EAGAIN)
7a55d06e 580 continue;
1f75bb10
WD
581
582 /* Don't write errors on a dead socket. */
583 if (fd == sock_f_in)
7f459268 584 close_multiplexing_out();
1f75bb10
WD
585 rsyserr(FERROR, errno, "read error");
586 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 587 }
00bdf899
WD
588
589 buf += n;
590 len -= n;
d8aeda1e 591 cnt += n;
1f75bb10 592
3b0a30eb
WD
593 if (fd == sock_f_in && io_timeout)
594 last_io_in = time(NULL);
4c36ddbe 595 }
8d9dc9f9 596
d8aeda1e 597 return cnt;
4c36ddbe 598}
8d9dc9f9 599
56014c8c
WD
600/**
601 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
602 * characters long).
603 */
604int read_filesfrom_line(int fd, char *fname)
605{
606 char ch, *s, *eob = fname + MAXPATHLEN - 1;
607 int cnt;
d19320fd 608 int reading_remotely = filesfrom_host != NULL;
56014c8c
WD
609 int nulls = eol_nulls || reading_remotely;
610
611 start:
612 s = fname;
613 while (1) {
614 cnt = read(fd, &ch, 1);
615 if (cnt < 0 && (errno == EWOULDBLOCK
616 || errno == EINTR || errno == EAGAIN)) {
617 struct timeval tv;
618 fd_set fds;
619 FD_ZERO(&fds);
620 FD_SET(fd, &fds);
e626b29e 621 tv.tv_sec = select_timeout;
56014c8c
WD
622 tv.tv_usec = 0;
623 if (!select(fd+1, &fds, NULL, NULL, &tv))
624 check_timeout();
625 continue;
626 }
627 if (cnt != 1)
628 break;
629 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
630 /* Skip empty lines if reading locally. */
631 if (!reading_remotely && s == fname)
632 continue;
633 break;
634 }
635 if (s < eob)
636 *s++ = ch;
637 }
638 *s = '\0';
7a55d06e 639
6b45fcf1
WD
640 /* Dump comments. */
641 if (*fname == '#' || *fname == ';')
56014c8c
WD
642 goto start;
643
644 return s - fname;
645}
7a55d06e 646
1f75bb10
WD
647static char *iobuf_out;
648static int iobuf_out_cnt;
649
650void io_start_buffering_out(void)
651{
652 if (iobuf_out)
653 return;
654 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
655 out_of_memory("io_start_buffering_out");
656 iobuf_out_cnt = 0;
657}
658
1f75bb10
WD
659static char *iobuf_in;
660static size_t iobuf_in_siz;
661
662void io_start_buffering_in(void)
663{
664 if (iobuf_in)
665 return;
666 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
667 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
668 out_of_memory("io_start_buffering_in");
669}
670
1f75bb10
WD
671void io_end_buffering(void)
672{
673 io_flush(NORMAL_FLUSH);
674 if (!io_multiplexing_out) {
675 free(iobuf_out);
676 iobuf_out = NULL;
677 }
678}
679
626bec8e
WD
680void maybe_flush_socket(void)
681{
3b0a30eb 682 if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
626bec8e
WD
683 io_flush(NORMAL_FLUSH);
684}
685
cdf236aa 686void maybe_send_keepalive(void)
9e2409ab 687{
3b0a30eb 688 if (time(NULL) - last_io_out >= allowed_lull) {
9e2409ab 689 if (!iobuf_out || !iobuf_out_cnt) {
3221f451
WD
690 if (protocol_version < 29)
691 return; /* there's nothing we can do */
cdf236aa 692 write_int(sock_f_out, the_file_list->count);
9e2409ab
WD
693 write_shortint(sock_f_out, ITEM_IS_NEW);
694 }
695 if (iobuf_out)
696 io_flush(NORMAL_FLUSH);
697 }
698}
699
880da007
MP
/**
 * Keep calling read_timeout() until exactly @p len bytes have been
 * stored in @p buf.  Does not return before then.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	size_t done = 0;

	while (done < len)
		done += read_timeout(fd, buf + done, len - done);
}
713
7a55d06e
MP
714/**
715 * Read from the file descriptor handling multiplexing - return number
716 * of bytes read.
d62bcc17
WD
717 *
718 * Never returns <= 0.
7a55d06e 719 */
c399d22a 720static int readfd_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 721{
6fe25398 722 static size_t remaining;
1f75bb10 723 static size_t iobuf_in_ndx;
f2a4853c 724 size_t msg_bytes;
d8aeda1e 725 int tag, cnt = 0;
33544bf4 726 char line[BIGPATHBUFLEN];
8d9dc9f9 727
1f75bb10 728 if (!iobuf_in || fd != sock_f_in)
4c36ddbe 729 return read_timeout(fd, buf, len);
8d9dc9f9 730
76c21947 731 if (!io_multiplexing_in && remaining == 0) {
1f75bb10
WD
732 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
733 iobuf_in_ndx = 0;
76c21947
WD
734 }
735
d8aeda1e 736 while (cnt == 0) {
8d9dc9f9
AT
737 if (remaining) {
738 len = MIN(len, remaining);
1f75bb10
WD
739 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
740 iobuf_in_ndx += len;
8d9dc9f9 741 remaining -= len;
d8aeda1e 742 cnt = len;
76c21947 743 break;
8d9dc9f9
AT
744 }
745
909ce14f 746 read_loop(fd, line, 4);
ff41a59f 747 tag = IVAL(line, 0);
679e7657 748
f2a4853c 749 msg_bytes = tag & 0xFFFFFF;
d17e1dd2 750 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 751
d17e1dd2
WD
752 switch (tag) {
753 case MSG_DATA:
f2a4853c 754 if (msg_bytes > iobuf_in_siz) {
1f75bb10 755 if (!(iobuf_in = realloc_array(iobuf_in, char,
f2a4853c 756 msg_bytes)))
c399d22a 757 out_of_memory("readfd_unbuffered");
f2a4853c 758 iobuf_in_siz = msg_bytes;
76c21947 759 }
f2a4853c
WD
760 read_loop(fd, iobuf_in, msg_bytes);
761 remaining = msg_bytes;
1f75bb10 762 iobuf_in_ndx = 0;
d17e1dd2 763 break;
0d67e00a 764 case MSG_DELETED:
f2a4853c
WD
765 if (msg_bytes >= sizeof line)
766 goto overflow;
767 read_loop(fd, line, msg_bytes);
768 line[msg_bytes] = '\0';
0d67e00a 769 /* A directory name was sent with the trailing null */
f2a4853c 770 if (msg_bytes > 0 && !line[msg_bytes-1])
0d67e00a
WD
771 log_delete(line, S_IFDIR);
772 else
773 log_delete(line, S_IFREG);
0d67e00a 774 break;
9981c27e 775 case MSG_SUCCESS:
f2a4853c 776 if (msg_bytes != 4) {
cdf236aa 777 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
f2a4853c 778 tag, (long)msg_bytes, who_am_i());
9981c27e
WD
779 exit_cleanup(RERR_STREAMIO);
780 }
f2a4853c 781 read_loop(fd, line, msg_bytes);
9981c27e 782 successful_send(IVAL(line, 0));
9981c27e 783 break;
d17e1dd2
WD
784 case MSG_INFO:
785 case MSG_ERROR:
f2a4853c
WD
786 if (msg_bytes >= sizeof line) {
787 overflow:
bd9fca47 788 rprintf(FERROR,
cdf236aa 789 "multiplexing overflow %d:%ld [%s]\n",
f2a4853c 790 tag, (long)msg_bytes, who_am_i());
d17e1dd2
WD
791 exit_cleanup(RERR_STREAMIO);
792 }
f2a4853c
WD
793 read_loop(fd, line, msg_bytes);
794 rwrite((enum logcode)tag, line, msg_bytes);
d17e1dd2
WD
795 break;
796 default:
cdf236aa
WD
797 rprintf(FERROR, "unexpected tag %d [%s]\n",
798 tag, who_am_i());
65417579 799 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 800 }
8d9dc9f9
AT
801 }
802
76c21947 803 if (remaining == 0)
d17e1dd2 804 io_flush(NORMAL_FLUSH);
76c21947 805
d8aeda1e 806 return cnt;
8d9dc9f9
AT
807}
808
880da007
MP
809/**
810 * Do a buffered read from @p fd. Don't return until all @p n bytes
811 * have been read. If all @p n can't be read then exit with an
812 * error.
813 **/
3151cbae 814static void readfd(int fd, char *buffer, size_t N)
720b47f2 815{
d8aeda1e 816 int cnt;
d62bcc17 817 size_t total = 0;
3151cbae 818
6ba9279f 819 while (total < N) {
d8aeda1e
WD
820 cnt = readfd_unbuffered(fd, buffer + total, N-total);
821 total += cnt;
7f28dbee 822 }
1b7c47cb 823
b9f592fb
WD
824 if (fd == write_batch_monitor_in) {
825 if ((size_t)write(batch_fd, buffer, total) != total)
826 exit_cleanup(RERR_FILEIO);
827 }
1f75bb10
WD
828
829 if (fd == sock_f_in)
830 stats.total_read += total;
720b47f2
AT
831}
832
0d67e00a 833int read_shortint(int f)
9361f839
WD
834{
835 uchar b[2];
836 readfd(f, (char *)b, 2);
837 return (b[1] << 8) + b[0];
838}
839
b7922338 840int32 read_int(int f)
720b47f2 841{
4c36ddbe 842 char b[4];
d8aeda1e 843 int32 num;
d730b113 844
4c36ddbe 845 readfd(f,b,4);
d8aeda1e
WD
846 num = IVAL(b,0);
847 if (num == (int32)0xffffffff)
f89b9368 848 return -1;
d8aeda1e 849 return num;
720b47f2
AT
850}
851
71c46176 852int64 read_longint(int f)
3a6a366f 853{
d8aeda1e 854 int64 num;
3a6a366f 855 char b[8];
d8aeda1e 856 num = read_int(f);
71c46176 857
d8aeda1e
WD
858 if ((int32)num != (int32)0xffffffff)
859 return num;
71c46176 860
031fa9ad
WD
861#if SIZEOF_INT64 < 8
862 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
863 exit_cleanup(RERR_UNSUPPORTED);
864#else
91c4da3f 865 readfd(f,b,8);
d8aeda1e 866 num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
031fa9ad 867#endif
71c46176 868
d8aeda1e 869 return num;
3a6a366f
AT
870}
871
/* Read exactly len bytes from f into buf. */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
876
/* Read exactly len bytes from f into buf and append a terminating '\0'.
 * The terminator is stored at buf[len], so buf needs room for len+1
 * bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	char *end = buf + len;

	readfd(f, buf, len);
	*end = '\0';
}
882
9361f839 883uchar read_byte(int f)
182dca5c 884{
9361f839 885 uchar c;
93095cbe 886 readfd(f, (char *)&c, 1);
4c36ddbe 887 return c;
182dca5c 888}
720b47f2 889
46e99b09
WD
890int read_vstring(int f, char *buf, int bufsize)
891{
892 int len = read_byte(f);
893
894 if (len & 0x80)
895 len = (len & ~0x80) * 0x100 + read_byte(f);
896
897 if (len >= bufsize) {
898 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
899 len, bufsize - 1);
9a6ed83f 900 return -1;
46e99b09
WD
901 }
902
903 if (len)
904 readfd(f, buf, len);
905 buf[len] = '\0';
906 return len;
907}
908
188fed95
WD
909/* Populate a sum_struct with values from the socket. This is
910 * called by both the sender and the receiver. */
911void read_sum_head(int f, struct sum_struct *sum)
912{
913 sum->count = read_int(f);
914 sum->blength = read_int(f);
915 if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
cdf236aa
WD
916 rprintf(FERROR, "Invalid block length %ld [%s]\n",
917 (long)sum->blength, who_am_i());
188fed95
WD
918 exit_cleanup(RERR_PROTOCOL);
919 }
920 sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
921 if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
cdf236aa
WD
922 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
923 sum->s2length, who_am_i());
188fed95
WD
924 exit_cleanup(RERR_PROTOCOL);
925 }
926 sum->remainder = read_int(f);
927 if (sum->remainder < 0 || sum->remainder > sum->blength) {
cdf236aa
WD
928 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
929 (long)sum->remainder, who_am_i());
188fed95
WD
930 exit_cleanup(RERR_PROTOCOL);
931 }
932}
933
c207d7ec
WD
934/* Send the values from a sum_struct over the socket. Set sum to
935 * NULL if there are no checksums to send. This is called by both
936 * the generator and the sender. */
937void write_sum_head(int f, struct sum_struct *sum)
938{
939 static struct sum_struct null_sum;
940
941 if (sum == NULL)
942 sum = &null_sum;
943
944 write_int(f, sum->count);
945 write_int(f, sum->blength);
946 if (protocol_version >= 27)
947 write_int(f, sum->s2length);
948 write_int(f, sum->remainder);
949}
950
08571358
MP
951/**
952 * Sleep after writing to limit I/O bandwidth usage.
953 *
954 * @todo Rather than sleeping after each write, it might be better to
955 * use some kind of averaging. The current algorithm seems to always
956 * use a bit less bandwidth than specified, because it doesn't make up
957 * for slow periods. But arguably this is a feature. In addition, we
958 * ought to take the time used to write the data into account.
71e58630
WD
959 *
960 * During some phases of big transfers (file FOO is uptodate) this is
961 * called with a small bytes_written every time. As the kernel has to
962 * round small waits up to guarantee that we actually wait at least the
963 * requested number of microseconds, this can become grossly inaccurate.
964 * We therefore keep track of the bytes we've written over time and only
965 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
966 **/
967static void sleep_for_bwlimit(int bytes_written)
968{
71e58630
WD
969 static struct timeval prior_tv;
970 static long total_written = 0;
971 struct timeval tv, start_tv;
972 long elapsed_usec, sleep_usec;
973
974#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358
MP
975
976 if (!bwlimit)
977 return;
e681e820 978
71e58630
WD
979 total_written += bytes_written;
980
981 gettimeofday(&start_tv, NULL);
982 if (prior_tv.tv_sec) {
983 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
984 + (start_tv.tv_usec - prior_tv.tv_usec);
985 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
986 if (total_written < 0)
987 total_written = 0;
988 }
3151cbae 989
71e58630
WD
990 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
991 if (sleep_usec < ONE_SEC / 10) {
992 prior_tv = start_tv;
993 return;
994 }
08571358 995
71e58630
WD
996 tv.tv_sec = sleep_usec / ONE_SEC;
997 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 998 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
999
1000 gettimeofday(&prior_tv, NULL);
1001 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1002 + (prior_tv.tv_usec - start_tv.tv_usec);
1003 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
1004}
1005
1f75bb10 1006/* Write len bytes to the file descriptor fd, looping as necessary to get
0fb2fc4a 1007 * the job done and also (in certain circumstances) reading any data on
af436313 1008 * msg_fd_in to avoid deadlock.
880da007
MP
1009 *
1010 * This function underlies the multiplexing system. The body of the
1f75bb10 1011 * application never calls this function directly. */
9dd891bb 1012static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 1013{
1ea087a7 1014 size_t n, total = 0;
8d9dc9f9 1015 fd_set w_fds, r_fds;
d8aeda1e 1016 int maxfd, count, cnt, using_r_fds;
8d9dc9f9 1017 struct timeval tv;
720b47f2 1018
e44f9a12
AT
1019 no_flush++;
1020
4c36ddbe 1021 while (total < len) {
8d9dc9f9 1022 FD_ZERO(&w_fds);
8d9dc9f9 1023 FD_SET(fd,&w_fds);
1ea087a7 1024 maxfd = fd;
4c36ddbe 1025
af436313 1026 if (msg_fd_in >= 0 && len-total >= contiguous_write_len) {
56014c8c 1027 FD_ZERO(&r_fds);
d17e1dd2 1028 FD_SET(msg_fd_in,&r_fds);
1ea087a7
WD
1029 if (msg_fd_in > maxfd)
1030 maxfd = msg_fd_in;
3eeac9bc
WD
1031 using_r_fds = 1;
1032 } else
1033 using_r_fds = 0;
8d9dc9f9 1034
e626b29e 1035 tv.tv_sec = select_timeout;
8d9dc9f9 1036 tv.tv_usec = 0;
4c36ddbe 1037
554e0a8d 1038 errno = 0;
3eeac9bc 1039 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
d17e1dd2 1040 &w_fds, NULL, &tv);
4c36ddbe
AT
1041
1042 if (count <= 0) {
1ea087a7 1043 if (count < 0 && errno == EBADF)
554e0a8d 1044 exit_cleanup(RERR_SOCKETIO);
bd717af8 1045 check_timeout();
8d9dc9f9
AT
1046 continue;
1047 }
4c36ddbe 1048
3eeac9bc 1049 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
d17e1dd2 1050 read_msg_fd();
554e0a8d 1051
9a6ed83f 1052 if (!FD_ISSET(fd, &w_fds))
1ea087a7 1053 continue;
4c36ddbe 1054
1ea087a7
WD
1055 n = len - total;
1056 if (bwlimit && n > bwlimit_writemax)
1057 n = bwlimit_writemax;
d8aeda1e 1058 cnt = write(fd, buf + total, n);
1ea087a7 1059
d8aeda1e
WD
1060 if (cnt <= 0) {
1061 if (cnt < 0) {
3309507d
WD
1062 if (errno == EINTR)
1063 continue;
1064 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1065 msleep(1);
1066 continue;
1067 }
f0359dd0
AT
1068 }
1069
1ea087a7 1070 /* Don't try to write errors back across the stream. */
1f75bb10 1071 if (fd == sock_f_out)
7f459268 1072 close_multiplexing_out();
1ea087a7 1073 rsyserr(FERROR, errno,
dca68b0a
WD
1074 "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
1075 (long)len, io_write_phase, who_am_i());
d1b31da7
WD
1076 /* If the other side is sending us error messages, try
1077 * to grab any messages they sent before they died. */
7f459268 1078 while (fd == sock_f_out && io_multiplexing_in) {
3b0a30eb 1079 set_io_timeout(30);
9e2409ab 1080 ignore_timeout = 0;
d1b31da7
WD
1081 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1082 sizeof io_filesfrom_buf);
1083 }
1ea087a7
WD
1084 exit_cleanup(RERR_STREAMIO);
1085 }
4c36ddbe 1086
d8aeda1e 1087 total += cnt;
a800434a 1088
1f75bb10 1089 if (fd == sock_f_out) {
626bec8e 1090 if (io_timeout || am_generator)
3b0a30eb 1091 last_io_out = time(NULL);
d8aeda1e 1092 sleep_for_bwlimit(cnt);
1f75bb10 1093 }
4c36ddbe 1094 }
e44f9a12
AT
1095
1096 no_flush--;
720b47f2
AT
1097}
1098
880da007
MP
1099/**
1100 * Write an message to a multiplexed stream. If this fails then rsync
1101 * exits.
1102 **/
1f75bb10 1103static void mplex_write(enum msgcode code, char *buf, size_t len)
ff41a59f 1104{
33544bf4 1105 char buffer[BIGPATHBUFLEN];
06ce139f 1106 size_t n = len;
8d9dc9f9 1107
ff41a59f
AT
1108 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
1109
af436313
WD
1110 /* When the generator reads messages from the msg_fd_in pipe, it can
1111 * cause output to occur down the socket. Setting contiguous_write_len
1112 * prevents the reading of msg_fd_in once we actually start to write
1113 * this sequence of data (though we might read it before the start). */
1114 if (am_generator && msg_fd_in >= 0)
1115 contiguous_write_len = len + 4;
1116
c399d22a 1117 if (n > sizeof buffer - 4)
3151cbae 1118 n = sizeof buffer - 4;
ff41a59f
AT
1119
1120 memcpy(&buffer[4], buf, n);
1f75bb10 1121 writefd_unbuffered(sock_f_out, buffer, n+4);
ff41a59f
AT
1122
1123 len -= n;
1124 buf += n;
1125
1ea087a7 1126 if (len)
1f75bb10 1127 writefd_unbuffered(sock_f_out, buf, len);
af436313
WD
1128
1129 if (am_generator && msg_fd_in >= 0)
1130 contiguous_write_len = 0;
d6dead6b
AT
1131}
1132
d17e1dd2 1133void io_flush(int flush_it_all)
d6dead6b 1134{
37f35d89 1135 msg_list_flush(flush_it_all);
90ba34e2 1136
1f75bb10 1137 if (!iobuf_out_cnt || no_flush)
d17e1dd2 1138 return;
8d9dc9f9 1139
d17e1dd2 1140 if (io_multiplexing_out)
1f75bb10 1141 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
d17e1dd2 1142 else
1f75bb10
WD
1143 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
1144 iobuf_out_cnt = 0;
8d9dc9f9
AT
1145}
1146
9dd891bb 1147static void writefd(int fd,char *buf,size_t len)
d6dead6b 1148{
4033b80b
WD
1149 if (fd == msg_fd_out) {
1150 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
1151 exit_cleanup(RERR_PROTOCOL);
1152 }
90ba34e2 1153
1f75bb10
WD
1154 if (fd == sock_f_out)
1155 stats.total_written += len;
1156
b9f592fb
WD
1157 if (fd == write_batch_monitor_out) {
1158 if ((size_t)write(batch_fd, buf, len) != len)
1159 exit_cleanup(RERR_FILEIO);
1160 }
1161
1f75bb10 1162 if (!iobuf_out || fd != sock_f_out) {
4c36ddbe
AT
1163 writefd_unbuffered(fd, buf, len);
1164 return;
1165 }
d6dead6b
AT
1166
1167 while (len) {
1f75bb10 1168 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
d6dead6b 1169 if (n > 0) {
1f75bb10 1170 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
d6dead6b
AT
1171 buf += n;
1172 len -= n;
1f75bb10 1173 iobuf_out_cnt += n;
d6dead6b 1174 }
3151cbae 1175
1f75bb10 1176 if (iobuf_out_cnt == IO_BUFFER_SIZE)
d17e1dd2 1177 io_flush(NORMAL_FLUSH);
d6dead6b 1178 }
d6dead6b 1179}
720b47f2 1180
0d67e00a 1181void write_shortint(int f, int x)
9361f839
WD
1182{
1183 uchar b[2];
1184 b[0] = x;
1185 b[1] = x >> 8;
1186 writefd(f, (char *)b, 2);
1187}
1188
b7922338 1189void write_int(int f,int32 x)
720b47f2 1190{
8d9dc9f9
AT
1191 char b[4];
1192 SIVAL(b,0,x);
4c36ddbe 1193 writefd(f,b,4);
720b47f2
AT
1194}
1195
805edf9d
MP
1196void write_int_named(int f, int32 x, const char *phase)
1197{
1198 io_write_phase = phase;
1199 write_int(f, x);
1200 io_write_phase = phase_unknown;
1201}
1202
7a24c346
MP
1203/*
1204 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1205 * 64-bit types on this platform.
1206 */
71c46176 1207void write_longint(int f, int64 x)
3a6a366f 1208{
3a6a366f 1209 char b[8];
3a6a366f 1210
91c4da3f 1211 if (x <= 0x7FFFFFFF) {
3a6a366f
AT
1212 write_int(f, (int)x);
1213 return;
1214 }
1215
031fa9ad
WD
1216#if SIZEOF_INT64 < 8
1217 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1218 exit_cleanup(RERR_UNSUPPORTED);
1219#else
8de330a3 1220 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
1221 SIVAL(b,0,(x&0xFFFFFFFF));
1222 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1223
4c36ddbe 1224 writefd(f,b,8);
031fa9ad 1225#endif
3a6a366f
AT
1226}
1227
/* Write len bytes from buf to f; a public wrapper around writefd(). */
void write_buf(int f,char *buf,size_t len)
{
	writefd(f, buf, len);
}
1232
/** Write a NUL-terminated string to the connection; the terminating NUL
 * itself is not sent. */
void write_sbuf(int f, char *buf)
{
	size_t count = strlen(buf);

	writefd(f, buf, count);
}
1238
9361f839 1239void write_byte(int f, uchar c)
182dca5c 1240{
93095cbe 1241 writefd(f, (char *)&c, 1);
182dca5c
AT
1242}
1243
46e99b09
WD
1244void write_vstring(int f, char *str, int len)
1245{
1246 uchar lenbuf[3], *lb = lenbuf;
1247
1248 if (len > 0x7F) {
1249 if (len > 0x7FFF) {
1250 rprintf(FERROR,
1251 "attempting to send over-long vstring (%d > %d)\n",
1252 len, 0x7FFF);
1253 exit_cleanup(RERR_PROTOCOL);
1254 }
1255 *lb++ = len / 0x100 + 0x80;
1256 }
1257 *lb = len;
1258
1259 writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1260 if (len)
1261 writefd(f, str, len);
1262}
1263
/**
 * Read a line of up to @p maxlen characters into @p buf (not counting
 * the trailing null, so @p buf needs room for one more byte).  The line is
 * read one byte at a time with read_buf().  The (required) trailing newline
 * and every carriage return are dropped.
 *
 * @return 1 for success; 0 if a zero byte comes back from the read or if
 * @p maxlen characters were stored without reaching a newline.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *pos = buf;
	size_t room = maxlen;

	while (room != 0) {
		/* Pre-clear the slot so that a read which stores nothing is
		 * seen as a zero byte. */
		*pos = 0;
		read_buf(f, pos, 1);

		if (*pos == 0)
			return 0;
		if (*pos == '\n')
			break;
		if (*pos == '\r')
			continue;	/* this slot gets overwritten next time */

		pos++;
		room--;
	}

	*pos = '\0';
	return room != 0;
}
1288
f0fca04e
AT
1289void io_printf(int fd, const char *format, ...)
1290{
d62bcc17 1291 va_list ap;
33544bf4 1292 char buf[BIGPATHBUFLEN];
f0fca04e 1293 int len;
3151cbae 1294
f0fca04e 1295 va_start(ap, format);
3151cbae 1296 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1297 va_end(ap);
1298
f89b9368
WD
1299 if (len < 0)
1300 exit_cleanup(RERR_STREAMIO);
f0fca04e 1301
33544bf4
WD
1302 if (len > (int)sizeof buf) {
1303 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1304 exit_cleanup(RERR_STREAMIO);
1305 }
1306
f0fca04e
AT
1307 write_sbuf(fd, buf);
1308}
8d9dc9f9 1309
d17e1dd2 1310/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1311void io_start_multiplex_out(void)
8d9dc9f9 1312{
d17e1dd2 1313 io_flush(NORMAL_FLUSH);
1f75bb10 1314 io_start_buffering_out();
8d9dc9f9
AT
1315 io_multiplexing_out = 1;
1316}
1317
d17e1dd2 1318/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1319void io_start_multiplex_in(void)
8d9dc9f9 1320{
d17e1dd2 1321 io_flush(NORMAL_FLUSH);
1f75bb10 1322 io_start_buffering_in();
8d9dc9f9
AT
1323 io_multiplexing_in = 1;
1324}
1325
d17e1dd2
WD
1326/** Write an message to the multiplexed data stream. */
1327int io_multiplex_write(enum msgcode code, char *buf, size_t len)
8d9dc9f9 1328{
f89b9368
WD
1329 if (!io_multiplexing_out)
1330 return 0;
8d9dc9f9 1331
d17e1dd2 1332 io_flush(NORMAL_FLUSH);
1b7c47cb 1333 stats.total_written += (len+4);
1f75bb10 1334 mplex_write(code, buf, len);
8d9dc9f9
AT
1335 return 1;
1336}
1337
7f459268
WD
1338void close_multiplexing_in(void)
1339{
1340 io_multiplexing_in = 0;
1341}
1342
d17e1dd2 1343/** Stop output multiplexing. */
7f459268 1344void close_multiplexing_out(void)
554e0a8d
AT
1345{
1346 io_multiplexing_out = 0;
1347}
1348
b9f592fb
WD
1349void start_write_batch(int fd)
1350{
741d6544
WD
1351 write_stream_flags(batch_fd);
1352
b9f592fb
WD
1353 /* Some communication has already taken place, but we don't
1354 * enable batch writing until here so that we can write a
1355 * canonical record of the communication even though the
1356 * actual communication so far depends on whether a daemon
1357 * is involved. */
1358 write_int(batch_fd, protocol_version);
1359 write_int(batch_fd, checksum_seed);
b9f592fb
WD
1360
1361 if (am_sender)
1362 write_batch_monitor_out = fd;
1363 else
1364 write_batch_monitor_in = fd;
1365}
1366
1367void stop_write_batch(void)
1368{
1369 write_batch_monitor_out = -1;
1370 write_batch_monitor_in = -1;
1371}