X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/37f35d89d11d5ca8beadcb946dcd2e545d58b171..ac669e8b922c7ace230294f9bf9a3a2bdfbd19d2:/io.c diff --git a/io.c b/io.c index 07909fe4..dd99913d 100644 --- a/io.c +++ b/io.c @@ -300,6 +300,13 @@ static void read_msg_fd(void) if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); break; + case MSG_SOCKERR: + if (!am_generator) { + rprintf(FERROR, "invalid message %d:%d\n", tag, len); + exit_cleanup(RERR_STREAMIO); + } + close_multiplexing_out(); + /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: case MSG_LOG: @@ -437,7 +444,6 @@ static void whine_about_eof(int fd) exit_cleanup(RERR_STREAMIO); } - /** * Read from a socket with I/O timeout. return the number of bytes * read. If no bytes can be read then exit, never return a number <= 0. @@ -451,11 +457,11 @@ static void whine_about_eof(int fd) */ static int read_timeout(int fd, char *buf, size_t len) { - int n, ret = 0; + int n, cnt = 0; io_flush(NORMAL_FLUSH); - while (ret == 0) { + while (cnt == 0) { /* until we manage to read *something* */ fd_set r_fds, w_fds; struct timeval tv; @@ -581,21 +587,23 @@ static int read_timeout(int fd, char *buf, size_t len) continue; /* Don't write errors on a dead socket. */ - if (fd == sock_f_in) + if (fd == sock_f_in) { close_multiplexing_out(); - rsyserr(FERROR, errno, "read error"); + rsyserr(FSOCKERR, errno, "read error"); + } else + rsyserr(FERROR, errno, "read error"); exit_cleanup(RERR_STREAMIO); } buf += n; len -= n; - ret += n; + cnt += n; if (fd == sock_f_in && io_timeout) last_io_in = time(NULL); } - return ret; + return cnt; } /** @@ -645,7 +653,6 @@ int read_filesfrom_line(int fd, char *fname) return s - fname; } - static char *iobuf_out; static int iobuf_out_cnt; @@ -658,7 +665,6 @@ void io_start_buffering_out(void) iobuf_out_cnt = 0; } - static char *iobuf_in; static size_t iobuf_in_siz; @@ -671,7 +677,6 @@ void io_start_buffering_in(void) out_of_memory("io_start_buffering_in"); } - void io_end_buffering(void) { io_flush(NORMAL_FLUSH); @@ -681,14 +686,12 @@ void io_end_buffering(void) } } - void maybe_flush_socket(void) { if (iobuf_out && iobuf_out_cnt && time(NULL) - last_io_out >= 5) io_flush(NORMAL_FLUSH); } - void maybe_send_keepalive(void) { if (time(NULL) - last_io_out >= allowed_lull) { @@ -703,7 +706,6 @@ void maybe_send_keepalive(void) } } - /** * Continue trying to read len bytes - don't return until len has been * read. @@ -718,7 +720,6 @@ static void read_loop(int fd, char *buf, size_t len) } } - /** * Read from the file descriptor handling multiplexing - return number * of bytes read. @@ -730,7 +731,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) static size_t remaining; static size_t iobuf_in_ndx; size_t msg_bytes; - int tag, ret = 0; + int tag, cnt = 0; char line[BIGPATHBUFLEN]; if (!iobuf_in || fd != sock_f_in) @@ -741,13 +742,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) iobuf_in_ndx = 0; } - while (ret == 0) { + while (cnt == 0) { if (remaining) { len = MIN(len, remaining); memcpy(buf, iobuf_in + iobuf_in_ndx, len); iobuf_in_ndx += len; remaining -= len; - ret = len; + cnt = len; break; } @@ -773,12 +774,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) if (msg_bytes >= sizeof line) goto overflow; read_loop(fd, line, msg_bytes); - line[msg_bytes] = '\0'; /* A directory name was sent with the trailing null */ if (msg_bytes > 0 && !line[msg_bytes-1]) log_delete(line, S_IFDIR); - else + else { + line[msg_bytes] = '\0'; log_delete(line, S_IFREG); + } break; case MSG_SUCCESS: if (msg_bytes != 4) { @@ -811,11 +813,9 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) if (remaining == 0) io_flush(NORMAL_FLUSH); - return ret; + return cnt; } - - /** * Do a buffered read from @p fd. Don't return until all @p n bytes * have been read. If all @p n can't be read then exit with an @@ -823,12 +823,12 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) **/ static void readfd(int fd, char *buffer, size_t N) { - int ret; + int cnt; size_t total = 0; while (total < N) { - ret = readfd_unbuffered(fd, buffer + total, N-total); - total += ret; + cnt = readfd_unbuffered(fd, buffer + total, N-total); + total += cnt; } if (fd == write_batch_monitor_in) { @@ -840,7 +840,6 @@ static void readfd(int fd, char *buffer, size_t N) stats.total_read += total; } - int read_shortint(int f) { uchar b[2]; @@ -848,37 +847,36 @@ int read_shortint(int f) return (b[1] << 8) + b[0]; } - int32 read_int(int f) { char b[4]; - int32 ret; + int32 num; readfd(f,b,4); - ret = IVAL(b,0); - if (ret == (int32)0xffffffff) + num = IVAL(b,0); + if (num == (int32)0xffffffff) return -1; - return ret; + return num; } int64 read_longint(int f) { - int64 ret; + int64 num; char b[8]; - ret = read_int(f); + num = read_int(f); - if ((int32)ret != (int32)0xffffffff) - return ret; + if ((int32)num != (int32)0xffffffff) + return num; #if SIZEOF_INT64 < 8 rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); exit_cleanup(RERR_UNSUPPORTED); #else readfd(f,b,8); - ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32); + num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); #endif - return ret; + return num; } void read_buf(int f,char *buf,size_t len) @@ -960,7 +958,6 @@ void write_sum_head(int f, struct sum_struct *sum) write_int(f, sum->remainder); } - /** * Sleep after writing to limit I/O bandwidth usage. * @@ -1016,7 +1013,6 @@ static void sleep_for_bwlimit(int bytes_written) total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); } - /* Write len bytes to the file descriptor fd, looping as necessary to get * the job done and also (in certain circumstances) reading any data on * msg_fd_in to avoid deadlock. @@ -1027,7 +1023,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) { size_t n, total = 0; fd_set w_fds, r_fds; - int maxfd, count, ret, using_r_fds; + int maxfd, count, cnt, using_r_fds; struct timeval tv; no_flush++; @@ -1069,10 +1065,10 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) n = len - total; if (bwlimit && n > bwlimit_writemax) n = bwlimit_writemax; - ret = write(fd, buf + total, n); + cnt = write(fd, buf + total, n); - if (ret <= 0) { - if (ret < 0) { + if (cnt <= 0) { + if (cnt < 0) { if (errno == EINTR) continue; if (errno == EWOULDBLOCK || errno == EAGAIN) { @@ -1098,26 +1094,25 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) exit_cleanup(RERR_STREAMIO); } - total += ret; + total += cnt; if (fd == sock_f_out) { if (io_timeout || am_generator) last_io_out = time(NULL); - sleep_for_bwlimit(ret); + sleep_for_bwlimit(cnt); } } no_flush--; } - /** * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ static void mplex_write(enum msgcode code, char *buf, size_t len) { - char buffer[BIGPATHBUFLEN]; + char buffer[1024]; size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); @@ -1130,9 +1125,10 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) contiguous_write_len = len + 4; if (n > sizeof buffer - 4) - n = sizeof buffer - 4; + n = 0; + else + memcpy(buffer + 4, buf, n); - memcpy(&buffer[4], buf, n); writefd_unbuffered(sock_f_out, buffer, n+4); len -= n; @@ -1145,7 +1141,6 @@ static void mplex_write(enum msgcode code, char *buf, size_t len) contiguous_write_len = 0; } - void io_flush(int flush_it_all) { msg_list_flush(flush_it_all); @@ -1160,7 +1155,6 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } - static void writefd(int fd,char *buf,size_t len) { if (fd == msg_fd_out) { @@ -1195,7 +1189,6 @@ static void writefd(int fd,char *buf,size_t len) } } - void write_shortint(int f, int x) { uchar b[2]; @@ -1204,7 +1197,6 @@ void write_shortint(int f, int x) writefd(f, (char *)b, 2); } - void write_int(int f,int32 x) { char b[4]; @@ -1212,7 +1204,6 @@ void write_int(int f,int32 x) writefd(f,b,4); } - void write_int_named(int f, int32 x, const char *phase) { io_write_phase = phase; @@ -1220,7 +1211,6 @@ void write_int_named(int f, int32 x, const char *phase) io_write_phase = phase_unknown; } - /* * Note: int64 may actually be a 32-bit type if ./configure couldn't find any * 64-bit types on this platform. @@ -1282,7 +1272,6 @@ void write_vstring(int f, char *str, int len) writefd(f, str, len); } - /** * Read a line of up to @p maxlen characters into @p buf (not counting * the trailing null). Strips the (required) trailing newline and all @@ -1308,7 +1297,6 @@ int read_line(int f, char *buf, size_t maxlen) return maxlen > 0; } - void io_printf(int fd, const char *format, ...) { va_list ap; @@ -1330,7 +1318,6 @@ void io_printf(int fd, const char *format, ...) write_sbuf(fd, buf); } - /** Setup for multiplexing a MSG_* stream with the data stream. */ void io_start_multiplex_out(void) {