X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/229e1950eda7f8b05f80b6788851fcdb93de7728..112d728f4858fef5c75e0b25bf6f02af8aa5b83c:/io.c diff --git a/io.c b/io.c index e235d3e5..05a755fb 100644 --- a/io.c +++ b/io.c @@ -46,7 +46,7 @@ extern int read_batch; extern int csum_length; extern int checksum_seed; extern int protocol_version; -extern int remove_sent_files; +extern int remove_source_files; extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; @@ -85,6 +85,25 @@ static int select_timeout = SELECT_TIMEOUT; static int active_filecnt = 0; static OFF_T active_bytecnt = 0; +static char int_byte_cnt[256] = { + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 00 - 0F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 10 - 1F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 20 - 2F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 30 - 3F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 40 - 4F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 50 - 5F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 60 - 6F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 70 - 7F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* 80 - 8F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* 90 - 9F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* A0 - AF */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* B0 - BF */ + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, /* C0 - CF */ + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, /* D0 - DF */ + 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, /* E0 - EF */ + 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 9, 9, 9, 9, /* F0 - FF */ +}; + static void read_loop(int fd, char *buf, size_t len); struct flist_ndx_item { @@ -204,7 +223,7 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) { struct msg_list_item *m; int sz = len + 4 + sizeof m[0] - 1; @@ -257,7 +276,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); - if (remove_sent_files) + if (remove_source_files) decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; @@ -267,10 +286,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_DELETED, buf, len); - else - io_multiplex_write(MSG_DELETED, buf, len); + send_msg(MSG_DELETED, buf, len); break; case MSG_SUCCESS: if (len != 4 || !am_generator) { @@ -278,12 +294,9 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) { + if (remove_source_files) { decrement_active_files(IVAL(buf,0)); - if (defer_forwarding_messages) - msg_list_add(&msg2sndr, MSG_SUCCESS, buf, len); - else - io_multiplex_write(MSG_SUCCESS, buf, len); + send_msg(MSG_SUCCESS, buf, len); } if (preserve_hard_links) flist_ndx_push(&hlink_list, IVAL(buf,0)); @@ -294,7 +307,6 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } close_multiplexing_out(); - defer_forwarding_messages = 0; /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: @@ -304,11 +316,7 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - if (am_generator && am_server - && defer_forwarding_messages && tag != MSG_LOG) - msg_list_add(&msg2sndr, tag, buf, n); - else - rwrite((enum logcode)tag, buf, n); + rwrite((enum logcode)tag, buf, n); len -= n; } break; @@ -322,7 +330,7 @@ static void read_msg_fd(void) } /* This is used by the generator to limit how many file transfers can - * be active at once when --remove-sent-files is specified. Without + * be active at once when --remove-source-files is specified. Without * this, sender-side deletions were mostly happening at the end. */ void increment_active_files(int ndx, int itemizing, enum logcode code) { @@ -334,13 +342,13 @@ void increment_active_files(int ndx, int itemizing, enum logcode code) } active_filecnt++; - active_bytecnt += the_file_list->files[ndx]->length; + active_bytecnt += F_LENGTH(the_file_list->files[ndx]); } void decrement_active_files(int ndx) { active_filecnt--; - active_bytecnt -= the_file_list->files[ndx]->length; + active_bytecnt -= F_LENGTH(the_file_list->files[ndx]); } /* Try to push messages off the list onto the wire. If we leave with more @@ -382,14 +390,26 @@ static int msg2genr_flush(int flush_it_all) return 1; } -void send_msg(enum msgcode code, char *buf, int len) +int send_msg(enum msgcode code, const char *buf, int len) { if (msg_fd_out < 0) { - io_multiplex_write(code, buf, len); - return; + if (!defer_forwarding_messages) + return io_multiplex_write(code, buf, len); + if (!io_multiplexing_out) + return 0; + msg_list_add(&msg2sndr, code, buf, len); + return 1; } msg_list_add(&msg2genr, code, buf, len); msg2genr_flush(NORMAL_FLUSH); + return 1; +} + +void send_msg_int(enum msgcode code, int num) +{ + char numbuf[4]; + SIVAL(numbuf, 0, num); + send_msg(code, numbuf, 4); } int get_redo_num(int itemizing, enum logcode code) @@ -639,13 +659,19 @@ int read_filesfrom_line(int fd, char *fname) if (cnt < 0 && (errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)) { struct timeval tv; - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); + fd_set r_fds, e_fds; + FD_ZERO(&r_fds); + FD_SET(fd, &r_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); tv.tv_sec = select_timeout; tv.tv_usec = 0; - if (!select(fd+1, &fds, NULL, NULL, &tv)) + if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } continue; } if (cnt != 1) @@ -855,11 +881,11 @@ static void readfd(int fd, char *buffer, size_t N) stats.total_read += total; } -int read_shortint(int f) +unsigned short read_shortint(int f) { - uchar b[2]; - readfd(f, (char *)b, 2); - return (b[1] << 8) + b[0]; + char b[2]; + readfd(f, b, 2); + return (UVAL(b, 1) << 8) + UVAL(b, 0); } int32 read_int(int f) @@ -867,39 +893,83 @@ int32 read_int(int f) char b[4]; int32 num; - readfd(f,b,4); - num = IVAL(b,0); - if (num == (int32)0xffffffff) - return -1; + readfd(f, b, 4); + num = IVAL(b, 0); +#if SIZEOF_INT32 > 4 + if (num & (int32)0x80000000) + num |= ~(int32)0xffffffff; +#endif return num; } int64 read_longint(int f) { int64 num; - char b[8]; - num = read_int(f); + char b[9]; + + if (protocol_version < 30) { + num = read_int(f); - if ((int32)num != (int32)0xffffffff) - return num; + if ((int32)num != (int32)0xffffffff) + return num; #if SIZEOF_INT64 < 8 - rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); - exit_cleanup(RERR_UNSUPPORTED); + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); #else - readfd(f,b,8); - num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); + readfd(f, b, 8); + num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); +#endif + } else { + int cnt; + readfd(f, b, 3); + cnt = int_byte_cnt[CVAL(b, 0)]; +#if SIZEOF_INT64 < 8 + if (cnt > 5 || (cnt == 5 && (CVAL(b,0)&0x3F || CVAL(b,1)&0x80))) { + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); + } #endif + if (cnt > 3) + readfd(f, b + 3, cnt - 3); + switch (cnt) { + case 3: + num = NVAL3(b, 0); + break; + case 4: + num = NVAL4(b, 0x80); + break; + case 5: + num = NVAL5(b, 0xC0); + break; +#if SIZEOF_INT64 >= 8 + case 6: + num = NVAL6(b, 0xE0); + break; + case 7: + num = NVAL7(b, 0xF0); + break; + case 8: + num = NVAL8(b, 0xF8); + break; + case 9: + num = NVAL8(b+1, 0); + break; +#endif + default: + exit_cleanup(RERR_PROTOCOL); /* impossible... */ + } + } return num; } -void read_buf(int f,char *buf,size_t len) +void read_buf(int f, char *buf, size_t len) { readfd(f,buf,len); } -void read_sbuf(int f,char *buf,size_t len) +void read_sbuf(int f, char *buf, size_t len) { readfd(f, buf, len); buf[len] = '\0'; @@ -1039,10 +1109,10 @@ static void sleep_for_bwlimit(int bytes_written) * * This function underlies the multiplexing system. The body of the * application never calls this function directly. */ -static void writefd_unbuffered(int fd,char *buf,size_t len) +static void writefd_unbuffered(int fd, const char *buf, size_t len) { size_t n, total = 0; - fd_set w_fds, r_fds; + fd_set w_fds, r_fds, e_fds; int maxfd, count, cnt, using_r_fds; int defer_save = defer_forwarding_messages; struct timeval tv; @@ -1051,12 +1121,14 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) while (total < len) { FD_ZERO(&w_fds); - FD_SET(fd,&w_fds); + FD_SET(fd, &w_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); maxfd = fd; if (msg_fd_in >= 0) { FD_ZERO(&r_fds); - FD_SET(msg_fd_in,&r_fds); + FD_SET(msg_fd_in, &r_fds); if (msg_fd_in > maxfd) maxfd = msg_fd_in; using_r_fds = 1; @@ -1068,7 +1140,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) errno = 0; count = select(maxfd + 1, using_r_fds ? &r_fds : NULL, - &w_fds, NULL, &tv); + &w_fds, &e_fds, &tv); if (count <= 0) { if (count < 0 && errno == EBADF) @@ -1077,6 +1149,11 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; } + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } + if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) read_msg_fd(); @@ -1136,20 +1213,11 @@ static void msg2sndr_flush(void) while (msg2sndr.head && io_multiplexing_out) { struct msg_list_item *m = msg2sndr.head; - int tag = (IVAL(m->buf, 0) >> 24) - MPLEX_BASE; if (!(msg2sndr.head = m->next)) msg2sndr.tail = NULL; + stats.total_written += m->len; defer_forwarding_messages = 1; - switch (tag) { - case MSG_INFO: - case MSG_ERROR: - rwrite((enum logcode)tag, m->buf + 4, m->len - 4); - break; - default: - stats.total_written += m->len; - writefd_unbuffered(sock_f_out, m->buf, m->len); - break; - } + writefd_unbuffered(sock_f_out, m->buf, m->len); defer_forwarding_messages = 0; free(m); } @@ -1159,7 +1227,7 @@ static void msg2sndr_flush(void) * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ -static void mplex_write(enum msgcode code, char *buf, size_t len) +static void mplex_write(enum msgcode code, const char *buf, size_t len) { char buffer[1024]; size_t n = len; @@ -1199,7 +1267,7 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } -static void writefd(int fd,char *buf,size_t len) +static void writefd(int fd, const char *buf, size_t len) { if (fd == msg_fd_out) { rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); @@ -1233,19 +1301,19 @@ static void writefd(int fd,char *buf,size_t len) } } -void write_shortint(int f, int x) +void write_shortint(int f, unsigned short x) { - uchar b[2]; - b[0] = x; - b[1] = x >> 8; - writefd(f, (char *)b, 2); + char b[2]; + b[0] = (char)x; + b[1] = (char)(x >> 8); + writefd(f, b, 2); } -void write_int(int f,int32 x) +void write_int(int f, int32 x) { char b[4]; - SIVAL(b,0,x); - writefd(f,b,4); + SIVAL(b, 0, x); + writefd(f, b, 4); } /* @@ -1254,32 +1322,110 @@ void write_int(int f,int32 x) */ void write_longint(int f, int64 x) { - char b[8]; + char b[12]; - if (x <= 0x7FFFFFFF) { - write_int(f, (int)x); - return; +#if SIZEOF_INT64 < 8 + if (x < 0 || x > 0x7FFFFFFF) { + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); } +#endif + if (protocol_version < 30) { + char * const s = b+4; + SIVAL(s, 0, x); #if SIZEOF_INT64 < 8 - rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); - exit_cleanup(RERR_UNSUPPORTED); + writefd(f, s, 4); #else - write_int(f, (int32)0xFFFFFFFF); - SIVAL(b,0,(x&0xFFFFFFFF)); - SIVAL(b,4,((x>>32)&0xFFFFFFFF)); + if (x <= 0x7FFFFFFF && x >= 0) { + writefd(f, s, 4); + return; + } - writefd(f,b,8); + memset(b, 0xFF, 4); + SIVAL(s, 4, x >> 32); + writefd(f, b, 12); + } else if (x < 0) { + goto all_bits; +#endif + } else if (x < ((int32)1<<(3*8-1))) { + b[0] = (char)(x >> 16); + b[1] = (char)(x >> 8); + b[2] = (char)x; + writefd(f, b, 3); + } else if (x < ((int64)1<<(4*8-2))) { + b[0] = (char)((x >> 24) | 0x80); + b[1] = (char)(x >> 16); + b[2] = (char)(x >> 8); + b[3] = (char)x; + writefd(f, b, 4); +#if SIZEOF_INT64 < 8 + } else { + b[0] = 0xC0; + b[1] = (char)(x >> 24); + b[2] = (char)(x >> 16); + b[3] = (char)(x >> 8); + b[4] = (char)x; + writefd(f, b, 5); + } +#else + } else if (x < ((int64)1<<(5*8-3))) { + b[0] = (char)((x >> 32) | 0xC0); + b[1] = (char)(x >> 24); + b[2] = (char)(x >> 16); + b[3] = (char)(x >> 8); + b[4] = (char)x; + writefd(f, b, 5); + } else if (x < ((int64)1<<(6*8-4))) { + b[0] = (char)((x >> 40) | 0xE0); + b[1] = (char)(x >> 32); + b[2] = (char)(x >> 24); + b[3] = (char)(x >> 16); + b[4] = (char)(x >> 8); + b[5] = (char)x; + writefd(f, b, 6); + } else if (x < ((int64)1<<(7*8-5))) { + b[0] = (char)((x >> 48) | 0xF0); + b[1] = (char)(x >> 40); + b[2] = (char)(x >> 32); + b[3] = (char)(x >> 24); + b[4] = (char)(x >> 16); + b[5] = (char)(x >> 8); + b[6] = (char)x; + writefd(f, b, 7); + } else if (x < ((int64)1<<(8*8-6))) { + b[0] = (char)((x >> 56) | 0xF8); + b[1] = (char)(x >> 48); + b[2] = (char)(x >> 40); + b[3] = (char)(x >> 32); + b[4] = (char)(x >> 24); + b[5] = (char)(x >> 16); + b[6] = (char)(x >> 8); + b[7] = (char)x; + writefd(f, b, 8); + } else { + all_bits: + b[0] = (char)0xFC; + b[1] = (char)(x >> 56); + b[2] = (char)(x >> 48); + b[3] = (char)(x >> 40); + b[4] = (char)(x >> 32); + b[5] = (char)(x >> 24); + b[6] = (char)(x >> 16); + b[7] = (char)(x >> 8); + b[8] = (char)x; + writefd(f, b, 9); + } #endif } -void write_buf(int f,char *buf,size_t len) +void write_buf(int f, const char *buf, size_t len) { writefd(f,buf,len); } /** Write a string to the connection */ -void write_sbuf(int f, char *buf) +void write_sbuf(int f, const char *buf) { writefd(f, buf, strlen(buf)); } @@ -1289,7 +1435,7 @@ void write_byte(int f, uchar c) writefd(f, (char *)&c, 1); } -void write_vstring(int f, char *str, int len) +void write_vstring(int f, const char *str, int len) { uchar lenbuf[3], *lb = lenbuf; @@ -1372,7 +1518,7 @@ void io_start_multiplex_in(void) } /** Write an message to the multiplexed data stream. */ -int io_multiplex_write(enum msgcode code, char *buf, size_t len) +int io_multiplex_write(enum msgcode code, const char *buf, size_t len) { if (!io_multiplexing_out) return 0;