* Copyright (C) 1996-2001 Andrew Tridgell
* Copyright (C) 1996 Paul Mackerras
* Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
- * Copyright (C) 2003-2008 Wayne Davison
+ * Copyright (C) 2003-2009 Wayne Davison
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
#include "rsync.h"
#include "ifuncs.h"
+#include "inums.h"
/** If no timeout is specified then use a 60 second select timeout */
#define SELECT_TIMEOUT 60
extern int io_error;
extern int eol_nulls;
extern int flist_eof;
+extern int file_total;
+extern int file_old_total;
extern int list_only;
extern int read_batch;
extern int protect_args;
int ignore_timeout = 0;
int batch_fd = -1;
int msgdone_cnt = 0;
-int check_for_io_err = 0;
/* Ignore an EOF error if non-zero. See whine_about_eof(). */
int kluge_around_eof = 0;
static void writefd(int fd, const char *buf, size_t len);
static void writefd_unbuffered(int fd, const char *buf, size_t len);
static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);
+static void read_a_msg(int fd);
-struct flist_ndx_item {
- struct flist_ndx_item *next;
- int ndx;
-};
-
-struct flist_ndx_list {
- struct flist_ndx_item *head, *tail;
-};
-
-static struct flist_ndx_list redo_list, hlink_list;
+static flist_ndx_list redo_list, hlink_list;
struct msg_list_item {
struct msg_list_item *next;
static struct msg_list msg_queue;
-static void flist_ndx_push(struct flist_ndx_list *lp, int ndx)
-{
- struct flist_ndx_item *item;
-
- if (!(item = new(struct flist_ndx_item)))
- out_of_memory("flist_ndx_push");
- item->next = NULL;
- item->ndx = ndx;
- if (lp->tail)
- lp->tail->next = item;
- else
- lp->head = item;
- lp->tail = item;
-}
-
-static int flist_ndx_pop(struct flist_ndx_list *lp)
-{
- struct flist_ndx_item *next;
- int ndx;
-
- if (!lp->head)
- return -1;
-
- ndx = lp->head->ndx;
- next = lp->head->next;
- free(lp->head);
- lp->head = next;
- if (!next)
- lp->tail = NULL;
-
- return ndx;
-}
-
static void got_flist_entry_status(enum festatus status, const char *buf)
{
int ndx = IVAL(buf, 0);
}
break;
case FES_REDO:
+ if (read_batch) {
+ if (inc_recurse)
+ flist->in_progress++;
+ break;
+ }
if (inc_recurse)
flist->to_redo++;
flist_ndx_push(&redo_list, ndx);
}
}
-/* Read a message from the MSG_* fd and handle it. This is called either
- * during the early stages of being a local sender (up through the sending
- * of the file list) or when we're the generator (to fetch the messages
- * from the receiver). */
-static void read_msg_fd(void)
-{
- char buf[2048];
- size_t n;
- struct file_list *flist;
- int fd = msg_fd_in;
- int tag, len;
-
- /* Temporarily disable msg_fd_in. This is needed to avoid looping back
- * to this routine from writefd_unbuffered(). */
- no_flush++;
- msg_fd_in = -1;
- defer_forwarding_messages++;
-
- readfd(fd, buf, 4);
- tag = IVAL(buf, 0);
-
- len = tag & 0xFFFFFF;
- tag = (tag >> 24) - MPLEX_BASE;
-
- check_for_io_err = 0;
-
- switch (tag) {
- case MSG_DONE:
- if (len < 0 || len > 1 || !am_generator) {
- invalid_msg:
- rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
- tag, len, who_am_i(),
- inc_recurse ? "/inc" : "");
- exit_cleanup(RERR_STREAMIO);
- }
- if (len) {
- readfd(fd, buf, len);
- stats.total_read = read_varlong(fd, 3);
- }
- msgdone_cnt++;
- break;
- case MSG_REDO:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_REDO, buf);
- break;
- case MSG_FLIST:
- if (len != 4 || !am_generator || !inc_recurse)
- goto invalid_msg;
- readfd(fd, buf, 4);
- /* Read extra file list from receiver. */
- assert(iobuf_in != NULL);
- assert(iobuf_f_in == fd);
- if (DEBUG_GTE(FLIST, 2)) {
- rprintf(FINFO, "[%s] receiving flist for dir %d\n",
- who_am_i(), IVAL(buf,0));
- }
- flist = recv_file_list(fd);
- flist->parent_ndx = IVAL(buf,0);
- /* If the sender is going to send us an MSG_IO_ERROR value, it
- * will always be the very next message following MSG_FLIST. */
- check_for_io_err = 1;
-#ifdef SUPPORT_HARD_LINKS
- if (preserve_hard_links)
- match_hard_links(flist);
-#endif
- break;
- case MSG_FLIST_EOF:
- if (len != 0 || !am_generator || !inc_recurse)
- goto invalid_msg;
- flist_eof = 1;
- break;
- case MSG_IO_ERROR:
- if (len != 4)
- goto invalid_msg;
- readfd(fd, buf, len);
- io_error |= IVAL(buf, 0);
- break;
- case MSG_DELETED:
- if (len >= (int)sizeof buf || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, len);
- send_msg(MSG_DELETED, buf, len, 1);
- break;
- case MSG_SUCCESS:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_SUCCESS, buf);
- break;
- case MSG_NO_SEND:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_NO_SEND, buf);
- break;
- case MSG_ERROR_SOCKET:
- case MSG_ERROR_UTF8:
- case MSG_CLIENT:
- if (!am_generator)
- goto invalid_msg;
- if (tag == MSG_ERROR_SOCKET)
- io_end_multiplex_out();
- /* FALL THROUGH */
- case MSG_INFO:
- case MSG_ERROR:
- case MSG_ERROR_XFER:
- case MSG_WARNING:
- case MSG_LOG:
- while (len) {
- n = len;
- if (n >= sizeof buf)
- n = sizeof buf - 1;
- readfd(fd, buf, n);
- rwrite((enum logcode)tag, buf, n, !am_generator);
- len -= n;
- }
- break;
- default:
- rprintf(FERROR, "unknown message %d:%d [%s]\n",
- tag, len, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
-
- no_flush--;
- msg_fd_in = fd;
- if (!--defer_forwarding_messages && !no_flush)
- msg_flush();
-}
-
/* This is used by the generator to limit how many file transfers can
* be active at once when --remove-source-files is specified. Without
* this, sender-side deletions were mostly happening at the end. */
void increment_active_files(int ndx, int itemizing, enum logcode code)
{
- /* TODO: tune these limits? */
- while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) {
+ while (1) {
+ /* TODO: tune these limits? */
+ int limit = active_bytecnt >= 128*1024 ? 10 : 50;
+ if (active_filecnt < limit)
+ break;
check_for_finished_files(itemizing, code, 0);
+ if (active_filecnt < limit)
+ break;
if (iobuf_out_cnt)
io_flush(NORMAL_FLUSH);
else
- read_msg_fd();
+ read_a_msg(msg_fd_in);
}
active_filecnt++;
msg_list_add(&msg_queue, code, buf, len, convert);
return 1;
}
- if (flist_forward_from >= 0)
+ if (defer_forwarding_messages)
msg_list_add(&msg_queue, code, buf, len, convert);
else
mplex_write(msg_fd_out, code, buf, len, convert);
{
if (io_flush(FULL_FLUSH))
return;
- read_msg_fd();
+ read_a_msg(msg_fd_in);
}
int get_redo_num(void)
rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
"(%s bytes received so far) [%s]\n",
- big_num(stats.total_read, 0), who_am_i());
+ big_num(stats.total_read), who_am_i());
exit_cleanup(RERR_STREAMIO);
}
-/**
- * Read from a socket with I/O timeout. return the number of bytes
+/* Read from a socket with I/O timeout. return the number of bytes
* read. If no bytes can be read then exit, never return a number <= 0.
*
* TODO: If the remote shell connection fails, then current versions
* fairly common mistake to try to use rsh when ssh is required, we
* should trap that: if we fail to read any data at all, we should
* give a better explanation. We can tell whether the connection has
- * started by looking e.g. at whether the remote version is known yet.
- */
+ * started by looking e.g. at whether the remote version is known yet. */
static int read_timeout(int fd, char *buf, size_t len)
{
int n, cnt = 0;
maxfd = new_fd;
}
- tv.tv_sec = select_timeout;
+ if (am_sender && inc_recurse && !flist_eof && !defer_forwarding_messages && !cnt
+ && file_total - file_old_total < MAX_FILECNT_LOOKAHEAD
+ && file_total - file_old_total >= MIN_FILECNT_LOOKAHEAD)
+ tv.tv_sec = 0;
+ else
+ tv.tv_sec = select_timeout;
tv.tv_usec = 0;
errno = 0;
defer_forwarding_messages = 0;
exit_cleanup(RERR_SOCKETIO);
}
- check_timeout();
+ if (am_sender && tv.tv_sec == 0)
+ send_extra_file_list(sock_f_out, -1);
+ else
+ check_timeout();
continue;
}
io_flush(FULL_FLUSH);
}
-/**
- * Continue trying to read len bytes - don't return until len has been
- * read.
- **/
+/* Continue trying to read len bytes until all have been read.
+ * Used to read raw bytes from a multiplexed source. */
static void read_loop(int fd, char *buf, size_t len)
{
while (len) {
}
}
-/**
- * Read from the file descriptor handling multiplexing - return number
- * of bytes read.
- *
- * Never returns <= 0.
- */
-static int readfd_unbuffered(int fd, char *buf, size_t len)
+/* Read a message from a multiplexed source. */
+static void read_a_msg(int fd)
{
- size_t msg_bytes;
- int tag, cnt = 0;
char line[BIGPATHBUFLEN];
+ size_t msg_bytes;
+ int tag, flist_parent = -1, save_msg_fd_in = msg_fd_in;
+
+ /* Temporarily disable msg_fd_in. This is needed to avoid looping back
+ * to this routine from writefd_unbuffered(). */
+ msg_fd_in = -1;
+
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ no_flush++;
+
+ switch (tag) {
+ case MSG_DATA:
+ if (msg_bytes > iobuf_in_siz) {
+ if (!(iobuf_in = realloc_array(iobuf_in, char, msg_bytes)))
+ out_of_memory("read_a_msg");
+ iobuf_in_siz = msg_bytes;
+ }
+ read_loop(fd, iobuf_in, msg_bytes);
+ iobuf_in_remaining = msg_bytes;
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_DONE:
+ if (msg_bytes > 1 || !am_generator)
+ goto invalid_msg;
+ if (msg_bytes) {
+ read_loop(fd, line, 1);
+ stats.total_read = read_varlong(fd, 3);
+ }
+ msgdone_cnt++;
+ break;
+ case MSG_REDO:
+ if (msg_bytes != 4 || !am_generator)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ got_flist_entry_status(FES_REDO, line);
+ break;
+ case MSG_FLIST:
+ if (msg_bytes != 4 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ /* Read extra file list from receiver. */
+ if (DEBUG_GTE(FLIST, 2)) {
+ rprintf(FINFO, "[%s] receiving flist for dir %d\n",
+ who_am_i(), IVAL(line, 0));
+ }
+ flist_parent = IVAL(line, 0);
+ break;
+ case MSG_FLIST_EOF:
+ if (msg_bytes != 0 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ flist_eof = 1;
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4 || am_sender)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ io_error |= IVAL(line, 0);
+ if (!am_generator)
+ send_msg_int(MSG_IO_ERROR, IVAL(line, 0));
+ break;
+ case MSG_NOOP:
+ if (am_sender)
+ maybe_send_keepalive();
+ break;
+ case MSG_DEL_STATS:
+ if (msg_bytes)
+ goto invalid_msg;
+ read_del_stats(fd);
+ if (am_sender && am_server)
+ write_del_stats(sock_f_out);
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof line)
+ goto overflow;
+ if (am_generator) {
+ read_loop(fd, line, msg_bytes);
+ send_msg(MSG_DELETED, line, msg_bytes, 1);
+ break;
+ }
+#ifdef ICONV_OPTION
+ if (ic_recv != (iconv_t)-1) {
+ xbuf outbuf, inbuf;
+ char ibuf[512];
+ int add_null = 0;
+
+ INIT_CONST_XBUF(outbuf, line);
+ INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
+
+ while (msg_bytes) {
+ inbuf.len = msg_bytes > sizeof ibuf
+ ? sizeof ibuf : msg_bytes;
+ read_loop(fd, inbuf.buf, inbuf.len);
+ if (!(msg_bytes -= inbuf.len)
+ && !ibuf[inbuf.len-1])
+ inbuf.len--, add_null = 1;
+ if (iconvbufs(ic_send, &inbuf, &outbuf,
+ ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
+ goto overflow;
+ }
+ if (add_null) {
+ if (outbuf.len == outbuf.size)
+ goto overflow;
+ outbuf.buf[outbuf.len++] = '\0';
+ }
+ msg_bytes = outbuf.len;
+ } else
+#endif
+ read_loop(fd, line, msg_bytes);
+ /* A directory name was sent with the trailing null */
+ if (msg_bytes > 0 && !line[msg_bytes-1])
+ log_delete(line, S_IFDIR);
+ else {
+ line[msg_bytes] = '\0';
+ log_delete(line, S_IFREG);
+ }
+ break;
+ case MSG_SUCCESS:
+ if (msg_bytes != 4) {
+ invalid_msg:
+ rprintf(FERROR, "invalid multi-message %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, 4);
+ if (am_generator)
+ got_flist_entry_status(FES_SUCCESS, line);
+ else
+ successful_send(IVAL(line, 0));
+ break;
+ case MSG_NO_SEND:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ if (am_generator)
+ got_flist_entry_status(FES_NO_SEND, line);
+ else
+ send_msg_int(MSG_NO_SEND, IVAL(line, 0));
+ break;
+ case MSG_ERROR_SOCKET:
+ case MSG_ERROR_UTF8:
+ case MSG_CLIENT:
+ case MSG_LOG:
+ if (!am_generator)
+ goto invalid_msg;
+ if (tag == MSG_ERROR_SOCKET)
+ io_end_multiplex_out();
+ /* FALL THROUGH */
+ case MSG_INFO:
+ case MSG_ERROR:
+ case MSG_ERROR_XFER:
+ case MSG_WARNING:
+ if (msg_bytes >= sizeof line) {
+ overflow:
+ rprintf(FERROR,
+ "multiplexing overflow %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, msg_bytes);
+ rwrite((enum logcode)tag, line, msg_bytes, !am_generator);
+ if (first_message) {
+ if (list_only && !am_sender && tag == 1) {
+ line[msg_bytes] = '\0';
+ check_for_d_option_error(line);
+ }
+ first_message = 0;
+ }
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d [%s%s]\n",
+ tag, who_am_i(), inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ msg_fd_in = save_msg_fd_in;
+ no_flush--;
+
+ if (flist_parent >= 0) {
+ struct file_list *flist = recv_file_list(fd);
+ flist->parent_ndx = flist_parent;
+#ifdef SUPPORT_HARD_LINKS
+ if (preserve_hard_links)
+ match_hard_links(flist);
+#endif
+ }
+}
+
+/* Read from the file descriptor handling multiplexing and return the
+ * number of bytes read. Never returns <= 0. */
+static int readfd_unbuffered(int fd, char *buf, size_t len)
+{
+ int cnt = 0;
if (!iobuf_in || fd != iobuf_f_in)
return read_timeout(fd, buf, len);
cnt = len;
break;
}
-
- read_loop(fd, line, 4);
- tag = IVAL(line, 0);
-
- msg_bytes = tag & 0xFFFFFF;
- tag = (tag >> 24) - MPLEX_BASE;
-
- check_for_io_err = 0;
-
- switch (tag) {
- case MSG_DATA:
- if (msg_bytes > iobuf_in_siz) {
- if (!(iobuf_in = realloc_array(iobuf_in, char,
- msg_bytes)))
- out_of_memory("readfd_unbuffered");
- iobuf_in_siz = msg_bytes;
- }
- read_loop(fd, iobuf_in, msg_bytes);
- iobuf_in_remaining = msg_bytes;
- iobuf_in_ndx = 0;
- break;
- case MSG_NOOP:
- if (am_sender)
- maybe_send_keepalive();
- break;
- case MSG_IO_ERROR:
- if (msg_bytes != 4)
- goto invalid_msg;
- read_loop(fd, line, msg_bytes);
- send_msg_int(MSG_IO_ERROR, IVAL(line, 0));
- io_error |= IVAL(line, 0);
- break;
- case MSG_DELETED:
- if (msg_bytes >= sizeof line)
- goto overflow;
-#ifdef ICONV_OPTION
- if (ic_recv != (iconv_t)-1) {
- xbuf outbuf, inbuf;
- char ibuf[512];
- int add_null = 0;
-
- INIT_CONST_XBUF(outbuf, line);
- INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
-
- while (msg_bytes) {
- inbuf.len = msg_bytes > sizeof ibuf
- ? sizeof ibuf : msg_bytes;
- read_loop(fd, inbuf.buf, inbuf.len);
- if (!(msg_bytes -= inbuf.len)
- && !ibuf[inbuf.len-1])
- inbuf.len--, add_null = 1;
- if (iconvbufs(ic_send, &inbuf, &outbuf,
- ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
- goto overflow;
- }
- if (add_null) {
- if (outbuf.len == outbuf.size)
- goto overflow;
- outbuf.buf[outbuf.len++] = '\0';
- }
- msg_bytes = outbuf.len;
- } else
-#endif
- read_loop(fd, line, msg_bytes);
- /* A directory name was sent with the trailing null */
- if (msg_bytes > 0 && !line[msg_bytes-1])
- log_delete(line, S_IFDIR);
- else {
- line[msg_bytes] = '\0';
- log_delete(line, S_IFREG);
- }
- break;
- case MSG_SUCCESS:
- if (msg_bytes != 4) {
- invalid_msg:
- rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
- tag, (long)msg_bytes, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, line, msg_bytes);
- successful_send(IVAL(line, 0));
- break;
- case MSG_NO_SEND:
- if (msg_bytes != 4)
- goto invalid_msg;
- read_loop(fd, line, msg_bytes);
- send_msg_int(MSG_NO_SEND, IVAL(line, 0));
- break;
- case MSG_INFO:
- case MSG_ERROR:
- case MSG_ERROR_XFER:
- case MSG_WARNING:
- if (msg_bytes >= sizeof line) {
- overflow:
- rprintf(FERROR,
- "multiplexing overflow %d:%ld [%s]\n",
- tag, (long)msg_bytes, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, line, msg_bytes);
- rwrite((enum logcode)tag, line, msg_bytes, 1);
- if (first_message) {
- if (list_only && !am_sender && tag == 1) {
- line[msg_bytes] = '\0';
- check_for_d_option_error(line);
- }
- first_message = 0;
- }
- break;
- default:
- rprintf(FERROR, "unexpected tag %d [%s]\n",
- tag, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
+ read_a_msg(fd);
}
if (iobuf_in_remaining == 0)
total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
}
+static const char *what_fd_is(int fd)
+{
+ static char buf[20];
+
+ if (fd == sock_f_out)
+ return "socket";
+ else if (fd == msg_fd_out)
+ return "message fd";
+ else if (fd == batch_fd)
+ return "batch file";
+ else {
+ snprintf(buf, sizeof buf, "fd %d", fd);
+ return buf;
+ }
+}
+
/* Write len bytes to the file descriptor fd, looping as necessary to get
* the job done and also (in certain circumstances) reading any data on
* msg_fd_in to avoid deadlock.
FD_SET(fd, &e_fds);
maxfd = fd;
- if (msg_fd_in >= 0) {
+ if (msg_fd_in >= 0 && iobuf_in_remaining == 0) {
FD_ZERO(&r_fds);
FD_SET(msg_fd_in, &r_fds);
if (msg_fd_in > maxfd)
rprintf(FINFO, "select exception on fd %d\n", fd); */
if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
- read_msg_fd();
+ read_a_msg(msg_fd_in);
if (!FD_ISSET(fd, &w_fds))
continue;
if (am_server && fd == msg_fd_out)
exit_cleanup(RERR_STREAMIO);
rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes [%s]",
- (long)len, who_am_i());
+ "writefd_unbuffered failed to write %ld bytes to %s [%s]",
+ (long)len, what_fd_is(fd), who_am_i());
/* If the other side is sending us error messages, try
* to grab any messages they sent before they died. */
while (!am_server && fd == sock_f_out && io_multiplexing_in) {
char buf[1024];
set_io_timeout(30);
ignore_timeout = 0;
- readfd_unbuffered(sock_f_in, buf, sizeof buf);
+ readfd_unbuffered(iobuf_f_in, buf, sizeof buf);
}
exit_cleanup(RERR_STREAMIO);
}
if (iobuf_out_cnt) {
if (io_multiplexing_out)
- mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
+ mplex_write(iobuf_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
else
writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
iobuf_out_cnt = 0;
if (fd == sock_f_out)
stats.total_written += len;
- if (fd == write_batch_monitor_out) {
- if ((size_t)write(batch_fd, buf, len) != len)
- exit_cleanup(RERR_FILEIO);
- }
+ if (fd == write_batch_monitor_out)
+ writefd_unbuffered(batch_fd, buf, len);
if (!iobuf_out || fd != iobuf_f_out) {
writefd_unbuffered(fd, buf, len);
}
/** Setup for multiplexing a MSG_* stream with the data stream. */
-void io_start_multiplex_out(void)
+void io_start_multiplex_out(int f)
{
io_flush(NORMAL_FLUSH);
- io_start_buffering_out(sock_f_out);
+ io_start_buffering_out(f);
io_multiplexing_out = 1;
}
/** Setup for multiplexing a MSG_* stream with the data stream. */
-void io_start_multiplex_in(void)
+void io_start_multiplex_in(int f)
{
io_flush(NORMAL_FLUSH);
- io_start_buffering_in(sock_f_in);
+ io_start_buffering_in(f);
io_multiplexing_in = 1;
}
return 0;
io_flush(NORMAL_FLUSH);
stats.total_written += (len+4);
- mplex_write(sock_f_out, code, buf, len, convert);
+ mplex_write(iobuf_f_out, code, buf, len, convert);
return 1;
}