Tweaked some whitespace to match the latest version from autoconf.
[rsync/rsync.git] / io.c
CommitLineData
7a24c346 1/* -*- c-file-style: "linux" -*-
d62bcc17
WD
2 *
3 * Copyright (C) 1996-2001 by Andrew Tridgell
880da007
MP
4 * Copyright (C) Paul Mackerras 1996
5 * Copyright (C) 2001, 2002 by Martin Pool <mbp@samba.org>
d62bcc17 6 *
880da007
MP
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
d62bcc17 11 *
880da007
MP
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
d62bcc17 16 *
880da007
MP
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
720b47f2 21
87ee2481 22/**
87ee2481
MP
23 * @file io.c
24 *
6ed6d7f5 25 * Socket and pipe I/O utilities used in rsync.
87ee2481
MP
26 *
27 * rsync provides its own multiplexing system, which is used to send
28 * stderr and stdout over a single socket. We need this because
29 * stdout normally carries the binary data stream, and stderr all our
30 * error messages.
31 *
32 * For historical reasons this is off during the start of the
33 * connection, but it's switched on quite early using
34 * io_start_multiplex_out() and io_start_multiplex_in().
35 **/
720b47f2 36
720b47f2
AT
37#include "rsync.h"
38
880da007 39/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
40#define SELECT_TIMEOUT 60
41
7a55d06e 42extern int bwlimit;
71e58630 43extern size_t bwlimit_writemax;
6ba9279f 44extern int io_timeout;
cdf236aa 45extern int allowed_lull;
d17e1dd2
WD
46extern int am_server;
47extern int am_daemon;
48extern int am_sender;
98f8c9a5 49extern int am_generator;
e626b29e 50extern int eol_nulls;
3b0a30eb 51extern int read_batch;
188fed95 52extern int csum_length;
b9f592fb
WD
53extern int checksum_seed;
54extern int protocol_version;
cdf236aa
WD
55extern int remove_sent_files;
56extern int preserve_hard_links;
d19320fd 57extern char *filesfrom_host;
a800434a 58extern struct stats stats;
cdf236aa 59extern struct file_list *the_file_list;
720b47f2 60
a86179f4 61const char phase_unknown[] = "unknown";
9e2409ab 62int ignore_timeout = 0;
b9f592fb 63int batch_fd = -1;
b0ad5429 64int batch_gen_fd = -1;
805edf9d 65
cb2e1f18 66/* Ignore an EOF error if non-zero. See whine_about_eof(). */
574c2500 67int kluge_around_eof = 0;
7a55d06e 68
d17e1dd2
WD
69int msg_fd_in = -1;
70int msg_fd_out = -1;
cdf236aa
WD
71int sock_f_in = -1;
72int sock_f_out = -1;
7a55d06e 73
1f75bb10
WD
74static int io_multiplexing_out;
75static int io_multiplexing_in;
3b0a30eb
WD
76static time_t last_io_in;
77static time_t last_io_out;
1f75bb10
WD
78static int no_flush;
79
b9f592fb
WD
80static int write_batch_monitor_in = -1;
81static int write_batch_monitor_out = -1;
82
56014c8c
WD
83static int io_filesfrom_f_in = -1;
84static int io_filesfrom_f_out = -1;
85static char io_filesfrom_buf[2048];
86static char *io_filesfrom_bp;
87static char io_filesfrom_lastchar;
88static int io_filesfrom_buflen;
954bbed8 89static int defer_forwarding_messages = 0;
3b0a30eb 90static int select_timeout = SELECT_TIMEOUT;
d6081c82
WD
91static int active_filecnt = 0;
92static OFF_T active_bytecnt = 0;
720b47f2 93
9dd891bb 94static void read_loop(int fd, char *buf, size_t len);
ff41a59f 95
c6816b94
WD
96struct flist_ndx_item {
97 struct flist_ndx_item *next;
98 int ndx;
d17e1dd2
WD
99};
100
c6816b94
WD
101struct flist_ndx_list {
102 struct flist_ndx_item *head, *tail;
103};
104
cdf236aa 105static struct flist_ndx_list redo_list, hlink_list;
d17e1dd2 106
08c88178
WD
107struct msg_list_item {
108 struct msg_list_item *next;
d17e1dd2 109 int len;
954bbed8 110 char buf[1];
d17e1dd2
WD
111};
112
08c88178
WD
113struct msg_list {
114 struct msg_list_item *head, *tail;
115};
116
954bbed8 117static struct msg_list msg2genr, msg2sndr;
d17e1dd2 118
c6816b94 119static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
d17e1dd2 120{
c6816b94
WD
121 struct flist_ndx_item *item;
122
123 if (!(item = new(struct flist_ndx_item)))
124 out_of_memory("flist_ndx_push");
125 item->next = NULL;
126 item->ndx = ndx;
127 if (lp->tail)
128 lp->tail->next = item;
d17e1dd2 129 else
c6816b94
WD
130 lp->head = item;
131 lp->tail = item;
132}
133
134static int flist_ndx_pop(struct flist_ndx_list *lp)
135{
136 struct flist_ndx_item *next;
137 int ndx;
138
139 if (!lp->head)
140 return -1;
141
142 ndx = lp->head->ndx;
143 next = lp->head->next;
144 free(lp->head);
145 lp->head = next;
146 if (!next)
147 lp->tail = NULL;
148
149 return ndx;
d17e1dd2
WD
150}
151
8d9dc9f9
AT
152static void check_timeout(void)
153{
154 time_t t;
90ba34e2 155
9e2409ab 156 if (!io_timeout || ignore_timeout)
d17e1dd2 157 return;
8d9dc9f9 158
3b0a30eb
WD
159 if (!last_io_in) {
160 last_io_in = time(NULL);
8d9dc9f9
AT
161 return;
162 }
163
164 t = time(NULL);
165
3b0a30eb 166 if (t - last_io_in >= io_timeout) {
0adb99b9 167 if (!am_server && !am_daemon) {
4ccfd96c 168 rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
3b0a30eb 169 (int)(t-last_io_in));
0adb99b9 170 }
65417579 171 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
172 }
173}
174
1f75bb10
WD
175/* Note the fds used for the main socket (which might really be a pipe
176 * for a local transfer, but we can ignore that). */
177void io_set_sock_fds(int f_in, int f_out)
178{
179 sock_f_in = f_in;
180 sock_f_out = f_out;
181}
182
3b0a30eb
WD
183void set_io_timeout(int secs)
184{
185 io_timeout = secs;
186
187 if (!io_timeout || io_timeout > SELECT_TIMEOUT)
188 select_timeout = SELECT_TIMEOUT;
189 else
190 select_timeout = io_timeout;
191
192 allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
193}
194
98f8c9a5
WD
195/* Setup the fd used to receive MSG_* messages. Only needed during the
196 * early stages of being a local sender (up through the sending of the
197 * file list) or when we're the generator (to fetch the messages from
198 * the receiver). */
d17e1dd2
WD
199void set_msg_fd_in(int fd)
200{
201 msg_fd_in = fd;
202}
203
98f8c9a5
WD
204/* Setup the fd used to send our MSG_* messages. Only needed when
205 * we're the receiver (to send our messages to the generator). */
d17e1dd2
WD
206void set_msg_fd_out(int fd)
207{
208 msg_fd_out = fd;
209 set_nonblocking(msg_fd_out);
210}
211
212/* Add a message to the pending MSG_* list. */
954bbed8 213static void msg_list_add(struct msg_list *lst, int code, char *buf, int len)
554e0a8d 214{
954bbed8
WD
215 struct msg_list_item *m;
216 int sz = len + 4 + sizeof m[0] - 1;
d17e1dd2 217
954bbed8 218 if (!(m = (struct msg_list_item *)new_array(char, sz)))
c6816b94 219 out_of_memory("msg_list_add");
954bbed8
WD
220 m->next = NULL;
221 m->len = len + 4;
222 SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
223 memcpy(m->buf + 4, buf, len);
224 if (lst->tail)
225 lst->tail->next = m;
d17e1dd2 226 else
954bbed8
WD
227 lst->head = m;
228 lst->tail = m;
554e0a8d
AT
229}
230
98f8c9a5
WD
231/* Read a message from the MSG_* fd and handle it. This is called either
232 * during the early stages of being a local sender (up through the sending
233 * of the file list) or when we're the generator (to fetch the messages
234 * from the receiver). */
d17e1dd2 235static void read_msg_fd(void)
554e0a8d 236{
f9c6b3e7 237 char buf[2048];
06ce139f 238 size_t n;
d17e1dd2 239 int fd = msg_fd_in;
ff41a59f
AT
240 int tag, len;
241
00bdf899 242 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
af436313 243 * to this routine from writefd_unbuffered(). */
d17e1dd2 244 msg_fd_in = -1;
554e0a8d 245
ff41a59f
AT
246 read_loop(fd, buf, 4);
247 tag = IVAL(buf, 0);
248
249 len = tag & 0xFFFFFF;
d17e1dd2 250 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 251
d17e1dd2
WD
252 switch (tag) {
253 case MSG_DONE:
98f8c9a5 254 if (len != 0 || !am_generator) {
13c7bcbb 255 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 256 exit_cleanup(RERR_STREAMIO);
13c7bcbb 257 }
c6816b94 258 flist_ndx_push(&redo_list, -1);
d17e1dd2
WD
259 break;
260 case MSG_REDO:
98f8c9a5 261 if (len != 4 || !am_generator) {
13c7bcbb 262 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
d17e1dd2 263 exit_cleanup(RERR_STREAMIO);
13c7bcbb 264 }
d17e1dd2 265 read_loop(fd, buf, 4);
d6081c82
WD
266 if (remove_sent_files)
267 decrement_active_files(IVAL(buf,0));
c6816b94 268 flist_ndx_push(&redo_list, IVAL(buf,0));
d17e1dd2 269 break;
0d67e00a
WD
270 case MSG_DELETED:
271 if (len >= (int)sizeof buf || !am_generator) {
272 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
273 exit_cleanup(RERR_STREAMIO);
274 }
275 read_loop(fd, buf, len);
954bbed8
WD
276 if (defer_forwarding_messages)
277 msg_list_add(&msg2sndr, MSG_DELETED, buf, len);
278 else
279 io_multiplex_write(MSG_DELETED, buf, len);
0d67e00a 280 break;
9981c27e
WD
281 case MSG_SUCCESS:
282 if (len != 4 || !am_generator) {
283 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
284 exit_cleanup(RERR_STREAMIO);
285 }
286 read_loop(fd, buf, len);
d6081c82
WD
287 if (remove_sent_files) {
288 decrement_active_files(IVAL(buf,0));
954bbed8
WD
289 if (defer_forwarding_messages)
290 msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len);
291 else
292 io_multiplex_write(MSG_SUCCESS, buf, len);
d6081c82 293 }
cdf236aa
WD
294 if (preserve_hard_links)
295 flist_ndx_push(&hlink_list, IVAL(buf,0));
9981c27e 296 break;
72f2d1b3
WD
297 case MSG_SOCKERR:
298 if (!am_generator) {
299 rprintf(FERROR, "invalid message %d:%d\n", tag, len);
300 exit_cleanup(RERR_STREAMIO);
301 }
302 close_multiplexing_out();
303 /* FALL THROUGH */
d17e1dd2
WD
304 case MSG_INFO:
305 case MSG_ERROR:
306 case MSG_LOG:
307 while (len) {
308 n = len;
309 if (n >= sizeof buf)
310 n = sizeof buf - 1;
311 read_loop(fd, buf, n);
954bbed8
WD
312 if (am_generator && am_server && defer_forwarding_messages)
313 msg_list_add(&msg2sndr, tag, buf, n);
314 else
315 rwrite((enum logcode)tag, buf, n);
d17e1dd2
WD
316 len -= n;
317 }
318 break;
319 default:
f8b9da1a
WD
320 rprintf(FERROR, "unknown message %d:%d [%s]\n",
321 tag, len, who_am_i());
d17e1dd2
WD
322 exit_cleanup(RERR_STREAMIO);
323 }
324
325 msg_fd_in = fd;
326}
327
d6081c82
WD
328/* This is used by the generator to limit how many file transfers can
329 * be active at once when --remove-sent-files is specified. Without
330 * this, sender-side deletions were mostly happening at the end. */
331void increment_active_files(int ndx, int itemizing, enum logcode code)
332{
333 /* TODO: tune these limits? */
5b986297 334 while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
d6081c82
WD
335 if (hlink_list.head)
336 check_for_finished_hlinks(itemizing, code);
337 read_msg_fd();
338 }
339
340 active_filecnt++;
341 active_bytecnt += the_file_list->files[ndx]->length;
342}
343
344void decrement_active_files(int ndx)
345{
346 active_filecnt--;
347 active_bytecnt -= the_file_list->files[ndx]->length;
348}
349
d17e1dd2
WD
350/* Try to push messages off the list onto the wire. If we leave with more
351 * to do, return 0. On error, return -1. If everything flushed, return 1.
f9c6b3e7 352 * This is only active in the receiver. */
954bbed8 353static int msg2genr_flush(int flush_it_all)
d17e1dd2
WD
354{
355 static int written = 0;
356 struct timeval tv;
357 fd_set fds;
358
359 if (msg_fd_out < 0)
360 return -1;
361
954bbed8
WD
362 while (msg2genr.head) {
363 struct msg_list_item *m = msg2genr.head;
364 int n = write(msg_fd_out, m->buf + written, m->len - written);
d17e1dd2
WD
365 if (n < 0) {
366 if (errno == EINTR)
367 continue;
368 if (errno != EWOULDBLOCK && errno != EAGAIN)
369 return -1;
370 if (!flush_it_all)
371 return 0;
372 FD_ZERO(&fds);
373 FD_SET(msg_fd_out, &fds);
e626b29e 374 tv.tv_sec = select_timeout;
d17e1dd2
WD
375 tv.tv_usec = 0;
376 if (!select(msg_fd_out+1, NULL, &fds, NULL, &tv))
377 check_timeout();
954bbed8
WD
378 } else if ((written += n) == m->len) {
379 msg2genr.head = m->next;
380 if (!msg2genr.head)
381 msg2genr.tail = NULL;
382 free(m);
d17e1dd2
WD
383 written = 0;
384 }
554e0a8d 385 }
d17e1dd2
WD
386 return 1;
387}
388
37f35d89
WD
389void send_msg(enum msgcode code, char *buf, int len)
390{
391 if (msg_fd_out < 0) {
392 io_multiplex_write(code, buf, len);
393 return;
394 }
954bbed8
WD
395 msg_list_add(&msg2genr, code, buf, len);
396 msg2genr_flush(NORMAL_FLUSH);
37f35d89
WD
397}
398
cdf236aa 399int get_redo_num(int itemizing, enum logcode code)
d17e1dd2 400{
cdf236aa
WD
401 while (1) {
402 if (hlink_list.head)
403 check_for_finished_hlinks(itemizing, code);
404 if (redo_list.head)
405 break;
d17e1dd2 406 read_msg_fd();
c6816b94 407 }
554e0a8d 408
c6816b94 409 return flist_ndx_pop(&redo_list);
554e0a8d
AT
410}
411
cdf236aa
WD
412int get_hlink_num(void)
413{
414 return flist_ndx_pop(&hlink_list);
415}
416
56014c8c
WD
417/**
418 * When we're the receiver and we have a local --files-from list of names
419 * that needs to be sent over the socket to the sender, we have to do two
420 * things at the same time: send the sender a list of what files we're
421 * processing and read the incoming file+info list from the sender. We do
422 * this by augmenting the read_timeout() function to copy this data. It
423 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
424 * ready, since it might be a pipe) and then blast it out f_out (when it
425 * is ready to receive more data).
426 */
427void io_set_filesfrom_fds(int f_in, int f_out)
428{
429 io_filesfrom_f_in = f_in;
430 io_filesfrom_f_out = f_out;
431 io_filesfrom_bp = io_filesfrom_buf;
432 io_filesfrom_lastchar = '\0';
433 io_filesfrom_buflen = 0;
434}
720b47f2 435
cb2e1f18
WD
436/* It's almost always an error to get an EOF when we're trying to read from the
437 * network, because the protocol is (for the most part) self-terminating.
880da007 438 *
cb2e1f18
WD
439 * There is one case for the receiver when it is at the end of the transfer
440 * (hanging around reading any keep-alive packets that might come its way): if
441 * the sender dies before the generator's kill-signal comes through, we can end
442 * up here needing to loop until the kill-signal arrives. In this situation,
443 * kluge_around_eof will be < 0.
444 *
445 * There is another case for older protocol versions (< 24) where the module
446 * listing was not terminated, so we must ignore an EOF error in that case and
447 * exit. In this situation, kluge_around_eof will be > 0. */
1f75bb10 448static void whine_about_eof(int fd)
7a55d06e 449{
574c2500 450 if (kluge_around_eof && fd == sock_f_in) {
55bb7fff 451 int i;
574c2500
WD
452 if (kluge_around_eof > 0)
453 exit_cleanup(0);
55bb7fff
WD
454 /* If we're still here after 10 seconds, exit with an error. */
455 for (i = 10*1000/20; i--; )
574c2500
WD
456 msleep(20);
457 }
3151cbae 458
00bdf899 459 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
dca68b0a
WD
460 "(%.0f bytes received so far) [%s]\n",
461 (double)stats.total_read, who_am_i());
00bdf899
WD
462
463 exit_cleanup(RERR_STREAMIO);
7a55d06e 464}
720b47f2 465
880da007 466/**
6ed6d7f5 467 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
468 * read. If no bytes can be read then exit, never return a number <= 0.
469 *
8886f8d0
MP
470 * TODO: If the remote shell connection fails, then current versions
471 * actually report an "unexpected EOF" error here. Since it's a
472 * fairly common mistake to try to use rsh when ssh is required, we
473 * should trap that: if we fail to read any data at all, we should
474 * give a better explanation. We can tell whether the connection has
475 * started by looking e.g. at whether the remote version is known yet.
c3563c46 476 */
3151cbae 477static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 478{
d8aeda1e 479 int n, cnt = 0;
4c36ddbe 480
d17e1dd2 481 io_flush(NORMAL_FLUSH);
ea2111d1 482
d8aeda1e 483 while (cnt == 0) {
7a55d06e 484 /* until we manage to read *something* */
56014c8c 485 fd_set r_fds, w_fds;
4c36ddbe 486 struct timeval tv;
1ea087a7 487 int maxfd = fd;
a57873b7 488 int count;
4c36ddbe 489
56014c8c 490 FD_ZERO(&r_fds);
a7026ba9 491 FD_ZERO(&w_fds);
56014c8c 492 FD_SET(fd, &r_fds);
954bbed8 493 if (msg2genr.head) {
4033b80b 494 FD_SET(msg_fd_out, &w_fds);
1ea087a7
WD
495 if (msg_fd_out > maxfd)
496 maxfd = msg_fd_out;
56014c8c 497 }
3309507d 498 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
499 int new_fd;
500 if (io_filesfrom_buflen == 0) {
3309507d 501 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
502 FD_SET(io_filesfrom_f_in, &r_fds);
503 new_fd = io_filesfrom_f_in;
504 } else {
505 io_filesfrom_f_out = -1;
506 new_fd = -1;
507 }
508 } else {
56014c8c
WD
509 FD_SET(io_filesfrom_f_out, &w_fds);
510 new_fd = io_filesfrom_f_out;
511 }
1ea087a7
WD
512 if (new_fd > maxfd)
513 maxfd = new_fd;
554e0a8d
AT
514 }
515
e626b29e 516 tv.tv_sec = select_timeout;
4c36ddbe
AT
517 tv.tv_usec = 0;
518
554e0a8d
AT
519 errno = 0;
520
a7026ba9 521 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
a57873b7 522
a57873b7 523 if (count <= 0) {
f89b9368 524 if (errno == EBADF)
554e0a8d 525 exit_cleanup(RERR_SOCKETIO);
bd717af8 526 check_timeout();
4c36ddbe
AT
527 continue;
528 }
529
954bbed8
WD
530 if (msg2genr.head && FD_ISSET(msg_fd_out, &w_fds))
531 msg2genr_flush(NORMAL_FLUSH);
554e0a8d 532
3309507d 533 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
534 if (io_filesfrom_buflen) {
535 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
536 int l = write(io_filesfrom_f_out,
537 io_filesfrom_bp,
538 io_filesfrom_buflen);
539 if (l > 0) {
540 if (!(io_filesfrom_buflen -= l))
541 io_filesfrom_bp = io_filesfrom_buf;
542 else
543 io_filesfrom_bp += l;
544 } else {
545 /* XXX should we complain? */
546 io_filesfrom_f_out = -1;
547 }
548 }
3309507d 549 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
550 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
551 int l = read(io_filesfrom_f_in,
552 io_filesfrom_buf,
553 sizeof io_filesfrom_buf);
554 if (l <= 0) {
555 /* Send end-of-file marker */
556 io_filesfrom_buf[0] = '\0';
557 io_filesfrom_buf[1] = '\0';
558 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
559 io_filesfrom_f_in = -1;
560 } else {
56014c8c
WD
561 if (!eol_nulls) {
562 char *s = io_filesfrom_buf + l;
563 /* Transform CR and/or LF into '\0' */
564 while (s-- > io_filesfrom_buf) {
565 if (*s == '\n' || *s == '\r')
566 *s = '\0';
567 }
568 }
569 if (!io_filesfrom_lastchar) {
570 /* Last buf ended with a '\0', so don't
571 * let this buf start with one. */
572 while (l && !*io_filesfrom_bp)
573 io_filesfrom_bp++, l--;
574 }
575 if (!l)
576 io_filesfrom_bp = io_filesfrom_buf;
577 else {
578 char *f = io_filesfrom_bp;
579 char *t = f;
580 char *eob = f + l;
581 /* Eliminate any multi-'\0' runs. */
582 while (f != eob) {
583 if (!(*t++ = *f++)) {
584 while (f != eob && !*f)
585 f++, l--;
586 }
587 }
588 io_filesfrom_lastchar = f[-1];
589 }
590 io_filesfrom_buflen = l;
591 }
592 }
593 }
594 }
595
f89b9368
WD
596 if (!FD_ISSET(fd, &r_fds))
597 continue;
554e0a8d 598
4c36ddbe
AT
599 n = read(fd, buf, len);
600
1ea087a7
WD
601 if (n <= 0) {
602 if (n == 0)
1f75bb10 603 whine_about_eof(fd); /* Doesn't return. */
d62bcc17
WD
604 if (errno == EINTR || errno == EWOULDBLOCK
605 || errno == EAGAIN)
7a55d06e 606 continue;
1f75bb10
WD
607
608 /* Don't write errors on a dead socket. */
72f2d1b3 609 if (fd == sock_f_in) {
7f459268 610 close_multiplexing_out();
72f2d1b3
WD
611 rsyserr(FSOCKERR, errno, "read error");
612 } else
613 rsyserr(FERROR, errno, "read error");
1f75bb10 614 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 615 }
00bdf899
WD
616
617 buf += n;
618 len -= n;
d8aeda1e 619 cnt += n;
1f75bb10 620
3b0a30eb
WD
621 if (fd == sock_f_in && io_timeout)
622 last_io_in = time(NULL);
4c36ddbe 623 }
8d9dc9f9 624
d8aeda1e 625 return cnt;
4c36ddbe 626}
8d9dc9f9 627
56014c8c
WD
628/**
629 * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
630 * characters long).
631 */
632int read_filesfrom_line(int fd, char *fname)
633{
634 char ch, *s, *eob = fname + MAXPATHLEN - 1;
635 int cnt;
d19320fd 636 int reading_remotely = filesfrom_host != NULL;
56014c8c
WD
637 int nulls = eol_nulls || reading_remotely;
638
639 start:
640 s = fname;
641 while (1) {
642 cnt = read(fd, &ch, 1);
643 if (cnt < 0 && (errno == EWOULDBLOCK
644 || errno == EINTR || errno == EAGAIN)) {
645 struct timeval tv;
646 fd_set fds;
647 FD_ZERO(&fds);
648 FD_SET(fd, &fds);
e626b29e 649 tv.tv_sec = select_timeout;
56014c8c
WD
650 tv.tv_usec = 0;
651 if (!select(fd+1, &fds, NULL, NULL, &tv))
652 check_timeout();
653 continue;
654 }
655 if (cnt != 1)
656 break;
657 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
658 /* Skip empty lines if reading locally. */
659 if (!reading_remotely && s == fname)
660 continue;
661 break;
662 }
663 if (s < eob)
664 *s++ = ch;
665 }
666 *s = '\0';
7a55d06e 667
6b45fcf1
WD
668 /* Dump comments. */
669 if (*fname == '#' || *fname == ';')
56014c8c
WD
670 goto start;
671
672 return s - fname;
673}
7a55d06e 674
1f75bb10
WD
675static char *iobuf_out;
676static int iobuf_out_cnt;
677
678void io_start_buffering_out(void)
679{
680 if (iobuf_out)
681 return;
682 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
683 out_of_memory("io_start_buffering_out");
684 iobuf_out_cnt = 0;
685}
686
1f75bb10
WD
687static char *iobuf_in;
688static size_t iobuf_in_siz;
689
690void io_start_buffering_in(void)
691{
692 if (iobuf_in)
693 return;
694 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
695 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
696 out_of_memory("io_start_buffering_in");
697}
698
1f75bb10
WD
699void io_end_buffering(void)
700{
701 io_flush(NORMAL_FLUSH);
702 if (!io_multiplexing_out) {
703 free(iobuf_out);
704 iobuf_out = NULL;
705 }
706}
707
626bec8e
WD
708void maybe_flush_socket(void)
709{
3b0a30eb 710 if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5)
626bec8e
WD
711 io_flush(NORMAL_FLUSH);
712}
713
cdf236aa 714void maybe_send_keepalive(void)
9e2409ab 715{
3b0a30eb 716 if (time(NULL) - last_io_out >= allowed_lull) {
9e2409ab 717 if (!iobuf_out || !iobuf_out_cnt) {
3221f451
WD
718 if (protocol_version < 29)
719 return; /* there's nothing we can do */
cdf236aa 720 write_int(sock_f_out, the_file_list->count);
9e2409ab
WD
721 write_shortint(sock_f_out, ITEM_IS_NEW);
722 }
723 if (iobuf_out)
724 io_flush(NORMAL_FLUSH);
725 }
726}
727
880da007
MP
728/**
729 * Continue trying to read len bytes - don't return until len has been
730 * read.
731 **/
3151cbae 732static void read_loop(int fd, char *buf, size_t len)
4c36ddbe
AT
733{
734 while (len) {
735 int n = read_timeout(fd, buf, len);
736
737 buf += n;
738 len -= n;
8d9dc9f9
AT
739 }
740}
741
7a55d06e
MP
742/**
743 * Read from the file descriptor handling multiplexing - return number
744 * of bytes read.
d62bcc17
WD
745 *
746 * Never returns <= 0.
7a55d06e 747 */
c399d22a 748static int readfd_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 749{
6fe25398 750 static size_t remaining;
1f75bb10 751 static size_t iobuf_in_ndx;
f2a4853c 752 size_t msg_bytes;
d8aeda1e 753 int tag, cnt = 0;
33544bf4 754 char line[BIGPATHBUFLEN];
8d9dc9f9 755
1f75bb10 756 if (!iobuf_in || fd != sock_f_in)
4c36ddbe 757 return read_timeout(fd, buf, len);
8d9dc9f9 758
76c21947 759 if (!io_multiplexing_in && remaining == 0) {
1f75bb10
WD
760 remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
761 iobuf_in_ndx = 0;
76c21947
WD
762 }
763
d8aeda1e 764 while (cnt == 0) {
8d9dc9f9
AT
765 if (remaining) {
766 len = MIN(len, remaining);
1f75bb10
WD
767 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
768 iobuf_in_ndx += len;
8d9dc9f9 769 remaining -= len;
d8aeda1e 770 cnt = len;
76c21947 771 break;
8d9dc9f9
AT
772 }
773
909ce14f 774 read_loop(fd, line, 4);
ff41a59f 775 tag = IVAL(line, 0);
679e7657 776
f2a4853c 777 msg_bytes = tag & 0xFFFFFF;
d17e1dd2 778 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 779
d17e1dd2
WD
780 switch (tag) {
781 case MSG_DATA:
f2a4853c 782 if (msg_bytes > iobuf_in_siz) {
1f75bb10 783 if (!(iobuf_in = realloc_array(iobuf_in, char,
f2a4853c 784 msg_bytes)))
c399d22a 785 out_of_memory("readfd_unbuffered");
f2a4853c 786 iobuf_in_siz = msg_bytes;
76c21947 787 }
f2a4853c
WD
788 read_loop(fd, iobuf_in, msg_bytes);
789 remaining = msg_bytes;
1f75bb10 790 iobuf_in_ndx = 0;
d17e1dd2 791 break;
0d67e00a 792 case MSG_DELETED:
f2a4853c
WD
793 if (msg_bytes >= sizeof line)
794 goto overflow;
795 read_loop(fd, line, msg_bytes);
0d67e00a 796 /* A directory name was sent with the trailing null */
f2a4853c 797 if (msg_bytes > 0 && !line[msg_bytes-1])
0d67e00a 798 log_delete(line, S_IFDIR);
12bda6f7
WD
799 else {
800 line[msg_bytes] = '\0';
0d67e00a 801 log_delete(line, S_IFREG);
12bda6f7 802 }
0d67e00a 803 break;
9981c27e 804 case MSG_SUCCESS:
f2a4853c 805 if (msg_bytes != 4) {
cdf236aa 806 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
f2a4853c 807 tag, (long)msg_bytes, who_am_i());
9981c27e
WD
808 exit_cleanup(RERR_STREAMIO);
809 }
f2a4853c 810 read_loop(fd, line, msg_bytes);
9981c27e 811 successful_send(IVAL(line, 0));
9981c27e 812 break;
d17e1dd2
WD
813 case MSG_INFO:
814 case MSG_ERROR:
f2a4853c
WD
815 if (msg_bytes >= sizeof line) {
816 overflow:
bd9fca47 817 rprintf(FERROR,
cdf236aa 818 "multiplexing overflow %d:%ld [%s]\n",
f2a4853c 819 tag, (long)msg_bytes, who_am_i());
d17e1dd2
WD
820 exit_cleanup(RERR_STREAMIO);
821 }
f2a4853c
WD
822 read_loop(fd, line, msg_bytes);
823 rwrite((enum logcode)tag, line, msg_bytes);
d17e1dd2
WD
824 break;
825 default:
cdf236aa
WD
826 rprintf(FERROR, "unexpected tag %d [%s]\n",
827 tag, who_am_i());
65417579 828 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 829 }
8d9dc9f9
AT
830 }
831
76c21947 832 if (remaining == 0)
d17e1dd2 833 io_flush(NORMAL_FLUSH);
76c21947 834
d8aeda1e 835 return cnt;
8d9dc9f9
AT
836}
837
880da007
MP
838/**
839 * Do a buffered read from @p fd. Don't return until all @p n bytes
840 * have been read. If all @p n can't be read then exit with an
841 * error.
842 **/
3151cbae 843static void readfd(int fd, char *buffer, size_t N)
720b47f2 844{
d8aeda1e 845 int cnt;
d62bcc17 846 size_t total = 0;
3151cbae 847
6ba9279f 848 while (total < N) {
d8aeda1e
WD
849 cnt = readfd_unbuffered(fd, buffer + total, N-total);
850 total += cnt;
7f28dbee 851 }
1b7c47cb 852
b9f592fb
WD
853 if (fd == write_batch_monitor_in) {
854 if ((size_t)write(batch_fd, buffer, total) != total)
855 exit_cleanup(RERR_FILEIO);
856 }
1f75bb10
WD
857
858 if (fd == sock_f_in)
859 stats.total_read += total;
720b47f2
AT
860}
861
0d67e00a 862int read_shortint(int f)
9361f839
WD
863{
864 uchar b[2];
865 readfd(f, (char *)b, 2);
866 return (b[1] << 8) + b[0];
867}
868
b7922338 869int32 read_int(int f)
720b47f2 870{
4c36ddbe 871 char b[4];
d8aeda1e 872 int32 num;
d730b113 873
4c36ddbe 874 readfd(f,b,4);
d8aeda1e
WD
875 num = IVAL(b,0);
876 if (num == (int32)0xffffffff)
f89b9368 877 return -1;
d8aeda1e 878 return num;
720b47f2
AT
879}
880
71c46176 881int64 read_longint(int f)
3a6a366f 882{
d8aeda1e 883 int64 num;
3a6a366f 884 char b[8];
d8aeda1e 885 num = read_int(f);
71c46176 886
d8aeda1e
WD
887 if ((int32)num != (int32)0xffffffff)
888 return num;
71c46176 889
031fa9ad
WD
890#if SIZEOF_INT64 < 8
891 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
892 exit_cleanup(RERR_UNSUPPORTED);
893#else
91c4da3f 894 readfd(f,b,8);
d8aeda1e 895 num = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
031fa9ad 896#endif
71c46176 897
d8aeda1e 898 return num;
3a6a366f
AT
899}
900
9dd891bb 901void read_buf(int f,char *buf,size_t len)
720b47f2 902{
4c36ddbe 903 readfd(f,buf,len);
720b47f2
AT
904}
905
9dd891bb 906void read_sbuf(int f,char *buf,size_t len)
575f2fca 907{
93095cbe 908 readfd(f, buf, len);
af436313 909 buf[len] = '\0';
575f2fca
AT
910}
911
9361f839 912uchar read_byte(int f)
182dca5c 913{
9361f839 914 uchar c;
93095cbe 915 readfd(f, (char *)&c, 1);
4c36ddbe 916 return c;
182dca5c 917}
720b47f2 918
46e99b09
WD
919int read_vstring(int f, char *buf, int bufsize)
920{
921 int len = read_byte(f);
922
923 if (len & 0x80)
924 len = (len & ~0x80) * 0x100 + read_byte(f);
925
926 if (len >= bufsize) {
927 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
928 len, bufsize - 1);
9a6ed83f 929 return -1;
46e99b09
WD
930 }
931
932 if (len)
933 readfd(f, buf, len);
934 buf[len] = '\0';
935 return len;
936}
937
188fed95
WD
938/* Populate a sum_struct with values from the socket. This is
939 * called by both the sender and the receiver. */
940void read_sum_head(int f, struct sum_struct *sum)
941{
942 sum->count = read_int(f);
c638caa6
WD
943 if (sum->count < 0) {
944 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
945 (long)sum->count, who_am_i());
946 exit_cleanup(RERR_PROTOCOL);
947 }
188fed95
WD
948 sum->blength = read_int(f);
949 if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
cdf236aa
WD
950 rprintf(FERROR, "Invalid block length %ld [%s]\n",
951 (long)sum->blength, who_am_i());
188fed95
WD
952 exit_cleanup(RERR_PROTOCOL);
953 }
954 sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
955 if (sum->s2length < 0 || sum->s2length > MD4_SUM_LENGTH) {
cdf236aa
WD
956 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
957 sum->s2length, who_am_i());
188fed95
WD
958 exit_cleanup(RERR_PROTOCOL);
959 }
960 sum->remainder = read_int(f);
961 if (sum->remainder < 0 || sum->remainder > sum->blength) {
cdf236aa
WD
962 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
963 (long)sum->remainder, who_am_i());
188fed95
WD
964 exit_cleanup(RERR_PROTOCOL);
965 }
966}
967
c207d7ec
WD
968/* Send the values from a sum_struct over the socket. Set sum to
969 * NULL if there are no checksums to send. This is called by both
970 * the generator and the sender. */
971void write_sum_head(int f, struct sum_struct *sum)
972{
973 static struct sum_struct null_sum;
974
975 if (sum == NULL)
976 sum = &null_sum;
977
978 write_int(f, sum->count);
979 write_int(f, sum->blength);
980 if (protocol_version >= 27)
981 write_int(f, sum->s2length);
982 write_int(f, sum->remainder);
983}
984
08571358
MP
985/**
986 * Sleep after writing to limit I/O bandwidth usage.
987 *
988 * @todo Rather than sleeping after each write, it might be better to
989 * use some kind of averaging. The current algorithm seems to always
990 * use a bit less bandwidth than specified, because it doesn't make up
991 * for slow periods. But arguably this is a feature. In addition, we
992 * ought to take the time used to write the data into account.
71e58630
WD
993 *
994 * During some phases of big transfers (file FOO is uptodate) this is
995 * called with a small bytes_written every time. As the kernel has to
996 * round small waits up to guarantee that we actually wait at least the
997 * requested number of microseconds, this can become grossly inaccurate.
998 * We therefore keep track of the bytes we've written over time and only
999 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
1000 **/
1001static void sleep_for_bwlimit(int bytes_written)
1002{
71e58630
WD
1003 static struct timeval prior_tv;
1004 static long total_written = 0;
1005 struct timeval tv, start_tv;
1006 long elapsed_usec, sleep_usec;
1007
1008#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358 1009
9a0cfff5 1010 if (!bwlimit_writemax)
08571358 1011 return;
e681e820 1012
71e58630
WD
1013 total_written += bytes_written;
1014
1015 gettimeofday(&start_tv, NULL);
1016 if (prior_tv.tv_sec) {
1017 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
1018 + (start_tv.tv_usec - prior_tv.tv_usec);
1019 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
1020 if (total_written < 0)
1021 total_written = 0;
1022 }
3151cbae 1023
71e58630
WD
1024 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
1025 if (sleep_usec < ONE_SEC / 10) {
1026 prior_tv = start_tv;
1027 return;
1028 }
08571358 1029
71e58630
WD
1030 tv.tv_sec = sleep_usec / ONE_SEC;
1031 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 1032 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
1033
1034 gettimeofday(&prior_tv, NULL);
1035 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1036 + (prior_tv.tv_usec - start_tv.tv_usec);
1037 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
1038}
1039
1f75bb10 1040/* Write len bytes to the file descriptor fd, looping as necessary to get
0fb2fc4a 1041 * the job done and also (in certain circumstances) reading any data on
af436313 1042 * msg_fd_in to avoid deadlock.
880da007
MP
1043 *
1044 * This function underlies the multiplexing system. The body of the
1f75bb10 1045 * application never calls this function directly. */
9dd891bb 1046static void writefd_unbuffered(int fd,char *buf,size_t len)
720b47f2 1047{
1ea087a7 1048 size_t n, total = 0;
8d9dc9f9 1049 fd_set w_fds, r_fds;
d8aeda1e 1050 int maxfd, count, cnt, using_r_fds;
7d51b837 1051 int defer_save = defer_forwarding_messages;
8d9dc9f9 1052 struct timeval tv;
720b47f2 1053
e44f9a12
AT
1054 no_flush++;
1055
4c36ddbe 1056 while (total < len) {
8d9dc9f9 1057 FD_ZERO(&w_fds);
8d9dc9f9 1058 FD_SET(fd,&w_fds);
1ea087a7 1059 maxfd = fd;
4c36ddbe 1060
954bbed8 1061 if (msg_fd_in >= 0) {
56014c8c 1062 FD_ZERO(&r_fds);
d17e1dd2 1063 FD_SET(msg_fd_in,&r_fds);
1ea087a7
WD
1064 if (msg_fd_in > maxfd)
1065 maxfd = msg_fd_in;
3eeac9bc
WD
1066 using_r_fds = 1;
1067 } else
1068 using_r_fds = 0;
8d9dc9f9 1069
e626b29e 1070 tv.tv_sec = select_timeout;
8d9dc9f9 1071 tv.tv_usec = 0;
4c36ddbe 1072
554e0a8d 1073 errno = 0;
3eeac9bc 1074 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
d17e1dd2 1075 &w_fds, NULL, &tv);
4c36ddbe
AT
1076
1077 if (count <= 0) {
1ea087a7 1078 if (count < 0 && errno == EBADF)
554e0a8d 1079 exit_cleanup(RERR_SOCKETIO);
bd717af8 1080 check_timeout();
8d9dc9f9
AT
1081 continue;
1082 }
4c36ddbe 1083
3eeac9bc 1084 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
d17e1dd2 1085 read_msg_fd();
554e0a8d 1086
9a6ed83f 1087 if (!FD_ISSET(fd, &w_fds))
1ea087a7 1088 continue;
4c36ddbe 1089
1ea087a7 1090 n = len - total;
9a0cfff5 1091 if (bwlimit_writemax && n > bwlimit_writemax)
1ea087a7 1092 n = bwlimit_writemax;
d8aeda1e 1093 cnt = write(fd, buf + total, n);
1ea087a7 1094
d8aeda1e
WD
1095 if (cnt <= 0) {
1096 if (cnt < 0) {
3309507d
WD
1097 if (errno == EINTR)
1098 continue;
1099 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1100 msleep(1);
1101 continue;
1102 }
f0359dd0
AT
1103 }
1104
1ea087a7 1105 /* Don't try to write errors back across the stream. */
1f75bb10 1106 if (fd == sock_f_out)
7f459268 1107 close_multiplexing_out();
1ea087a7 1108 rsyserr(FERROR, errno,
b88c2e8f
WD
1109 "writefd_unbuffered failed to write %ld bytes [%s]",
1110 (long)len, who_am_i());
d1b31da7
WD
1111 /* If the other side is sending us error messages, try
1112 * to grab any messages they sent before they died. */
7f459268 1113 while (fd == sock_f_out && io_multiplexing_in) {
3b0a30eb 1114 set_io_timeout(30);
9e2409ab 1115 ignore_timeout = 0;
d1b31da7
WD
1116 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1117 sizeof io_filesfrom_buf);
1118 }
1ea087a7
WD
1119 exit_cleanup(RERR_STREAMIO);
1120 }
4c36ddbe 1121
d8aeda1e 1122 total += cnt;
954bbed8 1123 defer_forwarding_messages = 1;
a800434a 1124
1f75bb10 1125 if (fd == sock_f_out) {
626bec8e 1126 if (io_timeout || am_generator)
3b0a30eb 1127 last_io_out = time(NULL);
d8aeda1e 1128 sleep_for_bwlimit(cnt);
1f75bb10 1129 }
4c36ddbe 1130 }
e44f9a12 1131
7d51b837 1132 defer_forwarding_messages = defer_save;
e44f9a12 1133 no_flush--;
720b47f2
AT
1134}
1135
7d51b837
WD
1136static void msg2sndr_flush(void)
1137{
1138 if (defer_forwarding_messages)
1139 return;
1140
a7704777 1141 while (msg2sndr.head && io_multiplexing_out) {
7d51b837
WD
1142 struct msg_list_item *m = msg2sndr.head;
1143 if (!(msg2sndr.head = m->next))
1144 msg2sndr.tail = NULL;
1145 stats.total_written += m->len;
1146 defer_forwarding_messages = 1;
1147 writefd_unbuffered(sock_f_out, m->buf, m->len);
1148 defer_forwarding_messages = 0;
1149 free(m);
1150 }
1151}
1152
880da007
MP
1153/**
1154 * Write an message to a multiplexed stream. If this fails then rsync
1155 * exits.
1156 **/
1f75bb10 1157static void mplex_write(enum msgcode code, char *buf, size_t len)
ff41a59f 1158{
12bda6f7 1159 char buffer[1024];
06ce139f 1160 size_t n = len;
8d9dc9f9 1161
ff41a59f
AT
1162 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
1163
c399d22a 1164 if (n > sizeof buffer - 4)
12bda6f7
WD
1165 n = 0;
1166 else
1167 memcpy(buffer + 4, buf, n);
ff41a59f 1168
1f75bb10 1169 writefd_unbuffered(sock_f_out, buffer, n+4);
ff41a59f
AT
1170
1171 len -= n;
1172 buf += n;
1173
7d51b837
WD
1174 if (len) {
1175 defer_forwarding_messages = 1;
1f75bb10 1176 writefd_unbuffered(sock_f_out, buf, len);
7d51b837
WD
1177 defer_forwarding_messages = 0;
1178 msg2sndr_flush();
1179 }
d6dead6b
AT
1180}
1181
d17e1dd2 1182void io_flush(int flush_it_all)
d6dead6b 1183{
954bbed8 1184 msg2genr_flush(flush_it_all);
7d51b837 1185 msg2sndr_flush();
90ba34e2 1186
1f75bb10 1187 if (!iobuf_out_cnt || no_flush)
d17e1dd2 1188 return;
8d9dc9f9 1189
d17e1dd2 1190 if (io_multiplexing_out)
1f75bb10 1191 mplex_write(MSG_DATA, iobuf_out, iobuf_out_cnt);
d17e1dd2 1192 else
1f75bb10
WD
1193 writefd_unbuffered(sock_f_out, iobuf_out, iobuf_out_cnt);
1194 iobuf_out_cnt = 0;
8d9dc9f9
AT
1195}
1196
9dd891bb 1197static void writefd(int fd,char *buf,size_t len)
d6dead6b 1198{
4033b80b
WD
1199 if (fd == msg_fd_out) {
1200 rprintf(FERROR, "Internal error: wrong write used in receiver.\n");
1201 exit_cleanup(RERR_PROTOCOL);
1202 }
90ba34e2 1203
1f75bb10
WD
1204 if (fd == sock_f_out)
1205 stats.total_written += len;
1206
b9f592fb
WD
1207 if (fd == write_batch_monitor_out) {
1208 if ((size_t)write(batch_fd, buf, len) != len)
1209 exit_cleanup(RERR_FILEIO);
1210 }
1211
1f75bb10 1212 if (!iobuf_out || fd != sock_f_out) {
4c36ddbe
AT
1213 writefd_unbuffered(fd, buf, len);
1214 return;
1215 }
d6dead6b
AT
1216
1217 while (len) {
1f75bb10 1218 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
d6dead6b 1219 if (n > 0) {
1f75bb10 1220 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
d6dead6b
AT
1221 buf += n;
1222 len -= n;
1f75bb10 1223 iobuf_out_cnt += n;
d6dead6b 1224 }
3151cbae 1225
1f75bb10 1226 if (iobuf_out_cnt == IO_BUFFER_SIZE)
d17e1dd2 1227 io_flush(NORMAL_FLUSH);
d6dead6b 1228 }
d6dead6b 1229}
720b47f2 1230
0d67e00a 1231void write_shortint(int f, int x)
9361f839
WD
1232{
1233 uchar b[2];
1234 b[0] = x;
1235 b[1] = x >> 8;
1236 writefd(f, (char *)b, 2);
1237}
1238
b7922338 1239void write_int(int f,int32 x)
720b47f2 1240{
8d9dc9f9
AT
1241 char b[4];
1242 SIVAL(b,0,x);
4c36ddbe 1243 writefd(f,b,4);
720b47f2
AT
1244}
1245
7a24c346
MP
1246/*
1247 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1248 * 64-bit types on this platform.
1249 */
71c46176 1250void write_longint(int f, int64 x)
3a6a366f 1251{
3a6a366f 1252 char b[8];
3a6a366f 1253
91c4da3f 1254 if (x <= 0x7FFFFFFF) {
3a6a366f
AT
1255 write_int(f, (int)x);
1256 return;
1257 }
1258
031fa9ad
WD
1259#if SIZEOF_INT64 < 8
1260 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1261 exit_cleanup(RERR_UNSUPPORTED);
1262#else
8de330a3 1263 write_int(f, (int32)0xFFFFFFFF);
3a6a366f
AT
1264 SIVAL(b,0,(x&0xFFFFFFFF));
1265 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
1266
4c36ddbe 1267 writefd(f,b,8);
031fa9ad 1268#endif
3a6a366f
AT
1269}
1270
9dd891bb 1271void write_buf(int f,char *buf,size_t len)
720b47f2 1272{
4c36ddbe 1273 writefd(f,buf,len);
720b47f2
AT
1274}
1275
880da007 1276/** Write a string to the connection */
cf338ab1 1277void write_sbuf(int f, char *buf)
f0fca04e 1278{
93095cbe 1279 writefd(f, buf, strlen(buf));
f0fca04e
AT
1280}
1281
9361f839 1282void write_byte(int f, uchar c)
182dca5c 1283{
93095cbe 1284 writefd(f, (char *)&c, 1);
182dca5c
AT
1285}
1286
46e99b09
WD
1287void write_vstring(int f, char *str, int len)
1288{
1289 uchar lenbuf[3], *lb = lenbuf;
1290
1291 if (len > 0x7F) {
1292 if (len > 0x7FFF) {
1293 rprintf(FERROR,
1294 "attempting to send over-long vstring (%d > %d)\n",
1295 len, 0x7FFF);
1296 exit_cleanup(RERR_PROTOCOL);
1297 }
1298 *lb++ = len / 0x100 + 0x80;
1299 }
1300 *lb = len;
1301
1302 writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1303 if (len)
1304 writefd(f, str, len);
1305}
1306
914cc65c 1307/**
6ed6d7f5
WD
1308 * Read a line of up to @p maxlen characters into @p buf (not counting
1309 * the trailing null). Strips the (required) trailing newline and all
1310 * carriage returns.
914cc65c 1311 *
6ed6d7f5 1312 * @return 1 for success; 0 for I/O error or truncation.
914cc65c 1313 **/
9dd891bb 1314int read_line(int f, char *buf, size_t maxlen)
f0fca04e
AT
1315{
1316 while (maxlen) {
528bfcd7 1317 buf[0] = 0;
f0fca04e 1318 read_buf(f, buf, 1);
914cc65c
MP
1319 if (buf[0] == 0)
1320 return 0;
6ed6d7f5 1321 if (buf[0] == '\n')
f0fca04e 1322 break;
f0fca04e
AT
1323 if (buf[0] != '\r') {
1324 buf++;
1325 maxlen--;
1326 }
1327 }
6ed6d7f5
WD
1328 *buf = '\0';
1329 return maxlen > 0;
f0fca04e
AT
1330}
1331
f0fca04e
AT
1332void io_printf(int fd, const char *format, ...)
1333{
d62bcc17 1334 va_list ap;
33544bf4 1335 char buf[BIGPATHBUFLEN];
f0fca04e 1336 int len;
3151cbae 1337
f0fca04e 1338 va_start(ap, format);
3151cbae 1339 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1340 va_end(ap);
1341
f89b9368
WD
1342 if (len < 0)
1343 exit_cleanup(RERR_STREAMIO);
f0fca04e 1344
33544bf4
WD
1345 if (len > (int)sizeof buf) {
1346 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1347 exit_cleanup(RERR_STREAMIO);
1348 }
1349
f0fca04e
AT
1350 write_sbuf(fd, buf);
1351}
8d9dc9f9 1352
d17e1dd2 1353/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1354void io_start_multiplex_out(void)
8d9dc9f9 1355{
d17e1dd2 1356 io_flush(NORMAL_FLUSH);
1f75bb10 1357 io_start_buffering_out();
8d9dc9f9
AT
1358 io_multiplexing_out = 1;
1359}
1360
d17e1dd2 1361/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1362void io_start_multiplex_in(void)
8d9dc9f9 1363{
d17e1dd2 1364 io_flush(NORMAL_FLUSH);
1f75bb10 1365 io_start_buffering_in();
8d9dc9f9
AT
1366 io_multiplexing_in = 1;
1367}
1368
d17e1dd2
WD
1369/** Write an message to the multiplexed data stream. */
1370int io_multiplex_write(enum msgcode code, char *buf, size_t len)
8d9dc9f9 1371{
f89b9368
WD
1372 if (!io_multiplexing_out)
1373 return 0;
8d9dc9f9 1374
d17e1dd2 1375 io_flush(NORMAL_FLUSH);
1b7c47cb 1376 stats.total_written += (len+4);
1f75bb10 1377 mplex_write(code, buf, len);
8d9dc9f9
AT
1378 return 1;
1379}
1380
7f459268
WD
1381void close_multiplexing_in(void)
1382{
1383 io_multiplexing_in = 0;
1384}
1385
d17e1dd2 1386/** Stop output multiplexing. */
7f459268 1387void close_multiplexing_out(void)
554e0a8d
AT
1388{
1389 io_multiplexing_out = 0;
1390}
1391
b9f592fb
WD
1392void start_write_batch(int fd)
1393{
741d6544
WD
1394 write_stream_flags(batch_fd);
1395
b9f592fb
WD
1396 /* Some communication has already taken place, but we don't
1397 * enable batch writing until here so that we can write a
1398 * canonical record of the communication even though the
1399 * actual communication so far depends on whether a daemon
1400 * is involved. */
1401 write_int(batch_fd, protocol_version);
1402 write_int(batch_fd, checksum_seed);
b9f592fb
WD
1403
1404 if (am_sender)
1405 write_batch_monitor_out = fd;
1406 else
1407 write_batch_monitor_in = fd;
1408}
1409
1410void stop_write_batch(void)
1411{
1412 write_batch_monitor_out = -1;
1413 write_batch_monitor_in = -1;
1414}