extern int am_daemon;
extern int am_sender;
extern int eol_nulls;
+extern int checksum_seed; /* written to batch_fd by start_write_batch() */
+extern int protocol_version; /* written to batch_fd by start_write_batch() */
extern char *remote_filesfrom_file;
extern struct stats stats;
const char phase_unknown[] = "unknown";
int select_timeout = SELECT_TIMEOUT;
+int batch_fd = -1; /* descriptor the batch record is written to; starts at -1 and is
+ * presumably assigned by other code before start_write_batch() -- confirm */
/**
* The connection might be dropped at some point; perhaps because the
int msg_fd_in = -1;
int msg_fd_out = -1;
+static int write_batch_monitor_in = -1; /* fd whose incoming data is also copied to batch_fd; -1 = none */
+static int write_batch_monitor_out = -1; /* fd whose outgoing data is also copied to batch_fd; -1 = none */
+
static int io_filesfrom_f_in = -1;
static int io_filesfrom_f_out = -1;
static char io_filesfrom_buf[2048];
int count;
FD_ZERO(&r_fds);
+ FD_ZERO(&w_fds);
FD_SET(fd, &r_fds);
if (msg_fd_in >= 0) {
FD_SET(msg_fd_in, &r_fds);
new_fd = -1;
}
} else {
- FD_ZERO(&w_fds);
FD_SET(io_filesfrom_f_out, &w_fds);
new_fd = io_filesfrom_f_out;
}
errno = 0;
- count = select(maxfd + 1, &r_fds,
- io_filesfrom_buflen? &w_fds : NULL,
- NULL, &tv);
+ count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
if (count <= 0) {
- check_timeout();
if (errno == EBADF)
exit_cleanup(RERR_SOCKETIO);
+ check_timeout();
continue;
}
*
* Never returns <= 0.
*/
-static int read_unbuffered(int fd, char *buf, size_t len)
+static int readfd_unbuffered(int fd, char *buf, size_t len)
{
static size_t remaining;
int tag, ret = 0;
bufferSz = 2 * IO_BUFFER_SIZE;
buffer = new_array(char, bufferSz);
if (!buffer)
- out_of_memory("read_unbuffered");
+ out_of_memory("readfd_unbuffered");
}
remaining = read_timeout(fd, buffer, bufferSz);
bufferIdx = 0;
if (!buffer || remaining > bufferSz) {
buffer = realloc_array(buffer, char, remaining);
if (!buffer)
- out_of_memory("read_unbuffered");
+ out_of_memory("readfd_unbuffered");
bufferSz = remaining;
}
read_loop(fd, buffer, remaining);
size_t total = 0;
while (total < N) {
- ret = read_unbuffered(fd, buffer + total, N-total);
+ ret = readfd_unbuffered(fd, buffer + total, N-total);
total += ret;
}
+ if (fd == write_batch_monitor_in) {
+ if ((size_t)write(batch_fd, buffer, total) != total)
+ exit_cleanup(RERR_FILEIO);
+ }
+
stats.total_read += total;
}
&w_fds, NULL, &tv);
if (count <= 0) {
- check_timeout();
if (count < 0 && errno == EBADF)
exit_cleanup(RERR_SOCKETIO);
+ check_timeout();
continue;
}
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- if (n > (sizeof buffer - 4)) {
+ if (n > sizeof buffer - 4)
n = sizeof buffer - 4;
- }
memcpy(&buffer[4], buf, n);
writefd_unbuffered(fd, buffer, n+4);
exit_cleanup(RERR_PROTOCOL);
}
+ if (fd == write_batch_monitor_out) {
+ if ((size_t)write(batch_fd, buf, len) != len)
+ exit_cleanup(RERR_FILEIO);
+ }
+
if (!io_buffer || fd != multiplex_out_fd) {
writefd_unbuffered(fd, buf, len);
return;
io_multiplexing_out = 0;
}
+void start_write_batch(int fd)
+{
+ /* Begin recording a batch: write the protocol version and the
+ * checksum seed to batch_fd, then register fd as the descriptor
+ * whose traffic is mirrored into batch_fd (outgoing data when
+ * am_sender is set, incoming data otherwise).
+ *
+ * Some communication has already taken place, but we don't
+ * enable batch writing until here so that we can write a
+ * canonical record of the communication even though the
+ * actual communication so far depends on whether a daemon
+ * is involved. */
+ write_int(batch_fd, protocol_version);
+ write_int(batch_fd, checksum_seed);
+ stats.total_written -= sizeof (int) * 2; /* NOTE(review): presumably backs out the
+ * two write_int() calls above so the batch header is not counted
+ * as transferred data -- confirm against write_int() */
+
+ if (am_sender)
+ write_batch_monitor_out = fd; /* sender: copy what is written to fd */
+ else
+ write_batch_monitor_in = fd; /* otherwise: copy what is read from fd */
+}
+
+void stop_write_batch(void)
+{
+ write_batch_monitor_out = -1; /* back to the initial "no fd monitored" value, so */
+ write_batch_monitor_in = -1; /* neither direction is registered; batch_fd itself is left untouched */
+}