Martin gave his approval to use GPLv3 with this code.
[rsync/rsync.git] / io.c
CommitLineData
0f78b815
WD
1/*
2 * Socket and pipe I/O utilities used in rsync.
d62bcc17 3 *
0f78b815
WD
4 * Copyright (C) 1996-2001 Andrew Tridgell
5 * Copyright (C) 1996 Paul Mackerras
6 * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
ba2133d6 7 * Copyright (C) 2003-2007 Wayne Davison
d62bcc17 8 *
880da007 9 * This program is free software; you can redistribute it and/or modify
4fd842f9 10 * it under the terms of the GNU General Public License version 3 as
ba2133d6 11 * published by the Free Software Foundation.
d62bcc17 12 *
880da007
MP
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
d62bcc17 17 *
e7c67065 18 * You should have received a copy of the GNU General Public License along
4fd842f9 19 * with this program; if not, visit the http://fsf.org website.
880da007 20 */
720b47f2 21
0f78b815
WD
22/* Rsync provides its own multiplexing system, which is used to send
23 * stderr and stdout over a single socket.
87ee2481
MP
24 *
25 * For historical reasons this is off during the start of the
26 * connection, but it's switched on quite early using
0f78b815 27 * io_start_multiplex_out() and io_start_multiplex_in(). */
720b47f2 28
720b47f2
AT
29#include "rsync.h"
30
880da007 31/** If no timeout is specified then use a 60 second select timeout */
8cd9fd4e
AT
32#define SELECT_TIMEOUT 60
33
7a55d06e 34extern int bwlimit;
71e58630 35extern size_t bwlimit_writemax;
6ba9279f 36extern int io_timeout;
cdf236aa 37extern int allowed_lull;
d17e1dd2
WD
38extern int am_server;
39extern int am_daemon;
40extern int am_sender;
98f8c9a5 41extern int am_generator;
3ea6e0e7 42extern int inc_recurse;
8ef246e0 43extern int io_error;
e626b29e 44extern int eol_nulls;
8ef246e0 45extern int flist_eof;
3b0a30eb 46extern int read_batch;
188fed95 47extern int csum_length;
b9f592fb
WD
48extern int checksum_seed;
49extern int protocol_version;
47c11975 50extern int remove_source_files;
cdf236aa 51extern int preserve_hard_links;
d19320fd 52extern char *filesfrom_host;
a800434a 53extern struct stats stats;
8ef246e0 54extern struct file_list *cur_flist, *first_flist;
332cf6df
WD
55#ifdef ICONV_OPTION
56extern iconv_t ic_send, ic_recv;
57#endif
720b47f2 58
a86179f4 59const char phase_unknown[] = "unknown";
9e2409ab 60int ignore_timeout = 0;
b9f592fb 61int batch_fd = -1;
04c722d5 62int msgdone_cnt = 0;
805edf9d 63
cb2e1f18 64/* Ignore an EOF error if non-zero. See whine_about_eof(). */
574c2500 65int kluge_around_eof = 0;
7a55d06e 66
d17e1dd2
WD
67int msg_fd_in = -1;
68int msg_fd_out = -1;
cdf236aa
WD
69int sock_f_in = -1;
70int sock_f_out = -1;
7a55d06e 71
8ef246e0
WD
72static int iobuf_f_in = -1;
73static char *iobuf_in;
74static size_t iobuf_in_siz;
75static size_t iobuf_in_ndx;
76static size_t iobuf_in_remaining;
77
78static int iobuf_f_out = -1;
79static char *iobuf_out;
80static int iobuf_out_cnt;
81
82int flist_forward_from = -1;
83
1f75bb10
WD
84static int io_multiplexing_out;
85static int io_multiplexing_in;
3b0a30eb
WD
86static time_t last_io_in;
87static time_t last_io_out;
1f75bb10
WD
88static int no_flush;
89
b9f592fb
WD
90static int write_batch_monitor_in = -1;
91static int write_batch_monitor_out = -1;
92
56014c8c
WD
93static int io_filesfrom_f_in = -1;
94static int io_filesfrom_f_out = -1;
95static char io_filesfrom_buf[2048];
332cf6df
WD
96#ifdef ICONV_OPTION
97static char iconv_buf[sizeof io_filesfrom_buf / 2];
98#endif
56014c8c
WD
99static char *io_filesfrom_bp;
100static char io_filesfrom_lastchar;
101static int io_filesfrom_buflen;
954bbed8 102static int defer_forwarding_messages = 0;
3b0a30eb 103static int select_timeout = SELECT_TIMEOUT;
d6081c82
WD
104static int active_filecnt = 0;
105static OFF_T active_bytecnt = 0;
720b47f2 106
351e23ad
WD
107static char int_byte_extra[64] = {
108 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
109 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
110 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
111 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
482f48cc
WD
112};
113
dde5b772 114static void readfd(int fd, char *buffer, size_t N);
8ef246e0
WD
115static void writefd(int fd, const char *buf, size_t len);
116static void writefd_unbuffered(int fd, const char *buf, size_t len);
117static void decrement_active_files(int ndx);
118static void decrement_flist_in_progress(int ndx, int redo);
332cf6df 119static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);
ff41a59f 120
/* One node of a singly linked FIFO of file-list indexes. */
struct flist_ndx_item {
	struct flist_ndx_item *next;
	int ndx;
};

/* Head and tail pointers of a FIFO of flist_ndx_item nodes. */
struct flist_ndx_list {
	struct flist_ndx_item *head;
	struct flist_ndx_item *tail;
};

/* Index queues drained by get_redo_num() and get_hlink_num(). */
static struct flist_ndx_list redo_list, hlink_list;

/* One queued MSG_* message.  The item is allocated larger than the struct:
 * buf holds a 4-byte multiplex header followed by the payload (see
 * msg_list_add()). */
struct msg_list_item {
	struct msg_list_item *next;
	char convert;
	char buf[1];
};

/* Head and tail pointers of a FIFO of msg_list_item nodes. */
struct msg_list {
	struct msg_list_item *head;
	struct msg_list_item *tail;
};

/* Messages waiting to be written by msg_flush(). */
static struct msg_list msg_queue;
d17e1dd2 143
c6816b94 144static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
d17e1dd2 145{
c6816b94
WD
146 struct flist_ndx_item *item;
147
148 if (!(item = new(struct flist_ndx_item)))
149 out_of_memory("flist_ndx_push");
150 item->next = NULL;
151 item->ndx = ndx;
152 if (lp->tail)
153 lp->tail->next = item;
d17e1dd2 154 else
c6816b94
WD
155 lp->head = item;
156 lp->tail = item;
157}
158
159static int flist_ndx_pop(struct flist_ndx_list *lp)
160{
161 struct flist_ndx_item *next;
162 int ndx;
163
164 if (!lp->head)
165 return -1;
166
167 ndx = lp->head->ndx;
168 next = lp->head->next;
169 free(lp->head);
170 lp->head = next;
171 if (!next)
172 lp->tail = NULL;
173
174 return ndx;
d17e1dd2
WD
175}
176
8d9dc9f9
AT
177static void check_timeout(void)
178{
179 time_t t;
90ba34e2 180
9e2409ab 181 if (!io_timeout || ignore_timeout)
d17e1dd2 182 return;
8d9dc9f9 183
3b0a30eb
WD
184 if (!last_io_in) {
185 last_io_in = time(NULL);
8d9dc9f9
AT
186 return;
187 }
188
189 t = time(NULL);
190
3b0a30eb 191 if (t - last_io_in >= io_timeout) {
0adb99b9 192 if (!am_server && !am_daemon) {
4ccfd96c 193 rprintf(FERROR, "io timeout after %d seconds -- exiting\n",
3b0a30eb 194 (int)(t-last_io_in));
0adb99b9 195 }
65417579 196 exit_cleanup(RERR_TIMEOUT);
8d9dc9f9
AT
197 }
198}
199
1f75bb10
WD
200/* Note the fds used for the main socket (which might really be a pipe
201 * for a local transfer, but we can ignore that). */
202void io_set_sock_fds(int f_in, int f_out)
203{
204 sock_f_in = f_in;
205 sock_f_out = f_out;
206}
207
3b0a30eb
WD
208void set_io_timeout(int secs)
209{
210 io_timeout = secs;
211
212 if (!io_timeout || io_timeout > SELECT_TIMEOUT)
213 select_timeout = SELECT_TIMEOUT;
214 else
215 select_timeout = io_timeout;
216
217 allowed_lull = read_batch ? 0 : (io_timeout + 1) / 2;
218}
219
98f8c9a5
WD
220/* Setup the fd used to receive MSG_* messages. Only needed during the
221 * early stages of being a local sender (up through the sending of the
222 * file list) or when we're the generator (to fetch the messages from
223 * the receiver). */
d17e1dd2
WD
224void set_msg_fd_in(int fd)
225{
226 msg_fd_in = fd;
227}
228
98f8c9a5
WD
229/* Setup the fd used to send our MSG_* messages. Only needed when
230 * we're the receiver (to send our messages to the generator). */
d17e1dd2
WD
231void set_msg_fd_out(int fd)
232{
233 msg_fd_out = fd;
234 set_nonblocking(msg_fd_out);
235}
236
237/* Add a message to the pending MSG_* list. */
332cf6df 238static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert)
554e0a8d 239{
954bbed8
WD
240 struct msg_list_item *m;
241 int sz = len + 4 + sizeof m[0] - 1;
d17e1dd2 242
954bbed8 243 if (!(m = (struct msg_list_item *)new_array(char, sz)))
c6816b94 244 out_of_memory("msg_list_add");
954bbed8 245 m->next = NULL;
332cf6df 246 m->convert = convert;
954bbed8
WD
247 SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
248 memcpy(m->buf + 4, buf, len);
249 if (lst->tail)
250 lst->tail->next = m;
d17e1dd2 251 else
954bbed8
WD
252 lst->head = m;
253 lst->tail = m;
554e0a8d
AT
254}
255
e4c877cf 256static void msg_flush(void)
3be97bf9 257{
e4c877cf
WD
258 if (am_generator) {
259 while (msg_queue.head && io_multiplexing_out) {
260 struct msg_list_item *m = msg_queue.head;
332cf6df
WD
261 int len = IVAL(m->buf, 0) & 0xFFFFFF;
262 int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
e4c877cf
WD
263 if (!(msg_queue.head = m->next))
264 msg_queue.tail = NULL;
332cf6df 265 stats.total_written += len + 4;
e4c877cf 266 defer_forwarding_messages++;
332cf6df 267 mplex_write(sock_f_out, tag, m->buf + 4, len, m->convert);
e4c877cf
WD
268 defer_forwarding_messages--;
269 free(m);
270 }
271 } else {
272 while (msg_queue.head) {
273 struct msg_list_item *m = msg_queue.head;
332cf6df
WD
274 int len = IVAL(m->buf, 0) & 0xFFFFFF;
275 int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
e4c877cf
WD
276 if (!(msg_queue.head = m->next))
277 msg_queue.tail = NULL;
278 defer_forwarding_messages++;
332cf6df 279 mplex_write(msg_fd_out, tag, m->buf + 4, len, m->convert);
e4c877cf
WD
280 defer_forwarding_messages--;
281 free(m);
282 }
3be97bf9
WD
283 }
284}
285
98f8c9a5
WD
286/* Read a message from the MSG_* fd and handle it. This is called either
287 * during the early stages of being a local sender (up through the sending
288 * of the file list) or when we're the generator (to fetch the messages
289 * from the receiver). */
d17e1dd2 290static void read_msg_fd(void)
554e0a8d 291{
f9c6b3e7 292 char buf[2048];
06ce139f 293 size_t n;
8ef246e0 294 struct file_list *flist;
d17e1dd2 295 int fd = msg_fd_in;
ff41a59f
AT
296 int tag, len;
297
00bdf899 298 /* Temporarily disable msg_fd_in. This is needed to avoid looping back
af436313 299 * to this routine from writefd_unbuffered(). */
3be97bf9 300 no_flush++;
d17e1dd2 301 msg_fd_in = -1;
be21e29c 302 defer_forwarding_messages++;
554e0a8d 303
dde5b772 304 readfd(fd, buf, 4);
ff41a59f
AT
305 tag = IVAL(buf, 0);
306
307 len = tag & 0xFFFFFF;
d17e1dd2 308 tag = (tag >> 24) - MPLEX_BASE;
ff41a59f 309
d17e1dd2
WD
310 switch (tag) {
311 case MSG_DONE:
cc7b86bf 312 if (len < 0 || len > 1 || !am_generator) {
8ef246e0
WD
313 invalid_msg:
314 rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
315 tag, len, who_am_i(),
3ea6e0e7 316 inc_recurse ? "/inc" : "");
d17e1dd2 317 exit_cleanup(RERR_STREAMIO);
13c7bcbb 318 }
cc7b86bf
WD
319 if (len) {
320 readfd(fd, buf, len);
351e23ad 321 stats.total_read = read_varlong(fd, 3);
cc7b86bf 322 }
04c722d5 323 msgdone_cnt++;
d17e1dd2
WD
324 break;
325 case MSG_REDO:
8ef246e0
WD
326 if (len != 4 || !am_generator)
327 goto invalid_msg;
dde5b772 328 readfd(fd, buf, 4);
47c11975 329 if (remove_source_files)
d6081c82 330 decrement_active_files(IVAL(buf,0));
c6816b94 331 flist_ndx_push(&redo_list, IVAL(buf,0));
3ea6e0e7 332 if (inc_recurse)
8ef246e0
WD
333 decrement_flist_in_progress(IVAL(buf,0), 1);
334 break;
335 case MSG_FLIST:
3ea6e0e7 336 if (len != 4 || !am_generator || !inc_recurse)
8ef246e0 337 goto invalid_msg;
dde5b772 338 readfd(fd, buf, 4);
8ef246e0
WD
339 /* Read extra file list from receiver. */
340 assert(iobuf_in != NULL);
341 assert(iobuf_f_in == fd);
1faa1a6d
WD
342 if (verbose > 3) {
343 rprintf(FINFO, "[%s] receiving flist for dir %d\n",
344 who_am_i(), IVAL(buf,0));
345 }
8ef246e0
WD
346 flist = recv_file_list(fd);
347 flist->parent_ndx = IVAL(buf,0);
348 break;
349 case MSG_FLIST_EOF:
3ea6e0e7 350 if (len != 0 || !am_generator || !inc_recurse)
8ef246e0
WD
351 goto invalid_msg;
352 flist_eof = 1;
d17e1dd2 353 break;
0d67e00a 354 case MSG_DELETED:
8ef246e0
WD
355 if (len >= (int)sizeof buf || !am_generator)
356 goto invalid_msg;
dde5b772 357 readfd(fd, buf, len);
332cf6df 358 send_msg(MSG_DELETED, buf, len, 1);
0d67e00a 359 break;
9981c27e 360 case MSG_SUCCESS:
8ef246e0
WD
361 if (len != 4 || !am_generator)
362 goto invalid_msg;
dde5b772 363 readfd(fd, buf, len);
47c11975 364 if (remove_source_files) {
d6081c82 365 decrement_active_files(IVAL(buf,0));
332cf6df 366 send_msg(MSG_SUCCESS, buf, len, 0);
d6081c82 367 }
cdf236aa
WD
368 if (preserve_hard_links)
369 flist_ndx_push(&hlink_list, IVAL(buf,0));
3ea6e0e7 370 if (inc_recurse)
8ef246e0
WD
371 decrement_flist_in_progress(IVAL(buf,0), 0);
372 break;
373 case MSG_NO_SEND:
374 if (len != 4 || !am_generator)
375 goto invalid_msg;
dde5b772 376 readfd(fd, buf, len);
3ea6e0e7 377 if (inc_recurse)
8ef246e0 378 decrement_flist_in_progress(IVAL(buf,0), 0);
9981c27e 379 break;
72f2d1b3 380 case MSG_SOCKERR:
64119c79 381 case MSG_CLIENT:
8ef246e0
WD
382 if (!am_generator)
383 goto invalid_msg;
64119c79 384 if (tag == MSG_SOCKERR)
8ef246e0 385 io_end_multiplex_out();
72f2d1b3 386 /* FALL THROUGH */
d17e1dd2
WD
387 case MSG_INFO:
388 case MSG_ERROR:
389 case MSG_LOG:
390 while (len) {
391 n = len;
392 if (n >= sizeof buf)
393 n = sizeof buf - 1;
dde5b772 394 readfd(fd, buf, n);
332cf6df 395 rwrite((enum logcode)tag, buf, n, !am_generator);
d17e1dd2
WD
396 len -= n;
397 }
398 break;
399 default:
f8b9da1a
WD
400 rprintf(FERROR, "unknown message %d:%d [%s]\n",
401 tag, len, who_am_i());
d17e1dd2
WD
402 exit_cleanup(RERR_STREAMIO);
403 }
404
3be97bf9 405 no_flush--;
d17e1dd2 406 msg_fd_in = fd;
3be97bf9 407 if (!--defer_forwarding_messages)
e4c877cf 408 msg_flush();
d17e1dd2
WD
409}
410
d6081c82 411/* This is used by the generator to limit how many file transfers can
47c11975 412 * be active at once when --remove-source-files is specified. Without
d6081c82
WD
413 * this, sender-side deletions were mostly happening at the end. */
414void increment_active_files(int ndx, int itemizing, enum logcode code)
415{
416 /* TODO: tune these limits? */
5b986297 417 while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
04c722d5
WD
418 check_for_finished_files(itemizing, code, 0);
419 if (iobuf_out_cnt)
420 io_flush(NORMAL_FLUSH);
421 else
422 read_msg_fd();
d6081c82
WD
423 }
424
425 active_filecnt++;
121bfb2b 426 active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);
d6081c82
WD
427}
428
8ef246e0 429static void decrement_active_files(int ndx)
d6081c82 430{
8ef246e0
WD
431 struct file_list *flist = flist_for_ndx(ndx);
432 assert(flist != NULL);
d6081c82 433 active_filecnt--;
8ef246e0
WD
434 active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
435}
436
437static void decrement_flist_in_progress(int ndx, int redo)
438{
439 struct file_list *flist = cur_flist ? cur_flist : first_flist;
440
441 while (ndx < flist->ndx_start) {
442 if (flist == first_flist) {
443 invalid_ndx:
444 rprintf(FERROR,
445 "Invalid file index: %d (%d - %d) [%s]\n",
446 ndx, first_flist->ndx_start,
9decb4d2 447 first_flist->prev->ndx_start + first_flist->prev->used - 1,
8ef246e0
WD
448 who_am_i());
449 exit_cleanup(RERR_PROTOCOL);
450 }
451 flist = flist->prev;
452 }
9decb4d2 453 while (ndx >= flist->ndx_start + flist->used) {
8ef246e0
WD
454 if (!(flist = flist->next))
455 goto invalid_ndx;
456 }
457
458 flist->in_progress--;
459 if (redo)
460 flist->to_redo++;
d6081c82
WD
461}
462
3be97bf9 463/* Write an message to a multiplexed stream. If this fails, rsync exits. */
332cf6df 464static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert)
d17e1dd2 465{
332cf6df 466 char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */
3be97bf9 467 size_t n = len;
d17e1dd2 468
3be97bf9
WD
469 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
470
332cf6df
WD
471#ifdef ICONV_OPTION
472 if (convert && ic_send == (iconv_t)-1)
473#endif
474 convert = 0;
475
476 if (convert || n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */
3be97bf9
WD
477 n = 0;
478 else
479 memcpy(buffer + 4, buf, n);
480
481 writefd_unbuffered(fd, buffer, n+4);
482
483 len -= n;
484 buf += n;
485
332cf6df
WD
486#ifdef ICONV_OPTION
487 if (convert) {
488 iconv(ic_send, NULL, 0, NULL, 0);
489 defer_forwarding_messages++;
490 while (len) {
491 ICONV_CONST char *ibuf = (ICONV_CONST char *)buf;
492 char *obuf = buffer;
493 size_t ocnt = sizeof buffer;
494 while (len && iconv(ic_send, &ibuf,&len,
495 &obuf,&ocnt) == (size_t)-1) {
496 if (errno == E2BIG || !ocnt)
497 break;
498 *obuf++ = *ibuf++;
499 ocnt--, len--;
500 }
501 n = obuf - buffer;
502 writefd_unbuffered(fd, buffer, n);
503 }
504 if (!--defer_forwarding_messages)
505 msg_flush();
506 } else
507#endif
3be97bf9
WD
508 if (len) {
509 defer_forwarding_messages++;
510 writefd_unbuffered(fd, buf, len);
511 if (!--defer_forwarding_messages)
e4c877cf 512 msg_flush();
8ef246e0 513 }
d17e1dd2
WD
514}
515
332cf6df 516int send_msg(enum msgcode code, const char *buf, int len, int convert)
37f35d89
WD
517{
518 if (msg_fd_out < 0) {
12ccc73a 519 if (!defer_forwarding_messages)
332cf6df 520 return io_multiplex_write(code, buf, len, convert);
12ccc73a
WD
521 if (!io_multiplexing_out)
522 return 0;
332cf6df 523 msg_list_add(&msg_queue, code, buf, len, convert);
12ccc73a 524 return 1;
37f35d89 525 }
e4c877cf 526 if (flist_forward_from >= 0)
332cf6df 527 msg_list_add(&msg_queue, code, buf, len, convert);
e4c877cf 528 else
332cf6df 529 mplex_write(msg_fd_out, code, buf, len, convert);
12ccc73a 530 return 1;
37f35d89
WD
531}
532
155d9206
WD
533void send_msg_int(enum msgcode code, int num)
534{
535 char numbuf[4];
536 SIVAL(numbuf, 0, num);
332cf6df 537 send_msg(code, numbuf, 4, 0);
155d9206
WD
538}
539
8ef246e0 540void wait_for_receiver(void)
d17e1dd2 541{
2a5df862
WD
542 if (iobuf_out_cnt)
543 io_flush(NORMAL_FLUSH);
544 else
545 read_msg_fd();
8ef246e0 546}
554e0a8d 547
8ef246e0
WD
548int get_redo_num(void)
549{
c6816b94 550 return flist_ndx_pop(&redo_list);
554e0a8d
AT
551}
552
cdf236aa
WD
553int get_hlink_num(void)
554{
555 return flist_ndx_pop(&hlink_list);
556}
557
56014c8c
WD
558/**
559 * When we're the receiver and we have a local --files-from list of names
560 * that needs to be sent over the socket to the sender, we have to do two
561 * things at the same time: send the sender a list of what files we're
562 * processing and read the incoming file+info list from the sender. We do
563 * this by augmenting the read_timeout() function to copy this data. It
564 * uses the io_filesfrom_buf to read a block of data from f_in (when it is
565 * ready, since it might be a pipe) and then blast it out f_out (when it
566 * is ready to receive more data).
567 */
568void io_set_filesfrom_fds(int f_in, int f_out)
569{
570 io_filesfrom_f_in = f_in;
571 io_filesfrom_f_out = f_out;
572 io_filesfrom_bp = io_filesfrom_buf;
573 io_filesfrom_lastchar = '\0';
574 io_filesfrom_buflen = 0;
575}
720b47f2 576
cb2e1f18
WD
577/* It's almost always an error to get an EOF when we're trying to read from the
578 * network, because the protocol is (for the most part) self-terminating.
880da007 579 *
cb2e1f18
WD
580 * There is one case for the receiver when it is at the end of the transfer
581 * (hanging around reading any keep-alive packets that might come its way): if
582 * the sender dies before the generator's kill-signal comes through, we can end
583 * up here needing to loop until the kill-signal arrives. In this situation,
584 * kluge_around_eof will be < 0.
585 *
586 * There is another case for older protocol versions (< 24) where the module
587 * listing was not terminated, so we must ignore an EOF error in that case and
588 * exit. In this situation, kluge_around_eof will be > 0. */
1f75bb10 589static void whine_about_eof(int fd)
7a55d06e 590{
574c2500 591 if (kluge_around_eof && fd == sock_f_in) {
55bb7fff 592 int i;
574c2500
WD
593 if (kluge_around_eof > 0)
594 exit_cleanup(0);
55bb7fff
WD
595 /* If we're still here after 10 seconds, exit with an error. */
596 for (i = 10*1000/20; i--; )
574c2500
WD
597 msleep(20);
598 }
3151cbae 599
00bdf899 600 rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
dca68b0a
WD
601 "(%.0f bytes received so far) [%s]\n",
602 (double)stats.total_read, who_am_i());
00bdf899
WD
603
604 exit_cleanup(RERR_STREAMIO);
7a55d06e 605}
720b47f2 606
880da007 607/**
6ed6d7f5 608 * Read from a socket with I/O timeout. return the number of bytes
c3563c46
MP
609 * read. If no bytes can be read then exit, never return a number <= 0.
610 *
8886f8d0
MP
611 * TODO: If the remote shell connection fails, then current versions
612 * actually report an "unexpected EOF" error here. Since it's a
613 * fairly common mistake to try to use rsh when ssh is required, we
614 * should trap that: if we fail to read any data at all, we should
615 * give a better explanation. We can tell whether the connection has
616 * started by looking e.g. at whether the remote version is known yet.
c3563c46 617 */
3151cbae 618static int read_timeout(int fd, char *buf, size_t len)
8d9dc9f9 619{
d8aeda1e 620 int n, cnt = 0;
4c36ddbe 621
dde5b772 622 io_flush(FULL_FLUSH);
ea2111d1 623
d8aeda1e 624 while (cnt == 0) {
7a55d06e 625 /* until we manage to read *something* */
56014c8c 626 fd_set r_fds, w_fds;
4c36ddbe 627 struct timeval tv;
1ea087a7 628 int maxfd = fd;
a57873b7 629 int count;
4c36ddbe 630
56014c8c 631 FD_ZERO(&r_fds);
a7026ba9 632 FD_ZERO(&w_fds);
56014c8c 633 FD_SET(fd, &r_fds);
3309507d 634 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
635 int new_fd;
636 if (io_filesfrom_buflen == 0) {
3309507d 637 if (io_filesfrom_f_in >= 0) {
56014c8c
WD
638 FD_SET(io_filesfrom_f_in, &r_fds);
639 new_fd = io_filesfrom_f_in;
640 } else {
641 io_filesfrom_f_out = -1;
642 new_fd = -1;
643 }
644 } else {
56014c8c
WD
645 FD_SET(io_filesfrom_f_out, &w_fds);
646 new_fd = io_filesfrom_f_out;
647 }
1ea087a7
WD
648 if (new_fd > maxfd)
649 maxfd = new_fd;
554e0a8d
AT
650 }
651
e626b29e 652 tv.tv_sec = select_timeout;
4c36ddbe
AT
653 tv.tv_usec = 0;
654
554e0a8d
AT
655 errno = 0;
656
a7026ba9 657 count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
a57873b7 658
a57873b7 659 if (count <= 0) {
2a5df862
WD
660 if (errno == EBADF) {
661 defer_forwarding_messages = 0;
554e0a8d 662 exit_cleanup(RERR_SOCKETIO);
2a5df862 663 }
bd717af8 664 check_timeout();
4c36ddbe
AT
665 continue;
666 }
667
3309507d 668 if (io_filesfrom_f_out >= 0) {
56014c8c
WD
669 if (io_filesfrom_buflen) {
670 if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
671 int l = write(io_filesfrom_f_out,
672 io_filesfrom_bp,
673 io_filesfrom_buflen);
674 if (l > 0) {
675 if (!(io_filesfrom_buflen -= l))
676 io_filesfrom_bp = io_filesfrom_buf;
677 else
678 io_filesfrom_bp += l;
47cffb77 679 } else if (errno != EINTR) {
56014c8c
WD
680 /* XXX should we complain? */
681 io_filesfrom_f_out = -1;
682 }
683 }
3309507d 684 } else if (io_filesfrom_f_in >= 0) {
56014c8c
WD
685 if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
686 int l = read(io_filesfrom_f_in,
687 io_filesfrom_buf,
688 sizeof io_filesfrom_buf);
689 if (l <= 0) {
47cffb77
WD
690 if (l == 0 || errno != EINTR) {
691 /* Send end-of-file marker */
692 io_filesfrom_buf[0] = '\0';
693 io_filesfrom_buf[1] = '\0';
694 io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
695 io_filesfrom_f_in = -1;
696 }
56014c8c 697 } else {
56014c8c
WD
698 if (!eol_nulls) {
699 char *s = io_filesfrom_buf + l;
700 /* Transform CR and/or LF into '\0' */
701 while (s-- > io_filesfrom_buf) {
702 if (*s == '\n' || *s == '\r')
703 *s = '\0';
704 }
705 }
706 if (!io_filesfrom_lastchar) {
707 /* Last buf ended with a '\0', so don't
708 * let this buf start with one. */
709 while (l && !*io_filesfrom_bp)
710 io_filesfrom_bp++, l--;
711 }
712 if (!l)
713 io_filesfrom_bp = io_filesfrom_buf;
714 else {
715 char *f = io_filesfrom_bp;
716 char *t = f;
717 char *eob = f + l;
718 /* Eliminate any multi-'\0' runs. */
719 while (f != eob) {
720 if (!(*t++ = *f++)) {
721 while (f != eob && !*f)
722 f++, l--;
723 }
724 }
725 io_filesfrom_lastchar = f[-1];
726 }
727 io_filesfrom_buflen = l;
728 }
729 }
730 }
731 }
732
f89b9368
WD
733 if (!FD_ISSET(fd, &r_fds))
734 continue;
554e0a8d 735
4c36ddbe
AT
736 n = read(fd, buf, len);
737
1ea087a7
WD
738 if (n <= 0) {
739 if (n == 0)
1f75bb10 740 whine_about_eof(fd); /* Doesn't return. */
d62bcc17
WD
741 if (errno == EINTR || errno == EWOULDBLOCK
742 || errno == EAGAIN)
7a55d06e 743 continue;
1f75bb10
WD
744
745 /* Don't write errors on a dead socket. */
72f2d1b3 746 if (fd == sock_f_in) {
8ef246e0 747 io_end_multiplex_out();
72f2d1b3
WD
748 rsyserr(FSOCKERR, errno, "read error");
749 } else
750 rsyserr(FERROR, errno, "read error");
1f75bb10 751 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 752 }
00bdf899
WD
753
754 buf += n;
755 len -= n;
d8aeda1e 756 cnt += n;
1f75bb10 757
3b0a30eb
WD
758 if (fd == sock_f_in && io_timeout)
759 last_io_in = time(NULL);
4c36ddbe 760 }
8d9dc9f9 761
d8aeda1e 762 return cnt;
4c36ddbe 763}
8d9dc9f9 764
332cf6df
WD
765/* Read a line into the "fname" buffer (which must be at least MAXPATHLEN
766 * characters long). */
56014c8c
WD
767int read_filesfrom_line(int fd, char *fname)
768{
769 char ch, *s, *eob = fname + MAXPATHLEN - 1;
770 int cnt;
d19320fd 771 int reading_remotely = filesfrom_host != NULL;
56014c8c
WD
772 int nulls = eol_nulls || reading_remotely;
773
774 start:
775 s = fname;
776 while (1) {
777 cnt = read(fd, &ch, 1);
778 if (cnt < 0 && (errno == EWOULDBLOCK
779 || errno == EINTR || errno == EAGAIN)) {
780 struct timeval tv;
6c850772
WD
781 fd_set r_fds, e_fds;
782 FD_ZERO(&r_fds);
783 FD_SET(fd, &r_fds);
784 FD_ZERO(&e_fds);
785 FD_SET(fd, &e_fds);
e626b29e 786 tv.tv_sec = select_timeout;
56014c8c 787 tv.tv_usec = 0;
6c850772 788 if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
56014c8c 789 check_timeout();
6c850772
WD
790 if (FD_ISSET(fd, &e_fds)) {
791 rsyserr(FINFO, errno,
792 "select exception on fd %d", fd);
793 }
56014c8c
WD
794 continue;
795 }
796 if (cnt != 1)
797 break;
798 if (nulls? !ch : (ch == '\r' || ch == '\n')) {
799 /* Skip empty lines if reading locally. */
800 if (!reading_remotely && s == fname)
801 continue;
802 break;
803 }
804 if (s < eob)
805 *s++ = ch;
806 }
807 *s = '\0';
7a55d06e 808
6b45fcf1
WD
809 /* Dump comments. */
810 if (*fname == '#' || *fname == ';')
56014c8c
WD
811 goto start;
812
813 return s - fname;
814}
7a55d06e 815
8ef246e0 816int io_start_buffering_out(int f_out)
1f75bb10 817{
8ef246e0
WD
818 if (iobuf_out) {
819 assert(f_out == iobuf_f_out);
820 return 0;
821 }
1f75bb10
WD
822 if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
823 out_of_memory("io_start_buffering_out");
824 iobuf_out_cnt = 0;
8ef246e0
WD
825 iobuf_f_out = f_out;
826 return 1;
1f75bb10
WD
827}
828
8ef246e0 829int io_start_buffering_in(int f_in)
1f75bb10 830{
8ef246e0
WD
831 if (iobuf_in) {
832 assert(f_in == iobuf_f_in);
833 return 0;
834 }
1f75bb10
WD
835 iobuf_in_siz = 2 * IO_BUFFER_SIZE;
836 if (!(iobuf_in = new_array(char, iobuf_in_siz)))
837 out_of_memory("io_start_buffering_in");
8ef246e0
WD
838 iobuf_f_in = f_in;
839 return 1;
1f75bb10
WD
840}
841
8ef246e0 842void io_end_buffering_in(void)
1f75bb10 843{
8ef246e0
WD
844 if (!iobuf_in)
845 return;
846 free(iobuf_in);
847 iobuf_in = NULL;
848 iobuf_in_ndx = 0;
849 iobuf_in_remaining = 0;
850 iobuf_f_in = -1;
851}
852
853void io_end_buffering_out(void)
854{
855 if (!iobuf_out)
856 return;
857 io_flush(FULL_FLUSH);
858 free(iobuf_out);
859 iobuf_out = NULL;
860 iobuf_f_out = -1;
1f75bb10
WD
861}
862
be91bd81 863void maybe_flush_socket(int important)
626bec8e 864{
be91bd81
WD
865 if (iobuf_out && iobuf_out_cnt
866 && (important || time(NULL) - last_io_out >= 5))
626bec8e
WD
867 io_flush(NORMAL_FLUSH);
868}
869
cdf236aa 870void maybe_send_keepalive(void)
9e2409ab 871{
3b0a30eb 872 if (time(NULL) - last_io_out >= allowed_lull) {
9e2409ab 873 if (!iobuf_out || !iobuf_out_cnt) {
3221f451
WD
874 if (protocol_version < 29)
875 return; /* there's nothing we can do */
8ef246e0 876 if (protocol_version >= 30)
332cf6df 877 send_msg(MSG_NOOP, "", 0, 0);
8ef246e0 878 else {
9decb4d2 879 write_int(sock_f_out, cur_flist->used);
8ef246e0
WD
880 write_shortint(sock_f_out, ITEM_IS_NEW);
881 }
9e2409ab
WD
882 }
883 if (iobuf_out)
884 io_flush(NORMAL_FLUSH);
885 }
886}
887
8ef246e0
WD
888void start_flist_forward(int f_in)
889{
890 assert(iobuf_out != NULL);
891 assert(iobuf_f_out == msg_fd_out);
892 flist_forward_from = f_in;
893}
894
895void stop_flist_forward()
896{
8ef246e0 897 flist_forward_from = -1;
dde5b772 898 io_flush(FULL_FLUSH);
8ef246e0
WD
899}
900
/**
 * Keep calling read_timeout() until exactly len bytes have been stored
 * in buf -- don't return before then.
 **/
static void read_loop(int fd, char *buf, size_t len)
{
	while (len != 0) {
		int got = read_timeout(fd, buf, len);

		len -= got;
		buf += got;
	}
}
914
7a55d06e
MP
915/**
916 * Read from the file descriptor handling multiplexing - return number
917 * of bytes read.
d62bcc17
WD
918 *
919 * Never returns <= 0.
7a55d06e 920 */
c399d22a 921static int readfd_unbuffered(int fd, char *buf, size_t len)
8d9dc9f9 922{
f2a4853c 923 size_t msg_bytes;
d8aeda1e 924 int tag, cnt = 0;
33544bf4 925 char line[BIGPATHBUFLEN];
8d9dc9f9 926
8ef246e0 927 if (!iobuf_in || fd != iobuf_f_in)
4c36ddbe 928 return read_timeout(fd, buf, len);
8d9dc9f9 929
8ef246e0
WD
930 if (!io_multiplexing_in && iobuf_in_remaining == 0) {
931 iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
1f75bb10 932 iobuf_in_ndx = 0;
76c21947
WD
933 }
934
d8aeda1e 935 while (cnt == 0) {
8ef246e0
WD
936 if (iobuf_in_remaining) {
937 len = MIN(len, iobuf_in_remaining);
1f75bb10
WD
938 memcpy(buf, iobuf_in + iobuf_in_ndx, len);
939 iobuf_in_ndx += len;
8ef246e0 940 iobuf_in_remaining -= len;
d8aeda1e 941 cnt = len;
76c21947 942 break;
8d9dc9f9
AT
943 }
944
909ce14f 945 read_loop(fd, line, 4);
ff41a59f 946 tag = IVAL(line, 0);
679e7657 947
f2a4853c 948 msg_bytes = tag & 0xFFFFFF;
d17e1dd2 949 tag = (tag >> 24) - MPLEX_BASE;
8d9dc9f9 950
d17e1dd2
WD
951 switch (tag) {
952 case MSG_DATA:
f2a4853c 953 if (msg_bytes > iobuf_in_siz) {
1f75bb10 954 if (!(iobuf_in = realloc_array(iobuf_in, char,
f2a4853c 955 msg_bytes)))
c399d22a 956 out_of_memory("readfd_unbuffered");
f2a4853c 957 iobuf_in_siz = msg_bytes;
76c21947 958 }
f2a4853c 959 read_loop(fd, iobuf_in, msg_bytes);
8ef246e0 960 iobuf_in_remaining = msg_bytes;
1f75bb10 961 iobuf_in_ndx = 0;
d17e1dd2 962 break;
8ef246e0
WD
963 case MSG_NOOP:
964 if (am_sender)
965 maybe_send_keepalive();
966 break;
967 case MSG_IO_ERROR:
968 if (msg_bytes != 4)
969 goto invalid_msg;
970 read_loop(fd, line, msg_bytes);
971 io_error |= IVAL(line, 0);
972 break;
0d67e00a 973 case MSG_DELETED:
f2a4853c
WD
974 if (msg_bytes >= sizeof line)
975 goto overflow;
332cf6df
WD
976#ifdef ICONV_OPTION
977 if (ic_recv != (iconv_t)-1) {
978 ICONV_CONST char *ibuf;
979 char *obuf = line;
980 size_t icnt, ocnt = sizeof line - 1;
981 int add_null = 0;
982 iconv(ic_send, NULL, 0, NULL, 0);
983 while (msg_bytes) {
984 icnt = msg_bytes > sizeof iconv_buf
985 ? sizeof iconv_buf : msg_bytes;
986 read_loop(fd, iconv_buf, icnt);
987 if (!(msg_bytes -= icnt) && !iconv_buf[icnt-1])
988 icnt--, add_null = 1;
989 ibuf = (ICONV_CONST char *)iconv_buf;
990 if (iconv(ic_send, &ibuf,&icnt,
991 &obuf,&ocnt) == (size_t)-1)
992 goto overflow; // XXX
993 }
994 if (add_null)
995 *obuf++ = '\0';
996 msg_bytes = obuf - line;
997 } else
998#endif
999 read_loop(fd, line, msg_bytes);
0d67e00a 1000 /* A directory name was sent with the trailing null */
f2a4853c 1001 if (msg_bytes > 0 && !line[msg_bytes-1])
0d67e00a 1002 log_delete(line, S_IFDIR);
12bda6f7
WD
1003 else {
1004 line[msg_bytes] = '\0';
0d67e00a 1005 log_delete(line, S_IFREG);
12bda6f7 1006 }
0d67e00a 1007 break;
9981c27e 1008 case MSG_SUCCESS:
f2a4853c 1009 if (msg_bytes != 4) {
8ef246e0 1010 invalid_msg:
cdf236aa 1011 rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
f2a4853c 1012 tag, (long)msg_bytes, who_am_i());
9981c27e
WD
1013 exit_cleanup(RERR_STREAMIO);
1014 }
f2a4853c 1015 read_loop(fd, line, msg_bytes);
9981c27e 1016 successful_send(IVAL(line, 0));
9981c27e 1017 break;
8ef246e0
WD
1018 case MSG_NO_SEND:
1019 if (msg_bytes != 4)
1020 goto invalid_msg;
1021 read_loop(fd, line, msg_bytes);
1022 send_msg_int(MSG_NO_SEND, IVAL(line, 0));
1023 break;
d17e1dd2
WD
1024 case MSG_INFO:
1025 case MSG_ERROR:
f2a4853c
WD
1026 if (msg_bytes >= sizeof line) {
1027 overflow:
bd9fca47 1028 rprintf(FERROR,
cdf236aa 1029 "multiplexing overflow %d:%ld [%s]\n",
f2a4853c 1030 tag, (long)msg_bytes, who_am_i());
d17e1dd2
WD
1031 exit_cleanup(RERR_STREAMIO);
1032 }
f2a4853c 1033 read_loop(fd, line, msg_bytes);
332cf6df 1034 rwrite((enum logcode)tag, line, msg_bytes, 1);
d17e1dd2
WD
1035 break;
1036 default:
cdf236aa
WD
1037 rprintf(FERROR, "unexpected tag %d [%s]\n",
1038 tag, who_am_i());
65417579 1039 exit_cleanup(RERR_STREAMIO);
8d9dc9f9 1040 }
8d9dc9f9
AT
1041 }
1042
8ef246e0 1043 if (iobuf_in_remaining == 0)
d17e1dd2 1044 io_flush(NORMAL_FLUSH);
76c21947 1045
d8aeda1e 1046 return cnt;
8d9dc9f9
AT
1047}
1048
dde5b772
WD
1049/* Do a buffered read from fd. Don't return until all N bytes have
1050 * been read. If all N can't be read then exit with an error. */
3151cbae 1051static void readfd(int fd, char *buffer, size_t N)
720b47f2 1052{
d8aeda1e 1053 int cnt;
d62bcc17 1054 size_t total = 0;
3151cbae 1055
6ba9279f 1056 while (total < N) {
d8aeda1e
WD
1057 cnt = readfd_unbuffered(fd, buffer + total, N-total);
1058 total += cnt;
7f28dbee 1059 }
1b7c47cb 1060
b9f592fb
WD
1061 if (fd == write_batch_monitor_in) {
1062 if ((size_t)write(batch_fd, buffer, total) != total)
1063 exit_cleanup(RERR_FILEIO);
1064 }
1f75bb10 1065
8ef246e0
WD
1066 if (fd == flist_forward_from)
1067 writefd(iobuf_f_out, buffer, total);
1068
1f75bb10
WD
1069 if (fd == sock_f_in)
1070 stats.total_read += total;
720b47f2
AT
1071}
1072
/* Read a 2-byte value from f; the first byte read is the low-order
 * byte and the second is the high-order byte. */
unsigned short read_shortint(int f)
{
	char raw[2];

	readfd(f, raw, sizeof raw);
	return (UVAL(raw, 1) << 8) + UVAL(raw, 0);
}
1079
b7922338 1080int32 read_int(int f)
720b47f2 1081{
4c36ddbe 1082 char b[4];
d8aeda1e 1083 int32 num;
d730b113 1084
482f48cc
WD
1085 readfd(f, b, 4);
1086 num = IVAL(b, 0);
1087#if SIZEOF_INT32 > 4
1088 if (num & (int32)0x80000000)
1089 num |= ~(int32)0xffffffff;
1090#endif
d8aeda1e 1091 return num;
720b47f2
AT
1092}
1093
351e23ad 1094int32 read_varint(int f)
3a6a366f 1095{
351e23ad
WD
1096 union {
1097 char b[5];
1098 int32 x;
1099 } u;
1100 uchar ch;
1101 int extra;
1102
1103 u.x = 0;
1104 readfd(f, (char*)&ch, 1);
1105 extra = int_byte_extra[ch / 4];
1106 if (extra) {
1107 uchar bit = ((uchar)1<<(8-extra));
1108 if (extra >= (int)sizeof u.b) {
1109 rprintf(FERROR, "Overflow in read_varint()\n");
1110 exit_cleanup(RERR_STREAMIO);
1111 }
1112 readfd(f, u.b, extra);
1113 u.b[extra] = ch & (bit-1);
1114 } else
1115 u.b[0] = ch;
1116#if CAREFUL_ALIGNMENT
1117 u.x = IVAL(u.b,0);
1118#endif
1119#if SIZEOF_INT32 > 4
1120 if (u.x & (int32)0x80000000)
1121 u.x |= ~(int32)0xffffffff;
1122#endif
1123 return u.x;
1124}
71c46176 1125
351e23ad
WD
1126int64 read_varlong(int f, uchar min_bytes)
1127{
1128 union {
1129 char b[9];
1130 int64 x;
1131 } u;
1132 char b2[8];
1133 int extra;
71c46176 1134
031fa9ad 1135#if SIZEOF_INT64 < 8
351e23ad 1136 memset(u.b, 0, 8);
031fa9ad 1137#else
351e23ad 1138 u.x = 0;
031fa9ad 1139#endif
351e23ad
WD
1140 readfd(f, b2, min_bytes);
1141 memcpy(u.b, b2+1, min_bytes-1);
1142 extra = int_byte_extra[CVAL(b2, 0) / 4];
1143 if (extra) {
1144 uchar bit = ((uchar)1<<(8-extra));
1145 if (min_bytes + extra > (int)sizeof u.b) {
1146 rprintf(FERROR, "Overflow in read_varlong()\n");
1147 exit_cleanup(RERR_STREAMIO);
1148 }
1149 readfd(f, u.b + min_bytes - 1, extra);
1150 u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);
4ea4acf1 1151#if SIZEOF_INT64 < 8
351e23ad 1152 if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) {
4ea4acf1
WD
1153 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1154 exit_cleanup(RERR_UNSUPPORTED);
1155 }
1156#endif
351e23ad
WD
1157 } else
1158 u.b[min_bytes + extra - 1] = CVAL(b2, 0);
1159#if SIZEOF_INT64 < 8
1160 u.x = IVAL(u.b,0);
1161#elif CAREFUL_ALIGNMENT
1162 u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);
1163#endif
1164 return u.x;
1165}
1166
1167int64 read_longint(int f)
1168{
4ea4acf1 1169#if SIZEOF_INT64 >= 8
351e23ad 1170 char b[9];
4ea4acf1 1171#endif
351e23ad 1172 int32 num = read_int(f);
71c46176 1173
351e23ad
WD
1174 if (num != (int32)0xffffffff)
1175 return num;
1176
1177#if SIZEOF_INT64 < 8
1178 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1179 exit_cleanup(RERR_UNSUPPORTED);
1180#else
1181 readfd(f, b, 8);
1182 return IVAL(b,0) | (((int64)IVAL(b,4))<<32);
1183#endif
3a6a366f
AT
1184}
1185
/* Read exactly len bytes from f into buf (thin wrapper over readfd()). */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
1190
/* Read exactly len bytes from f into buf and null-terminate the result.
 * The terminator is stored at buf[len], so buf must have room for
 * len + 1 bytes. */
void read_sbuf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
	*(buf + len) = '\0';
}
1196
9361f839 1197uchar read_byte(int f)
182dca5c 1198{
9361f839 1199 uchar c;
93095cbe 1200 readfd(f, (char *)&c, 1);
4c36ddbe 1201 return c;
182dca5c 1202}
720b47f2 1203
46e99b09
WD
1204int read_vstring(int f, char *buf, int bufsize)
1205{
1206 int len = read_byte(f);
1207
1208 if (len & 0x80)
1209 len = (len & ~0x80) * 0x100 + read_byte(f);
1210
1211 if (len >= bufsize) {
1212 rprintf(FERROR, "over-long vstring received (%d > %d)\n",
1213 len, bufsize - 1);
9a6ed83f 1214 return -1;
46e99b09
WD
1215 }
1216
1217 if (len)
1218 readfd(f, buf, len);
1219 buf[len] = '\0';
1220 return len;
1221}
1222
188fed95
WD
1223/* Populate a sum_struct with values from the socket. This is
1224 * called by both the sender and the receiver. */
1225void read_sum_head(int f, struct sum_struct *sum)
1226{
1227 sum->count = read_int(f);
c638caa6
WD
1228 if (sum->count < 0) {
1229 rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
1230 (long)sum->count, who_am_i());
1231 exit_cleanup(RERR_PROTOCOL);
1232 }
188fed95
WD
1233 sum->blength = read_int(f);
1234 if (sum->blength < 0 || sum->blength > MAX_BLOCK_SIZE) {
cdf236aa
WD
1235 rprintf(FERROR, "Invalid block length %ld [%s]\n",
1236 (long)sum->blength, who_am_i());
188fed95
WD
1237 exit_cleanup(RERR_PROTOCOL);
1238 }
1239 sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
a0456b9c 1240 if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
cdf236aa
WD
1241 rprintf(FERROR, "Invalid checksum length %d [%s]\n",
1242 sum->s2length, who_am_i());
188fed95
WD
1243 exit_cleanup(RERR_PROTOCOL);
1244 }
1245 sum->remainder = read_int(f);
1246 if (sum->remainder < 0 || sum->remainder > sum->blength) {
cdf236aa
WD
1247 rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
1248 (long)sum->remainder, who_am_i());
188fed95
WD
1249 exit_cleanup(RERR_PROTOCOL);
1250 }
1251}
1252
c207d7ec
WD
1253/* Send the values from a sum_struct over the socket. Set sum to
1254 * NULL if there are no checksums to send. This is called by both
1255 * the generator and the sender. */
1256void write_sum_head(int f, struct sum_struct *sum)
1257{
1258 static struct sum_struct null_sum;
1259
1260 if (sum == NULL)
1261 sum = &null_sum;
1262
1263 write_int(f, sum->count);
1264 write_int(f, sum->blength);
1265 if (protocol_version >= 27)
1266 write_int(f, sum->s2length);
1267 write_int(f, sum->remainder);
1268}
1269
08571358
MP
1270/**
1271 * Sleep after writing to limit I/O bandwidth usage.
1272 *
1273 * @todo Rather than sleeping after each write, it might be better to
1274 * use some kind of averaging. The current algorithm seems to always
1275 * use a bit less bandwidth than specified, because it doesn't make up
1276 * for slow periods. But arguably this is a feature. In addition, we
1277 * ought to take the time used to write the data into account.
71e58630
WD
1278 *
1279 * During some phases of big transfers (file FOO is uptodate) this is
1280 * called with a small bytes_written every time. As the kernel has to
1281 * round small waits up to guarantee that we actually wait at least the
1282 * requested number of microseconds, this can become grossly inaccurate.
1283 * We therefore keep track of the bytes we've written over time and only
1284 * sleep when the accumulated delay is at least 1 tenth of a second.
08571358
MP
1285 **/
1286static void sleep_for_bwlimit(int bytes_written)
1287{
71e58630 1288 static struct timeval prior_tv;
0f78b815 1289 static long total_written = 0;
71e58630
WD
1290 struct timeval tv, start_tv;
1291 long elapsed_usec, sleep_usec;
1292
1293#define ONE_SEC 1000000L /* # of microseconds in a second */
08571358 1294
9a0cfff5 1295 if (!bwlimit_writemax)
08571358 1296 return;
e681e820 1297
0f78b815 1298 total_written += bytes_written;
71e58630
WD
1299
1300 gettimeofday(&start_tv, NULL);
1301 if (prior_tv.tv_sec) {
1302 elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
1303 + (start_tv.tv_usec - prior_tv.tv_usec);
1304 total_written -= elapsed_usec * bwlimit / (ONE_SEC/1024);
1305 if (total_written < 0)
1306 total_written = 0;
1307 }
3151cbae 1308
71e58630
WD
1309 sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
1310 if (sleep_usec < ONE_SEC / 10) {
1311 prior_tv = start_tv;
1312 return;
1313 }
08571358 1314
71e58630
WD
1315 tv.tv_sec = sleep_usec / ONE_SEC;
1316 tv.tv_usec = sleep_usec % ONE_SEC;
98b332ed 1317 select(0, NULL, NULL, NULL, &tv);
71e58630
WD
1318
1319 gettimeofday(&prior_tv, NULL);
1320 elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
1321 + (prior_tv.tv_usec - start_tv.tv_usec);
1322 total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
08571358
MP
1323}
1324
1f75bb10 1325/* Write len bytes to the file descriptor fd, looping as necessary to get
0fb2fc4a 1326 * the job done and also (in certain circumstances) reading any data on
af436313 1327 * msg_fd_in to avoid deadlock.
880da007
MP
1328 *
1329 * This function underlies the multiplexing system. The body of the
1f75bb10 1330 * application never calls this function directly. */
4a19c3b2 1331static void writefd_unbuffered(int fd, const char *buf, size_t len)
720b47f2 1332{
1ea087a7 1333 size_t n, total = 0;
6c850772 1334 fd_set w_fds, r_fds, e_fds;
d8aeda1e 1335 int maxfd, count, cnt, using_r_fds;
e4c877cf 1336 int defer_inc = 0;
8d9dc9f9 1337 struct timeval tv;
720b47f2 1338
3be97bf9 1339 if (no_flush++)
e4c877cf 1340 defer_forwarding_messages++, defer_inc++;
e44f9a12 1341
4c36ddbe 1342 while (total < len) {
8d9dc9f9 1343 FD_ZERO(&w_fds);
6c850772
WD
1344 FD_SET(fd, &w_fds);
1345 FD_ZERO(&e_fds);
1346 FD_SET(fd, &e_fds);
1ea087a7 1347 maxfd = fd;
4c36ddbe 1348
954bbed8 1349 if (msg_fd_in >= 0) {
56014c8c 1350 FD_ZERO(&r_fds);
6c850772 1351 FD_SET(msg_fd_in, &r_fds);
1ea087a7
WD
1352 if (msg_fd_in > maxfd)
1353 maxfd = msg_fd_in;
3eeac9bc
WD
1354 using_r_fds = 1;
1355 } else
1356 using_r_fds = 0;
8d9dc9f9 1357
e626b29e 1358 tv.tv_sec = select_timeout;
8d9dc9f9 1359 tv.tv_usec = 0;
4c36ddbe 1360
554e0a8d 1361 errno = 0;
3eeac9bc 1362 count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
6c850772 1363 &w_fds, &e_fds, &tv);
4c36ddbe
AT
1364
1365 if (count <= 0) {
1ea087a7 1366 if (count < 0 && errno == EBADF)
554e0a8d 1367 exit_cleanup(RERR_SOCKETIO);
bd717af8 1368 check_timeout();
8d9dc9f9
AT
1369 continue;
1370 }
4c36ddbe 1371
6c850772
WD
1372 if (FD_ISSET(fd, &e_fds)) {
1373 rsyserr(FINFO, errno,
1374 "select exception on fd %d", fd);
1375 }
1376
3eeac9bc 1377 if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
d17e1dd2 1378 read_msg_fd();
554e0a8d 1379
9a6ed83f 1380 if (!FD_ISSET(fd, &w_fds))
1ea087a7 1381 continue;
4c36ddbe 1382
1ea087a7 1383 n = len - total;
9a0cfff5 1384 if (bwlimit_writemax && n > bwlimit_writemax)
1ea087a7 1385 n = bwlimit_writemax;
d8aeda1e 1386 cnt = write(fd, buf + total, n);
1ea087a7 1387
d8aeda1e
WD
1388 if (cnt <= 0) {
1389 if (cnt < 0) {
3309507d
WD
1390 if (errno == EINTR)
1391 continue;
1392 if (errno == EWOULDBLOCK || errno == EAGAIN) {
1393 msleep(1);
1394 continue;
1395 }
f0359dd0
AT
1396 }
1397
1ea087a7 1398 /* Don't try to write errors back across the stream. */
1f75bb10 1399 if (fd == sock_f_out)
8ef246e0 1400 io_end_multiplex_out();
2a5df862
WD
1401 /* Don't try to write errors down a failing msg pipe. */
1402 if (am_server && fd == msg_fd_out)
1403 exit_cleanup(RERR_STREAMIO);
1ea087a7 1404 rsyserr(FERROR, errno,
b88c2e8f
WD
1405 "writefd_unbuffered failed to write %ld bytes [%s]",
1406 (long)len, who_am_i());
d1b31da7
WD
1407 /* If the other side is sending us error messages, try
1408 * to grab any messages they sent before they died. */
7f459268 1409 while (fd == sock_f_out && io_multiplexing_in) {
3b0a30eb 1410 set_io_timeout(30);
9e2409ab 1411 ignore_timeout = 0;
d1b31da7
WD
1412 readfd_unbuffered(sock_f_in, io_filesfrom_buf,
1413 sizeof io_filesfrom_buf);
1414 }
1ea087a7
WD
1415 exit_cleanup(RERR_STREAMIO);
1416 }
4c36ddbe 1417
d8aeda1e 1418 total += cnt;
e4c877cf 1419 defer_forwarding_messages++, defer_inc++;
a800434a 1420
1f75bb10 1421 if (fd == sock_f_out) {
626bec8e 1422 if (io_timeout || am_generator)
3b0a30eb 1423 last_io_out = time(NULL);
d8aeda1e 1424 sleep_for_bwlimit(cnt);
1f75bb10 1425 }
4c36ddbe 1426 }
e44f9a12
AT
1427
1428 no_flush--;
e4c877cf
WD
1429 if (!(defer_forwarding_messages -= defer_inc))
1430 msg_flush();
d6dead6b
AT
1431}
1432
dde5b772 1433void io_flush(int flush_it_all)
d6dead6b 1434{
1f75bb10 1435 if (!iobuf_out_cnt || no_flush)
d17e1dd2 1436 return;
8d9dc9f9 1437
d17e1dd2 1438 if (io_multiplexing_out)
332cf6df 1439 mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
d17e1dd2 1440 else
8ef246e0 1441 writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
1f75bb10 1442 iobuf_out_cnt = 0;
e4c877cf
WD
1443
1444 if (flush_it_all && !defer_forwarding_messages)
1445 msg_flush();
8d9dc9f9
AT
1446}
1447
4a19c3b2 1448static void writefd(int fd, const char *buf, size_t len)
d6dead6b 1449{
1f75bb10
WD
1450 if (fd == sock_f_out)
1451 stats.total_written += len;
1452
b9f592fb
WD
1453 if (fd == write_batch_monitor_out) {
1454 if ((size_t)write(batch_fd, buf, len) != len)
1455 exit_cleanup(RERR_FILEIO);
1456 }
1457
8ef246e0 1458 if (!iobuf_out || fd != iobuf_f_out) {
4c36ddbe
AT
1459 writefd_unbuffered(fd, buf, len);
1460 return;
1461 }
d6dead6b
AT
1462
1463 while (len) {
1f75bb10 1464 int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
d6dead6b 1465 if (n > 0) {
1f75bb10 1466 memcpy(iobuf_out+iobuf_out_cnt, buf, n);
d6dead6b
AT
1467 buf += n;
1468 len -= n;
1f75bb10 1469 iobuf_out_cnt += n;
d6dead6b 1470 }
3151cbae 1471
1f75bb10 1472 if (iobuf_out_cnt == IO_BUFFER_SIZE)
d17e1dd2 1473 io_flush(NORMAL_FLUSH);
d6dead6b 1474 }
d6dead6b 1475}
720b47f2 1476
/* Write x to f as two bytes, low-order byte first. */
void write_shortint(int f, unsigned short x)
{
	char raw[2];

	raw[0] = (char)x;
	raw[1] = (char)(x >> 8);
	writefd(f, raw, sizeof raw);
}
1484
351e23ad
WD
1485void write_int(int f, int32 x)
1486{
1487 char b[4];
1488 SIVAL(b, 0, x);
1489 writefd(f, b, 4);
1490}
1491
f31514ad 1492void write_varint(int f, int32 x)
1c3344a1
WD
1493{
1494 char b[5];
351e23ad
WD
1495 uchar bit;
1496 int cnt = 4;
1497
1498 SIVAL(b, 1, x);
1499
1500 while (cnt > 1 && b[cnt] == 0)
1501 cnt--;
1502 bit = ((uchar)1<<(7-cnt+1));
1503 if (CVAL(b, cnt) >= bit) {
1504 cnt++;
1505 *b = ~(bit-1);
1506 } else if (cnt > 1)
1507 *b = b[cnt] | ~(bit*2-1);
1508 else
1509 *b = b[cnt];
1510
1511 writefd(f, b, cnt);
1c3344a1
WD
1512}
1513
351e23ad 1514void write_varlong(int f, int64 x, uchar min_bytes)
720b47f2 1515{
351e23ad
WD
1516 char b[9];
1517 uchar bit;
1518 int cnt = 8;
1519
1520 SIVAL(b, 1, x);
1521#if SIZEOF_INT64 >= 8
1522 SIVAL(b, 5, x >> 32);
1523#else
1524 if (x <= 0x7FFFFFFF && x >= 0)
1525 memset(b + 5, 0, 4);
1526 else {
1527 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1528 exit_cleanup(RERR_UNSUPPORTED);
1529 }
1530#endif
1531
1532 while (cnt > min_bytes && b[cnt] == 0)
1533 cnt--;
1534 bit = ((uchar)1<<(7-cnt+min_bytes));
1535 if (CVAL(b, cnt) >= bit) {
1536 cnt++;
1537 *b = ~(bit-1);
1538 } else if (cnt > min_bytes)
1539 *b = b[cnt] | ~(bit*2-1);
1540 else
1541 *b = b[cnt];
1542
1543 writefd(f, b, cnt);
720b47f2
AT
1544}
1545
7a24c346
MP
1546/*
1547 * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
1548 * 64-bit types on this platform.
1549 */
71c46176 1550void write_longint(int f, int64 x)
3a6a366f 1551{
351e23ad 1552 char b[12], * const s = b+4;
3a6a366f 1553
351e23ad
WD
1554 SIVAL(s, 0, x);
1555 if (x <= 0x7FFFFFFF && x >= 0) {
482f48cc 1556 writefd(f, s, 4);
351e23ad
WD
1557 return;
1558 }
3a6a366f 1559
4ea4acf1 1560#if SIZEOF_INT64 < 8
351e23ad
WD
1561 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
1562 exit_cleanup(RERR_UNSUPPORTED);
4ea4acf1 1563#else
351e23ad
WD
1564 memset(b, 0xFF, 4);
1565 SIVAL(s, 4, x >> 32);
1566 writefd(f, b, 12);
4ea4acf1 1567#endif
3a6a366f
AT
1568}
1569
/* Write len bytes of buf to f (thin wrapper over writefd()). */
void write_buf(int f, const char *buf, size_t len)
{
	writefd(f, buf, len);
}
1574
/** Write a null-terminated string to the connection; the terminating
 * null itself is not sent. */
void write_sbuf(int f, const char *buf)
{
	size_t len = strlen(buf);

	writefd(f, buf, len);
}
1580
9361f839 1581void write_byte(int f, uchar c)
182dca5c 1582{
93095cbe 1583 writefd(f, (char *)&c, 1);
182dca5c
AT
1584}
1585
4a19c3b2 1586void write_vstring(int f, const char *str, int len)
46e99b09
WD
1587{
1588 uchar lenbuf[3], *lb = lenbuf;
1589
1590 if (len > 0x7F) {
1591 if (len > 0x7FFF) {
1592 rprintf(FERROR,
1593 "attempting to send over-long vstring (%d > %d)\n",
1594 len, 0x7FFF);
1595 exit_cleanup(RERR_PROTOCOL);
1596 }
1597 *lb++ = len / 0x100 + 0x80;
1598 }
1599 *lb = len;
1600
1601 writefd(f, (char*)lenbuf, lb - lenbuf + 1);
1602 if (len)
1603 writefd(f, str, len);
1604}
1605
8a65e0ce
WD
1606/* Send a file-list index using a byte-reduction method. */
1607void write_ndx(int f, int32 ndx)
1608{
1609 static int32 prev_positive = -1, prev_negative = 1;
1610 int32 diff, cnt = 0;
1611 char b[6];
1612
1613 if (protocol_version < 30 || read_batch) {
1614 write_int(f, ndx);
1615 return;
1616 }
1617
1618 /* Send NDX_DONE as a single-byte 0 with no side effects. Send
1619 * negative nums as a positive after sending a leading 0xFF. */
1620 if (ndx >= 0) {
1621 diff = ndx - prev_positive;
1622 prev_positive = ndx;
1623 } else if (ndx == NDX_DONE) {
1624 *b = 0;
1625 writefd(f, b, 1);
1626 return;
1627 } else {
1628 b[cnt++] = (char)0xFF;
1629 ndx = -ndx;
1630 diff = ndx - prev_negative;
1631 prev_negative = ndx;
1632 }
1633
1634 /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
1635 * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
1636 * & all 4 bytes of the (non-negative) num with the high-bit set. */
1637 if (diff < 0xFE && diff > 0)
1638 b[cnt++] = (char)diff;
1639 else if (diff < 0 || diff > 0x7FFF) {
1640 b[cnt++] = (char)0xFE;
1641 b[cnt++] = (char)((ndx >> 24) | 0x80);
8a65e0ce 1642 b[cnt++] = (char)ndx;
351e23ad
WD
1643 b[cnt++] = (char)(ndx >> 8);
1644 b[cnt++] = (char)(ndx >> 16);
8a65e0ce
WD
1645 } else {
1646 b[cnt++] = (char)0xFE;
1647 b[cnt++] = (char)(diff >> 8);
1648 b[cnt++] = (char)diff;
1649 }
1650 writefd(f, b, cnt);
1651}
1652
1653/* Receive a file-list index using a byte-reduction method. */
1654int32 read_ndx(int f)
1655{
1656 static int32 prev_positive = -1, prev_negative = 1;
1657 int32 *prev_ptr, num;
1658 char b[4];
1659
1660 if (protocol_version < 30)
1661 return read_int(f);
1662
1663 readfd(f, b, 1);
1664 if (CVAL(b, 0) == 0xFF) {
1665 readfd(f, b, 1);
1666 prev_ptr = &prev_negative;
1667 } else if (CVAL(b, 0) == 0)
1668 return NDX_DONE;
1669 else
1670 prev_ptr = &prev_positive;
1671 if (CVAL(b, 0) == 0xFE) {
1672 readfd(f, b, 2);
1673 if (CVAL(b, 0) & 0x80) {
351e23ad
WD
1674 b[3] = CVAL(b, 0) & ~0x80;
1675 b[0] = b[1];
1676 readfd(f, b+1, 2);
1677 num = IVAL(b, 0);
8a65e0ce 1678 } else
351e23ad 1679 num = (UVAL(b,0)<<8) + UVAL(b,1) + *prev_ptr;
8a65e0ce 1680 } else
351e23ad 1681 num = UVAL(b, 0) + *prev_ptr;
8a65e0ce
WD
1682 *prev_ptr = num;
1683 if (prev_ptr == &prev_negative)
1684 num = -num;
1685 return num;
1686}
1687
/**
 * Read one line from @p f into @p buf, storing at most @p maxlen
 * characters (the trailing null is not counted, so @p buf needs room
 * for @p maxlen + 1 bytes).  Carriage returns are dropped and the
 * (required) terminating newline is not stored.
 *
 * @return 1 for success; 0 if a null byte was read (treated as an I/O
 * error) or the line was cut off because @p maxlen characters were
 * stored before a newline arrived.
 **/
int read_line(int f, char *buf, size_t maxlen)
{
	char *dst = buf;
	size_t room = maxlen;

	while (room != 0) {
		/* Preset to null so a byte that was never filled in is
		 * seen as a failure below. */
		*dst = 0;
		read_buf(f, dst, 1);
		if (*dst == 0)
			return 0;
		if (*dst == '\n')
			break;
		if (*dst == '\r')
			continue; /* overwrite the CR with the next byte */
		dst++;
		room--;
	}

	*dst = '\0';
	return room > 0;
}
1712
f0fca04e
AT
1713void io_printf(int fd, const char *format, ...)
1714{
d62bcc17 1715 va_list ap;
33544bf4 1716 char buf[BIGPATHBUFLEN];
f0fca04e 1717 int len;
3151cbae 1718
f0fca04e 1719 va_start(ap, format);
3151cbae 1720 len = vsnprintf(buf, sizeof buf, format, ap);
f0fca04e
AT
1721 va_end(ap);
1722
f89b9368
WD
1723 if (len < 0)
1724 exit_cleanup(RERR_STREAMIO);
f0fca04e 1725
33544bf4
WD
1726 if (len > (int)sizeof buf) {
1727 rprintf(FERROR, "io_printf() was too long for the buffer.\n");
1728 exit_cleanup(RERR_STREAMIO);
1729 }
1730
f0fca04e
AT
1731 write_sbuf(fd, buf);
1732}
8d9dc9f9 1733
d17e1dd2 1734/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1735void io_start_multiplex_out(void)
8d9dc9f9 1736{
d17e1dd2 1737 io_flush(NORMAL_FLUSH);
8ef246e0 1738 io_start_buffering_out(sock_f_out);
8d9dc9f9
AT
1739 io_multiplexing_out = 1;
1740}
1741
d17e1dd2 1742/** Setup for multiplexing a MSG_* stream with the data stream. */
1f75bb10 1743void io_start_multiplex_in(void)
8d9dc9f9 1744{
d17e1dd2 1745 io_flush(NORMAL_FLUSH);
8ef246e0 1746 io_start_buffering_in(sock_f_in);
8d9dc9f9
AT
1747 io_multiplexing_in = 1;
1748}
1749
d17e1dd2 1750/** Write an message to the multiplexed data stream. */
332cf6df 1751int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int convert)
8d9dc9f9 1752{
f89b9368
WD
1753 if (!io_multiplexing_out)
1754 return 0;
d17e1dd2 1755 io_flush(NORMAL_FLUSH);
1b7c47cb 1756 stats.total_written += (len+4);
332cf6df 1757 mplex_write(sock_f_out, code, buf, len, convert);
8d9dc9f9
AT
1758 return 1;
1759}
1760
8ef246e0 1761void io_end_multiplex_in(void)
7f459268
WD
1762{
1763 io_multiplexing_in = 0;
8ef246e0 1764 io_end_buffering_in();
7f459268
WD
1765}
1766
d17e1dd2 1767/** Stop output multiplexing. */
8ef246e0 1768void io_end_multiplex_out(void)
554e0a8d
AT
1769{
1770 io_multiplexing_out = 0;
8ef246e0 1771 io_end_buffering_out();
554e0a8d
AT
1772}
1773
b9f592fb
WD
1774void start_write_batch(int fd)
1775{
1776 /* Some communication has already taken place, but we don't
1777 * enable batch writing until here so that we can write a
1778 * canonical record of the communication even though the
1779 * actual communication so far depends on whether a daemon
1780 * is involved. */
1781 write_int(batch_fd, protocol_version);
1782 write_int(batch_fd, checksum_seed);
b9f592fb
WD
1783
1784 if (am_sender)
1785 write_batch_monitor_out = fd;
1786 else
1787 write_batch_monitor_in = fd;
1788}
1789
1790void stop_write_batch(void)
1791{
1792 write_batch_monitor_out = -1;
1793 write_batch_monitor_in = -1;
1794}