extern struct stats stats;
-const char * const phase_unknown = "unknown";
+const char phase_unknown[] = "unknown";
/**
* The connection might be dropped at some point; perhaps because the
static int io_error_fd = -1;
+static int io_filesfrom_f_in = -1;
+static int io_filesfrom_f_out = -1;
+static char io_filesfrom_buf[2048];
+static char *io_filesfrom_bp;
+static char io_filesfrom_lastchar;
+static int io_filesfrom_buflen;
static void read_loop(int fd, char *buf, size_t len);
time_t t;
err_list_push();
-
+
if (!io_timeout) return;
if (!last_io) {
}
}
-/** Setup the fd used to propogate errors */
+/** Setup the fd used to propagate errors */
void io_set_error_fd(int fd)
{
io_error_fd = fd;
int fd = io_error_fd;
int tag, len;
- /* io_error_fd is temporarily disabled -- is this meant to
- * prevent indefinite recursion? */
+ /* io_error_fd is temporarily disabled -- is this meant to
+ * prevent indefinite recursion? */
io_error_fd = -1;
read_loop(fd, buf, 4);
while (len) {
n = len;
- if (n > (sizeof(buf)-1))
- n = sizeof(buf)-1;
+ if (n > (sizeof buf - 1))
+ n = sizeof buf - 1;
read_loop(fd, buf, n);
rwrite((enum logcode)tag, buf, n);
len -= n;
io_error_fd = fd;
}
+/**
+ * When we're the receiver and we have a local --files-from list of names
+ * that needs to be sent over the socket to the sender, we have to do two
+ * things at the same time: send the sender a list of what files we're
+ * processing and read the incoming file+info list from the sender. We do
+ * this by augmenting the read_timeout() function to copy this data. It
+ * uses the io_filesfrom_buf to read a block of data from f_in (when it is
+ * ready, since it might be a pipe) and then blast it out f_out (when it
+ * is ready to receive more data).
+ *
+ * This function only records the two descriptors and resets the copy
+ * state; no I/O happens here. read_timeout() performs the forwarding for
+ * as long as io_filesfrom_f_out is not -1.
+ *
+ * f_in:  descriptor the local list of names is read from.
+ * f_out: descriptor the converted list is written to.
+ */
+void io_set_filesfrom_fds(int f_in, int f_out)
+{
+ io_filesfrom_f_in = f_in;
+ io_filesfrom_f_out = f_out;
+ io_filesfrom_bp = io_filesfrom_buf; /* next byte to send: start of the buffer */
+ io_filesfrom_lastchar = '\0'; /* behave as if the previous block ended with a '\0' */
+ io_filesfrom_buflen = 0; /* nothing is waiting to be written yet */
+}
/**
* It's almost always an error to get an EOF when we're trying to read
* program where that is a problem (start_socket_client),
* kludge_around_eof is True and we just exit.
*/
-static void whine_about_eof (void)
+static void whine_about_eof(void)
{
if (kludge_around_eof)
- exit_cleanup (0);
+ exit_cleanup(0); /* EOF is tolerated in this mode: call exit_cleanup with code 0 */
else {
- rprintf (FERROR,
- "%s: connection unexpectedly closed "
- "(%.0f bytes read so far)\n",
- RSYNC_NAME, (double)stats.total_read);
-
- exit_cleanup (RERR_STREAMIO);
+ rprintf(FERROR,
+ "%s: connection unexpectedly closed "
+ "(%.0f bytes read so far)\n",
+ RSYNC_NAME, (double)stats.total_read); /* total_read is cast to double to match %.0f */
+
+ exit_cleanup(RERR_STREAMIO); /* otherwise the EOF is reported as a stream I/O failure */
}
}
/**
 * Report a failed read and give up: closes multiplexing first, prints
 * strerror(err) through rprintf(FERROR, ...) and then calls
 * exit_cleanup(RERR_STREAMIO).
 *
 * err: the errno value describing the read failure.
 */
-static void die_from_readerr (int err)
+static void die_from_readerr(int err)
{
/* this prevents us trying to write errors on a dead socket */
io_multiplexing_close();
-
+
rprintf(FERROR, "%s: read error: %s\n",
- RSYNC_NAME, strerror (err));
+ RSYNC_NAME, strerror(err));
exit_cleanup(RERR_STREAMIO);
}
* give a better explanation. We can tell whether the connection has
* started by looking e.g. at whether the remote version is known yet.
*/
-static int read_timeout (int fd, char *buf, size_t len)
+static int read_timeout(int fd, char *buf, size_t len)
{
int n, ret=0;
while (ret == 0) {
/* until we manage to read *something* */
- fd_set fds;
+ fd_set r_fds, w_fds;
struct timeval tv;
int fd_count = fd+1;
int count;
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
+ FD_ZERO(&r_fds);
+ FD_SET(fd, &r_fds);
if (io_error_fd != -1) {
- FD_SET(io_error_fd, &fds);
- if (io_error_fd > fd) fd_count = io_error_fd+1;
+ FD_SET(io_error_fd, &r_fds);
+ if (io_error_fd >= fd_count) fd_count = io_error_fd+1;
+ }
+ if (io_filesfrom_f_out != -1) {
+ int new_fd;
+ if (io_filesfrom_buflen == 0) {
+ if (io_filesfrom_f_in != -1) {
+ FD_SET(io_filesfrom_f_in, &r_fds);
+ new_fd = io_filesfrom_f_in;
+ } else {
+ io_filesfrom_f_out = -1;
+ new_fd = -1;
+ }
+ } else {
+ FD_ZERO(&w_fds);
+ FD_SET(io_filesfrom_f_out, &w_fds);
+ new_fd = io_filesfrom_f_out;
+ }
+ if (new_fd >= fd_count) fd_count = new_fd+1;
}
tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
errno = 0;
- count = select(fd_count, &fds, NULL, NULL, &tv);
+ count = select(fd_count, &r_fds,
+ io_filesfrom_buflen? &w_fds : NULL,
+ NULL, &tv);
if (count == 0) {
check_timeout();
continue;
}
- if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
+ if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
read_error_fd();
}
- if (!FD_ISSET(fd, &fds)) continue;
+ if (io_filesfrom_f_out != -1) {
+ if (io_filesfrom_buflen) {
+ if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
+ int l = write(io_filesfrom_f_out,
+ io_filesfrom_bp,
+ io_filesfrom_buflen);
+ if (l > 0) {
+ if (!(io_filesfrom_buflen -= l))
+ io_filesfrom_bp = io_filesfrom_buf;
+ else
+ io_filesfrom_bp += l;
+ } else {
+ /* XXX should we complain? */
+ io_filesfrom_f_out = -1;
+ }
+ }
+ } else if (io_filesfrom_f_in != -1) {
+ if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
+ int l = read(io_filesfrom_f_in,
+ io_filesfrom_buf,
+ sizeof io_filesfrom_buf);
+ if (l <= 0) {
+ /* Send end-of-file marker */
+ io_filesfrom_buf[0] = '\0';
+ io_filesfrom_buf[1] = '\0';
+ io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1;
+ io_filesfrom_f_in = -1;
+ } else {
+ extern int eol_nulls;
+ if (!eol_nulls) {
+ char *s = io_filesfrom_buf + l;
+ /* Transform CR and/or LF into '\0' */
+ while (s-- > io_filesfrom_buf) {
+ if (*s == '\n' || *s == '\r')
+ *s = '\0';
+ }
+ }
+ if (!io_filesfrom_lastchar) {
+ /* Last buf ended with a '\0', so don't
+ * let this buf start with one. */
+ while (l && !*io_filesfrom_bp)
+ io_filesfrom_bp++, l--;
+ }
+ if (!l)
+ io_filesfrom_bp = io_filesfrom_buf;
+ else {
+ char *f = io_filesfrom_bp;
+ char *t = f;
+ char *eob = f + l;
+ /* Eliminate any multi-'\0' runs. */
+ while (f != eob) {
+ if (!(*t++ = *f++)) {
+ while (f != eob && !*f)
+ f++, l--;
+ }
+ }
+ io_filesfrom_lastchar = f[-1];
+ }
+ io_filesfrom_buflen = l;
+ }
+ }
+ }
+ }
+
+ if (!FD_ISSET(fd, &r_fds)) continue;
n = read(fd, buf, len);
last_io = time(NULL);
continue;
} else if (n == 0) {
- whine_about_eof ();
+ whine_about_eof();
return -1; /* doesn't return */
} else if (n == -1) {
if (errno == EINTR || errno == EWOULDBLOCK ||
errno == EAGAIN)
continue;
- else
- die_from_readerr (errno);
+ die_from_readerr(errno);
}
}
return ret;
}
+/**
+ * Read a line into the "fname" buffer (which must be at least MAXPATHLEN
+ * characters long).
+ *
+ * The line is read from fd one byte at a time. An entry ends at a '\0'
+ * when eol_nulls is set or remote_filesfrom_file is non-NULL, and at a
+ * '\r' or '\n' otherwise. The terminator is not stored; fname is always
+ * '\0'-terminated. Bytes beyond MAXPATHLEN - 1 are read but silently
+ * dropped. Entries whose first character is '#' or ';' are discarded and
+ * the next entry is read instead.
+ *
+ * Returns the number of characters stored in fname. The result is 0 when
+ * read() yields no byte (EOF or a non-retryable error) before anything was
+ * stored, or when reading remotely and an empty entry is met.
+ */
+int read_filesfrom_line(int fd, char *fname)
+{
+ char ch, *s, *eob = fname + MAXPATHLEN - 1; /* eob: last slot, kept free for the '\0' */
+ int cnt;
+ extern int io_timeout;
+ extern int eol_nulls;
+ extern char *remote_filesfrom_file;
+ int reading_remotely = remote_filesfrom_file != NULL;
+ int nulls = eol_nulls || reading_remotely; /* non-zero: entries end with '\0' */
+
+ start:
+ s = fname;
+ while (1) {
+ cnt = read(fd, &ch, 1);
+ if (cnt < 0 && (errno == EWOULDBLOCK
+ || errno == EINTR || errno == EAGAIN)) {
+ /* Transient failure: wait until fd is readable, then retry the read. */
+ struct timeval tv;
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(fd, &fds);
+ tv.tv_sec = io_timeout? io_timeout : SELECT_TIMEOUT;
+ tv.tv_usec = 0;
+ if (!select(fd+1, &fds, NULL, NULL, &tv))
+ check_timeout(); /* select() timed out with nothing readable */
+ continue;
+ }
+ if (cnt != 1)
+ break; /* EOF or a non-retryable error: return what we have */
+ if (nulls? !ch : (ch == '\r' || ch == '\n')) {
+ /* Skip empty lines if reading locally. */
+ if (!reading_remotely && s == fname)
+ continue;
+ break;
+ }
+ if (s < eob)
+ *s++ = ch; /* once the buffer is full the byte is dropped */
+ }
+ *s = '\0';
+
+ /* Dump comments. */
+ if (*fname == '#' || *fname == ';')
+ goto start;
+ return s - fname;
+}
/**
* Continue trying to read len bytes - don't return until len has been
* read.
**/
-static void read_loop (int fd, char *buf, size_t len)
+static void read_loop(int fd, char *buf, size_t len)
{
while (len) {
int n = read_timeout(fd, buf, len);
exit_cleanup(RERR_STREAMIO);
}
- if (remaining > sizeof(line) - 1) {
- rprintf(FERROR, "multiplexing overflow %d\n\n",
- remaining);
+ if (remaining > sizeof line - 1) {
+ rprintf(FERROR, "multiplexing overflow %ld\n\n",
+ (long)remaining);
exit_cleanup(RERR_STREAMIO);
}
* have been read. If all @p n can't be read then exit with an
* error.
**/
-static void readfd (int fd, char *buffer, size_t N)
+static void readfd(int fd, char *buffer, size_t N)
{
int ret;
size_t total=0;
-
+
while (total < N) {
io_flush();
- ret = read_unbuffered (fd, buffer + total, N-total);
+ ret = read_unbuffered(fd, buffer + total, N-total);
total += ret;
}
int64 read_longint(int f)
{
- extern int remote_version;
int64 ret;
char b[8];
ret = read_int(f);
rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
exit_cleanup(RERR_UNSUPPORTED);
#else
- if (remote_version >= 16) {
- readfd(f,b,8);
- ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
- }
+ readfd(f,b,8);
+ ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
#endif
return ret;
/**
 * Read len bytes from f into buf through read_buf(), then store a 0
 * terminator at buf[len]. buf therefore needs room for len + 1 bytes.
 */
void read_sbuf(int f,char *buf,size_t len)
{
- read_buf (f,buf,len);
+ read_buf(f,buf,len);
buf[len] = 0;
}
/**
 * Read exactly one byte from f through read_buf() and return it.
 */
unsigned char read_byte(int f)
{
unsigned char c;
- read_buf (f, (char *)&c, 1);
+ read_buf(f, (char *)&c, 1);
return c;
}
assert(bytes_written > 0);
assert(bwlimit > 0);
-
+
tv.tv_usec = bytes_written * 1000 / bwlimit;
tv.tv_sec = tv.tv_usec / 1000000;
tv.tv_usec = tv.tv_usec % 1000000;
while (total < len) {
FD_ZERO(&w_fds);
- FD_ZERO(&r_fds);
FD_SET(fd,&w_fds);
fd_count = fd;
if (io_error_fd != -1) {
+ FD_ZERO(&r_fds);
FD_SET(io_error_fd,&r_fds);
if (io_error_fd > fd_count)
fd_count = io_error_fd;
{
if (io_buffer) return;
multiplex_out_fd = fd;
- io_buffer = (char *)malloc(IO_BUFFER_SIZE);
+ io_buffer = new_array(char, IO_BUFFER_SIZE);
if (!io_buffer) out_of_memory("writefd");
io_buffer_count = 0;
}
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- if (n > (sizeof(buffer)-4)) {
- n = sizeof(buffer)-4;
+ if (n > (sizeof buffer - 4)) {
+ n = sizeof buffer - 4;
}
memcpy(&buffer[4], buf, n);
len -= n;
io_buffer_count += n;
}
-
+
if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
}
}
*/
void write_longint(int f, int64 x)
{
- extern int remote_version;
char b[8];
- if (remote_version < 16 || x <= 0x7FFFFFFF) {
+ if (x <= 0x7FFFFFFF) {
/* Short form: the value is sent with a single write_int() call. */
/* NOTE(review): a negative x also takes this path and is narrowed to int.
 * x == -1 would presumably be sent as the same 0xFFFFFFFF word that is
 * used below as the escape marker -- confirm callers never pass negative
 * values. */
write_int(f, (int)x);
return;
}
+#ifdef NO_INT64
+ rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+#else
/* Long form: an 0xFFFFFFFF marker, then the low and the high 32 bits of x
 * packed into b with SIVAL() and written as 8 bytes. */
write_int(f, (int32)0xFFFFFFFF);
SIVAL(b,0,(x&0xFFFFFFFF));
SIVAL(b,4,((x>>32)&0xFFFFFFFF));
writefd(f,b,8);
+#endif
}
void write_buf(int f,char *buf,size_t len)
va_list ap;
char buf[1024];
int len;
-
+
va_start(ap, format);
- len = vsnprintf(buf, sizeof(buf), format, ap);
+ len = vsnprintf(buf, sizeof buf, format, ap);
va_end(ap);
if (len < 0) exit_cleanup(RERR_STREAMIO);