return len;
}
+
+static char *wf_writeBuf;
+static size_t wf_writeBufSize;
+static size_t wf_writeBufCnt;
+
+/*
+ * Write out everything accumulated in wf_writeBuf to descriptor f.
+ *
+ * write() may accept fewer bytes than requested, so it is called
+ * repeatedly until the buffer is drained; resetting the count after a
+ * short write would silently drop the unwritten tail.  No system call
+ * is made when the buffer is empty or was never allocated.
+ *
+ * Returns the number of bytes flushed (0 if there was nothing to do),
+ * or a negative value if write() fails or makes no progress.  On
+ * failure the bytes not yet written are kept at the front of the
+ * buffer and wf_writeBufCnt is set to their count.
+ */
+int flush_write_file(int f)
+{
+	char *bp = wf_writeBuf;
+	size_t left = wf_writeBufCnt;
+	int ret;
+
+	while (left > 0) {
+		ret = write(f, bp, left);
+		if (ret <= 0) {
+			/* Keep the buffer consistent: pending data starts at index 0. */
+			if (bp != wf_writeBuf)
+				memmove(wf_writeBuf, bp, left);
+			wf_writeBufCnt = left;
+			/* A zero-byte write would spin forever; report it as an error. */
+			return ret < 0 ? ret : -1;
+		}
+		bp += ret;
+		left -= ret;
+	}
+
+	ret = (int)wf_writeBufCnt;
+	wf_writeBufCnt = 0;
+	return ret;
+}
+
/*
* write_file does not allow incomplete writes. It loops internally
* until len bytes are written or errno is set.
int len1 = MIN(len, SPARSE_WRITE_SIZE);
r1 = write_sparse(f, buf, len1);
} else {
- r1 = write(f, buf, len);
+ if (!wf_writeBuf) {
+ wf_writeBufSize = MAX_MAP_SIZE;
+ wf_writeBufCnt = 0;
+ wf_writeBuf = new_array(char, MAX_MAP_SIZE);
+ if (!wf_writeBuf) out_of_memory("write_file");
+ }
+ r1 = MIN(len, wf_writeBufSize - wf_writeBufCnt);
+ if (r1) {
+ memcpy(wf_writeBuf + wf_writeBufCnt, buf, r1);
+ wf_writeBufCnt += r1;
+ }
+ if (wf_writeBufCnt == wf_writeBufSize) {
+ if (flush_write_file(f) < 0) return -1;
+ if (!r1 && len)
+ continue;
+ }
}
if (r1 <= 0) {
if (ret > 0) return ret;
flist = flist_new();
if (f != -1) {
- io_start_buffering(f);
+ io_start_buffering_out(f);
if (filesfrom_fd >= 0) {
if (argv[0] && !push_dir(argv[0], 0)) {
rprintf(FERROR, "push_dir %s failed: %s\n",
static int io_multiplexing_out;
static int io_multiplexing_in;
-static int multiplex_in_fd;
-static int multiplex_out_fd;
+static int multiplex_in_fd = -1;
+static int multiplex_out_fd = -1;
static time_t last_io;
static int no_flush;
static size_t remaining;
int tag, ret = 0;
char line[1024];
+ static char *buffer;
+ static size_t bufferIdx = 0;
+ static size_t bufferSz;
- if (!io_multiplexing_in || fd != multiplex_in_fd)
+ if (fd != multiplex_in_fd)
return read_timeout(fd, buf, len);
+ if (!io_multiplexing_in && remaining == 0) {
+ if (!buffer) {
+ bufferSz = 2 * IO_BUFFER_SIZE;
+ buffer = new_array(char, bufferSz);
+ if (!buffer) out_of_memory("read_unbuffered");
+ }
+ remaining = read_timeout(fd, buffer, bufferSz);
+ bufferIdx = 0;
+ }
+
while (ret == 0) {
if (remaining) {
len = MIN(len, remaining);
- read_loop(fd, buf, len);
+ memcpy(buf, buffer + bufferIdx, len);
+ bufferIdx += len;
remaining -= len;
ret = len;
- continue;
+ break;
}
read_loop(fd, line, 4);
remaining = tag & 0xFFFFFF;
tag = tag >> 24;
- if (tag == MPLEX_BASE)
+ if (tag == MPLEX_BASE) {
+ if (!buffer || remaining > bufferSz) {
+ buffer = realloc_array(buffer, char, remaining);
+ if (!buffer) out_of_memory("read_unbuffered");
+ bufferSz = remaining;
+ }
+ read_loop(fd, buffer, remaining);
+ bufferIdx = 0;
continue;
+ }
tag -= MPLEX_BASE;
remaining = 0;
}
+ if (remaining == 0)
+ io_flush();
+
return ret;
}
size_t total=0;
while (total < N) {
- io_flush();
-
ret = read_unbuffered(fd, buffer + total, N-total);
total += ret;
}
static char *io_buffer;
static int io_buffer_count;
-void io_start_buffering(int fd)
+void io_start_buffering_out(int fd)
{
if (io_buffer) return;
multiplex_out_fd = fd;
io_buffer_count = 0;
}
+void io_start_buffering_in(int fd)
+{
+ multiplex_in_fd = fd; /* only records the descriptor; nothing is allocated or read here */
+}
+
/**
* Write an message to a multiplexed stream. If this fails then rsync
* exits.
{
multiplex_out_fd = fd;
io_flush();
- io_start_buffering(fd);
+ io_start_buffering_out(fd);
io_multiplexing_out = 1;
}
exit_cleanup(0);
}
+ io_start_buffering_in(f_in);
+ io_start_buffering_out(f_out);
send_files(flist,f_out,f_in);
io_flush();
report(f_out);
close(error_pipe[1]);
if (f_in != f_out) close(f_in);
- io_start_buffering(f_out);
+ io_start_buffering_out(f_out);
io_set_error_fd(error_pipe[0]);
}
}
+ io_start_buffering_in(f_in);
if (delete_mode && !delete_excluded)
recv_exclude_list(f_in);
extern int cvs_exclude;
extern int delete_mode;
extern int delete_excluded;
+ io_start_buffering_out(f_out);
if (cvs_exclude)
add_cvs_excludes();
if (delete_mode && !delete_excluded)
if (verbose > 3)
rprintf(FINFO,"file list sent\n");
+ io_flush();
+ io_start_buffering_out(f_out);
send_files(flist,f_out,f_in);
+ io_flush();
if (protocol_version >= 24) {
/* final goodbye message */
read_int(f_in);
wait_process(pid, &status);
}
report(-1);
+ io_flush();
exit_cleanup(status);
}
offset += len;
}
+ flush_write_file(fd);
+
if (do_progress)
end_progress(total_size);