Matt McCutchen's Web Site
/
rsync
/
rsync.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Use the new LIVE_FLAGS define to fix a potential flag problem.
[rsync/rsync.git]
/
io.c
diff --git
a/io.c
b/io.c
index
117a9d6
..
bf7f3d9
100644
(file)
--- a/
io.c
+++ b/
io.c
@@
-41,8
+41,8
@@
static int io_multiplexing_out;
static int io_multiplexing_in;
static int io_multiplexing_out;
static int io_multiplexing_in;
-static int multiplex_in_fd;
-static int multiplex_out_fd;
+static int multiplex_in_fd
= -1
;
+static int multiplex_out_fd
= -1
;
static time_t last_io;
static int no_flush;
static time_t last_io;
static int no_flush;
@@
-91,7
+91,7
@@
static void check_timeout(void)
time_t t;
err_list_push();
time_t t;
err_list_push();
-
+
if (!io_timeout) return;
if (!last_io) {
if (!io_timeout) return;
if (!last_io) {
@@
-137,8
+137,8
@@
static void read_error_fd(void)
while (len) {
n = len;
while (len) {
n = len;
- if (n > (sizeof
(buf)-
1))
- n = sizeof
(buf)-
1;
+ if (n > (sizeof
buf -
1))
+ n = sizeof
buf -
1;
read_loop(fd, buf, n);
rwrite((enum logcode)tag, buf, n);
len -= n;
read_loop(fd, buf, n);
rwrite((enum logcode)tag, buf, n);
len -= n;
@@
-176,28
+176,28
@@
void io_set_filesfrom_fds(int f_in, int f_out)
* program where that is a problem (start_socket_client),
* kludge_around_eof is True and we just exit.
*/
* program where that is a problem (start_socket_client),
* kludge_around_eof is True and we just exit.
*/
-static void whine_about_eof
(void)
+static void whine_about_eof(void)
{
if (kludge_around_eof)
{
if (kludge_around_eof)
- exit_cleanup
(0);
+ exit_cleanup(0);
else {
else {
- rprintf
(FERROR,
-
"%s: connection unexpectedly closed "
-
"(%.0f bytes read so far)\n",
-
RSYNC_NAME, (double)stats.total_read);
-
- exit_cleanup
(RERR_STREAMIO);
+ rprintf(FERROR,
+ "%s: connection unexpectedly closed "
+ "(%.0f bytes read so far)\n",
+ RSYNC_NAME, (double)stats.total_read);
+
+ exit_cleanup(RERR_STREAMIO);
}
}
}
}
-static void die_from_readerr
(int err)
+static void die_from_readerr(int err)
{
/* this prevents us trying to write errors on a dead socket */
io_multiplexing_close();
{
/* this prevents us trying to write errors on a dead socket */
io_multiplexing_close();
-
+
rprintf(FERROR, "%s: read error: %s\n",
rprintf(FERROR, "%s: read error: %s\n",
- RSYNC_NAME, strerror
(err));
+ RSYNC_NAME, strerror(err));
exit_cleanup(RERR_STREAMIO);
}
exit_cleanup(RERR_STREAMIO);
}
@@
-213,7
+213,7
@@
static void die_from_readerr (int err)
* give a better explanation. We can tell whether the connection has
* started by looking e.g. at whether the remote version is known yet.
*/
* give a better explanation. We can tell whether the connection has
* started by looking e.g. at whether the remote version is known yet.
*/
-static int read_timeout
(int fd, char *buf, size_t len)
+static int read_timeout(int fd, char *buf, size_t len)
{
int n, ret=0;
{
int n, ret=0;
@@
-228,14
+228,14
@@
static int read_timeout (int fd, char *buf, size_t len)
FD_ZERO(&r_fds);
FD_SET(fd, &r_fds);
FD_ZERO(&r_fds);
FD_SET(fd, &r_fds);
- if (io_error_fd
!= -1
) {
+ if (io_error_fd
>= 0
) {
FD_SET(io_error_fd, &r_fds);
if (io_error_fd >= fd_count) fd_count = io_error_fd+1;
}
FD_SET(io_error_fd, &r_fds);
if (io_error_fd >= fd_count) fd_count = io_error_fd+1;
}
- if (io_filesfrom_f_out
!= -1
) {
+ if (io_filesfrom_f_out
>= 0
) {
int new_fd;
if (io_filesfrom_buflen == 0) {
int new_fd;
if (io_filesfrom_buflen == 0) {
- if (io_filesfrom_f_in
!= -1
) {
+ if (io_filesfrom_f_in
>= 0
) {
FD_SET(io_filesfrom_f_in, &r_fds);
new_fd = io_filesfrom_f_in;
} else {
FD_SET(io_filesfrom_f_in, &r_fds);
new_fd = io_filesfrom_f_in;
} else {
@@
-270,12
+270,11
@@
static int read_timeout (int fd, char *buf, size_t len)
continue;
}
continue;
}
-
- if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
+ if (io_error_fd >= 0 && FD_ISSET(io_error_fd, &r_fds)) {
read_error_fd();
}
read_error_fd();
}
- if (io_filesfrom_f_out
!= -1
) {
+ if (io_filesfrom_f_out
>= 0
) {
if (io_filesfrom_buflen) {
if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
int l = write(io_filesfrom_f_out,
if (io_filesfrom_buflen) {
if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
int l = write(io_filesfrom_f_out,
@@
-291,7
+290,7
@@
static int read_timeout (int fd, char *buf, size_t len)
io_filesfrom_f_out = -1;
}
}
io_filesfrom_f_out = -1;
}
}
- } else if (io_filesfrom_f_in
!= -1
) {
+ } else if (io_filesfrom_f_in
>= 0
) {
if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
int l = read(io_filesfrom_f_in,
io_filesfrom_buf,
if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
int l = read(io_filesfrom_f_in,
io_filesfrom_buf,
@@
-351,14
+350,13
@@
static int read_timeout (int fd, char *buf, size_t len)
last_io = time(NULL);
continue;
} else if (n == 0) {
last_io = time(NULL);
continue;
} else if (n == 0) {
- whine_about_eof
();
+ whine_about_eof();
return -1; /* doesn't return */
return -1; /* doesn't return */
- } else if (n
== -1
) {
+ } else if (n
< 0
) {
if (errno == EINTR || errno == EWOULDBLOCK ||
errno == EAGAIN)
continue;
if (errno == EINTR || errno == EWOULDBLOCK ||
errno == EAGAIN)
continue;
- else
- die_from_readerr (errno);
+ die_from_readerr(errno);
}
}
}
}
@@
-376,8
+374,7
@@
int read_filesfrom_line(int fd, char *fname)
extern int io_timeout;
extern int eol_nulls;
extern char *remote_filesfrom_file;
extern int io_timeout;
extern int eol_nulls;
extern char *remote_filesfrom_file;
- extern int am_server;
- int reading_remotely = remote_filesfrom_file || (am_server && fd == 0);
+ int reading_remotely = remote_filesfrom_file != NULL;
int nulls = eol_nulls || reading_remotely;
start:
int nulls = eol_nulls || reading_remotely;
start:
@@
-421,7
+418,7
@@
int read_filesfrom_line(int fd, char *fname)
* Continue trying to read len bytes - don't return until len has been
* read.
**/
* Continue trying to read len bytes - don't return until len has been
* read.
**/
-static void read_loop
(int fd, char *buf, size_t len)
+static void read_loop(int fd, char *buf, size_t len)
{
while (len) {
int n = read_timeout(fd, buf, len);
{
while (len) {
int n = read_timeout(fd, buf, len);
@@
-443,17
+440,31
@@
static int read_unbuffered(int fd, char *buf, size_t len)
static size_t remaining;
int tag, ret = 0;
char line[1024];
static size_t remaining;
int tag, ret = 0;
char line[1024];
+ static char *buffer;
+ static size_t bufferIdx = 0;
+ static size_t bufferSz;
- if (
!io_multiplexing_in ||
fd != multiplex_in_fd)
+ if (fd != multiplex_in_fd)
return read_timeout(fd, buf, len);
return read_timeout(fd, buf, len);
+ if (!io_multiplexing_in && remaining == 0) {
+ if (!buffer) {
+ bufferSz = 2 * IO_BUFFER_SIZE;
+ buffer = new_array(char, bufferSz);
+ if (!buffer) out_of_memory("read_unbuffered");
+ }
+ remaining = read_timeout(fd, buffer, bufferSz);
+ bufferIdx = 0;
+ }
+
while (ret == 0) {
if (remaining) {
len = MIN(len, remaining);
while (ret == 0) {
if (remaining) {
len = MIN(len, remaining);
- read_loop(fd, buf, len);
+ memcpy(buf, buffer + bufferIdx, len);
+ bufferIdx += len;
remaining -= len;
ret = len;
remaining -= len;
ret = len;
-
continue
;
+
break
;
}
read_loop(fd, line, 4);
}
read_loop(fd, line, 4);
@@
-462,8
+473,16
@@
static int read_unbuffered(int fd, char *buf, size_t len)
remaining = tag & 0xFFFFFF;
tag = tag >> 24;
remaining = tag & 0xFFFFFF;
tag = tag >> 24;
- if (tag == MPLEX_BASE)
+ if (tag == MPLEX_BASE) {
+ if (!buffer || remaining > bufferSz) {
+ buffer = realloc_array(buffer, char, remaining);
+ if (!buffer) out_of_memory("read_unbuffered");
+ bufferSz = remaining;
+ }
+ read_loop(fd, buffer, remaining);
+ bufferIdx = 0;
continue;
continue;
+ }
tag -= MPLEX_BASE;
tag -= MPLEX_BASE;
@@
-472,9
+491,9
@@
static int read_unbuffered(int fd, char *buf, size_t len)
exit_cleanup(RERR_STREAMIO);
}
exit_cleanup(RERR_STREAMIO);
}
- if (remaining > sizeof
(line)
- 1) {
- rprintf(FERROR, "multiplexing overflow %d\n\n",
- remaining);
+ if (remaining > sizeof
line
- 1) {
+ rprintf(FERROR, "multiplexing overflow %
l
d\n\n",
+
(long)
remaining);
exit_cleanup(RERR_STREAMIO);
}
exit_cleanup(RERR_STREAMIO);
}
@@
-485,6
+504,9
@@
static int read_unbuffered(int fd, char *buf, size_t len)
remaining = 0;
}
remaining = 0;
}
+ if (remaining == 0)
+ io_flush();
+
return ret;
}
return ret;
}
@@
-495,15
+517,13
@@
static int read_unbuffered(int fd, char *buf, size_t len)
* have been read. If all @p n can't be read then exit with an
* error.
**/
* have been read. If all @p n can't be read then exit with an
* error.
**/
-static void readfd
(int fd, char *buffer, size_t N)
+static void readfd(int fd, char *buffer, size_t N)
{
int ret;
size_t total=0;
{
int ret;
size_t total=0;
-
- while (total < N) {
- io_flush();
- ret = read_unbuffered (fd, buffer + total, N-total);
+ while (total < N) {
+ ret = read_unbuffered(fd, buffer + total, N-total);
total += ret;
}
total += ret;
}
@@
-550,14
+570,14
@@
void read_buf(int f,char *buf,size_t len)
void read_sbuf(int f,char *buf,size_t len)
{
void read_sbuf(int f,char *buf,size_t len)
{
- read_buf
(f,buf,len);
+ read_buf(f,buf,len);
buf[len] = 0;
}
unsigned char read_byte(int f)
{
unsigned char c;
buf[len] = 0;
}
unsigned char read_byte(int f)
{
unsigned char c;
- read_buf
(f, (char *)&c, 1);
+ read_buf(f, (char *)&c, 1);
return c;
}
return c;
}
@@
-580,7
+600,7
@@
static void sleep_for_bwlimit(int bytes_written)
assert(bytes_written > 0);
assert(bwlimit > 0);
assert(bytes_written > 0);
assert(bwlimit > 0);
-
+
tv.tv_usec = bytes_written * 1000 / bwlimit;
tv.tv_sec = tv.tv_usec / 1000000;
tv.tv_usec = tv.tv_usec % 1000000;
tv.tv_usec = bytes_written * 1000 / bwlimit;
tv.tv_sec = tv.tv_usec / 1000000;
tv.tv_usec = tv.tv_usec % 1000000;
@@
-611,7
+631,7
@@
static void writefd_unbuffered(int fd,char *buf,size_t len)
FD_SET(fd,&w_fds);
fd_count = fd;
FD_SET(fd,&w_fds);
fd_count = fd;
- if (io_error_fd
!= -1
) {
+ if (io_error_fd
>= 0
) {
FD_ZERO(&r_fds);
FD_SET(io_error_fd,&r_fds);
if (io_error_fd > fd_count)
FD_ZERO(&r_fds);
FD_SET(io_error_fd,&r_fds);
if (io_error_fd > fd_count)
@@
-624,7
+644,7
@@
static void writefd_unbuffered(int fd,char *buf,size_t len)
errno = 0;
count = select(fd_count+1,
errno = 0;
count = select(fd_count+1,
- io_error_fd
!= -1
?&r_fds:NULL,
+ io_error_fd
>= 0
?&r_fds:NULL,
&w_fds,NULL,
&tv);
&w_fds,NULL,
&tv);
@@
-639,7
+659,7
@@
static void writefd_unbuffered(int fd,char *buf,size_t len)
continue;
}
continue;
}
- if (io_error_fd
!= -1
&& FD_ISSET(io_error_fd, &r_fds)) {
+ if (io_error_fd
>= 0
&& FD_ISSET(io_error_fd, &r_fds)) {
read_error_fd();
}
read_error_fd();
}
@@
-648,14
+668,13
@@
static void writefd_unbuffered(int fd,char *buf,size_t len)
size_t n = len-total;
ret = write(fd,buf+total,n);
size_t n = len-total;
ret = write(fd,buf+total,n);
- if (ret == -1 && errno == EINTR) {
- continue;
- }
-
- if (ret == -1 &&
- (errno == EWOULDBLOCK || errno == EAGAIN)) {
- msleep(1);
- continue;
+ if (ret < 0) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ msleep(1);
+ continue;
+ }
}
if (ret <= 0) {
}
if (ret <= 0) {
@@
-685,15
+704,20
@@
static void writefd_unbuffered(int fd,char *buf,size_t len)
static char *io_buffer;
static int io_buffer_count;
static char *io_buffer;
static int io_buffer_count;
-void io_start_buffering(int fd)
+void io_start_buffering
_out
(int fd)
{
if (io_buffer) return;
multiplex_out_fd = fd;
{
if (io_buffer) return;
multiplex_out_fd = fd;
- io_buffer =
(char *)malloc(
IO_BUFFER_SIZE);
+ io_buffer =
new_array(char,
IO_BUFFER_SIZE);
if (!io_buffer) out_of_memory("writefd");
io_buffer_count = 0;
}
if (!io_buffer) out_of_memory("writefd");
io_buffer_count = 0;
}
+void io_start_buffering_in(int fd)
+{
+ multiplex_in_fd = fd;
+}
+
/**
* Write an message to a multiplexed stream. If this fails then rsync
* exits.
/**
* Write an message to a multiplexed stream. If this fails then rsync
* exits.
@@
-705,8
+729,8
@@
static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- if (n > (sizeof
(buffer)-
4)) {
- n = sizeof
(buffer)-
4;
+ if (n > (sizeof
buffer -
4)) {
+ n = sizeof
buffer -
4;
}
memcpy(&buffer[4], buf, n);
}
memcpy(&buffer[4], buf, n);
@@
-766,7
+790,7
@@
static void writefd(int fd,char *buf,size_t len)
len -= n;
io_buffer_count += n;
}
len -= n;
io_buffer_count += n;
}
-
+
if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
}
}
if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
}
}
@@
-868,9
+892,9
@@
void io_printf(int fd, const char *format, ...)
va_list ap;
char buf[1024];
int len;
va_list ap;
char buf[1024];
int len;
-
+
va_start(ap, format);
va_start(ap, format);
- len = vsnprintf(buf, sizeof
(buf)
, format, ap);
+ len = vsnprintf(buf, sizeof
buf
, format, ap);
va_end(ap);
if (len < 0) exit_cleanup(RERR_STREAMIO);
va_end(ap);
if (len < 0) exit_cleanup(RERR_STREAMIO);
@@
-884,7
+908,7
@@
void io_start_multiplex_out(int fd)
{
multiplex_out_fd = fd;
io_flush();
{
multiplex_out_fd = fd;
io_flush();
- io_start_buffering(fd);
+ io_start_buffering
_out
(fd);
io_multiplexing_out = 1;
}
io_multiplexing_out = 1;
}