X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/12ccc73ae7b330c7187d88ae5c579c1882fabb34..5a3e9ff6081cb83873881df9a7bcfc19f9af5e58:/io.c diff --git a/io.c b/io.c index 5b87be6f..75f39132 100644 --- a/io.c +++ b/io.c @@ -46,7 +46,7 @@ extern int read_batch; extern int csum_length; extern int checksum_seed; extern int protocol_version; -extern int remove_sent_files; +extern int remove_source_files; extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; @@ -85,6 +85,25 @@ static int select_timeout = SELECT_TIMEOUT; static int active_filecnt = 0; static OFF_T active_bytecnt = 0; +static char int_byte_cnt[256] = { + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 00 - 0F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 10 - 1F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 20 - 2F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 30 - 3F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 40 - 4F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 50 - 5F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 60 - 6F */ + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, /* 70 - 7F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* 80 - 8F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* 90 - 9F */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* A0 - AF */ + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, /* B0 - BF */ + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, /* C0 - CF */ + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, /* D0 - DF */ + 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, /* E0 - EF */ + 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 9, 9, 9, 9, /* F0 - FF */ +}; + static void read_loop(int fd, char *buf, size_t len); struct flist_ndx_item { @@ -204,7 +223,7 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(struct msg_list *lst, int code, char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) { struct msg_list_item *m; int sz = len + 4 + sizeof m[0] - 1; @@ -257,7 +276,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, 4); - if (remove_sent_files) + if (remove_source_files) decrement_active_files(IVAL(buf,0)); flist_ndx_push(&redo_list, IVAL(buf,0)); break; @@ -275,7 +294,7 @@ static void read_msg_fd(void) exit_cleanup(RERR_STREAMIO); } read_loop(fd, buf, len); - if (remove_sent_files) { + if (remove_source_files) { decrement_active_files(IVAL(buf,0)); send_msg(MSG_SUCCESS, buf, len); } @@ -283,11 +302,13 @@ static void read_msg_fd(void) flist_ndx_push(&hlink_list, IVAL(buf,0)); break; case MSG_SOCKERR: + case MSG_CLIENT: if (!am_generator) { rprintf(FERROR, "invalid message %d:%d\n", tag, len); exit_cleanup(RERR_STREAMIO); } - close_multiplexing_out(); + if (tag == MSG_SOCKERR) + close_multiplexing_out(); /* FALL THROUGH */ case MSG_INFO: case MSG_ERROR: @@ -297,7 +318,7 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; read_loop(fd, buf, n); - rwrite(tag, buf, n); + rwrite((enum logcode)tag, buf, n); len -= n; } break; @@ -311,25 +332,27 @@ static void read_msg_fd(void) } /* This is used by the generator to limit how many file transfers can - * be active at once when --remove-sent-files is specified. Without + * be active at once when --remove-source-files is specified. Without * this, sender-side deletions were mostly happening at the end. */ void increment_active_files(int ndx, int itemizing, enum logcode code) { /* TODO: tune these limits? */ while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) { +#ifdef SUPPORT_HARD_LINKS if (hlink_list.head) check_for_finished_hlinks(itemizing, code); +#endif read_msg_fd(); } active_filecnt++; - active_bytecnt += the_file_list->files[ndx]->length; + active_bytecnt += F_LENGTH(the_file_list->files[ndx]); } void decrement_active_files(int ndx) { active_filecnt--; - active_bytecnt -= the_file_list->files[ndx]->length; + active_bytecnt -= F_LENGTH(the_file_list->files[ndx]); } /* Try to push messages off the list onto the wire. If we leave with more @@ -371,7 +394,7 @@ static int msg2genr_flush(int flush_it_all) return 1; } -int send_msg(enum msgcode code, char *buf, int len) +int send_msg(enum msgcode code, const char *buf, int len) { if (msg_fd_out < 0) { if (!defer_forwarding_messages) @@ -386,11 +409,20 @@ int send_msg(enum msgcode code, char *buf, int len) return 1; } +void send_msg_int(enum msgcode code, int num) +{ + char numbuf[4]; + SIVAL(numbuf, 0, num); + send_msg(code, numbuf, 4); +} + int get_redo_num(int itemizing, enum logcode code) { while (1) { +#ifdef SUPPORT_HARD_LINKS if (hlink_list.head) check_for_finished_hlinks(itemizing, code); +#endif if (redo_list.head) break; read_msg_fd(); @@ -633,13 +665,19 @@ int read_filesfrom_line(int fd, char *fname) if (cnt < 0 && (errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN)) { struct timeval tv; - fd_set fds; - FD_ZERO(&fds); - FD_SET(fd, &fds); + fd_set r_fds, e_fds; + FD_ZERO(&r_fds); + FD_SET(fd, &r_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); tv.tv_sec = select_timeout; tv.tv_usec = 0; - if (!select(fd+1, &fds, NULL, NULL, &tv)) + if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } continue; } if (cnt != 1) @@ -849,11 +887,11 @@ static void readfd(int fd, char *buffer, size_t N) stats.total_read += total; } -int read_shortint(int f) +unsigned short read_shortint(int f) { - uchar b[2]; - readfd(f, (char *)b, 2); - return (b[1] << 8) + b[0]; + char b[2]; + readfd(f, b, 2); + return (UVAL(b, 1) << 8) + UVAL(b, 0); } int32 read_int(int f) @@ -861,39 +899,83 @@ int32 read_int(int f) char b[4]; int32 num; - readfd(f,b,4); - num = IVAL(b,0); - if (num == (int32)0xffffffff) - return -1; + readfd(f, b, 4); + num = IVAL(b, 0); +#if SIZEOF_INT32 > 4 + if (num & (int32)0x80000000) + num |= ~(int32)0xffffffff; +#endif return num; } int64 read_longint(int f) { int64 num; - char b[8]; - num = read_int(f); + char b[9]; + + if (protocol_version < 30) { + num = read_int(f); - if ((int32)num != (int32)0xffffffff) - return num; + if ((int32)num != (int32)0xffffffff) + return num; #if SIZEOF_INT64 < 8 - rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); - exit_cleanup(RERR_UNSUPPORTED); + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); #else - readfd(f,b,8); - num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); + readfd(f, b, 8); + num = IVAL(b,0) | (((int64)IVAL(b,4))<<32); +#endif + } else { + int cnt; + readfd(f, b, 3); + cnt = int_byte_cnt[CVAL(b, 0)]; +#if SIZEOF_INT64 < 8 + if (cnt > 5 || (cnt == 5 && (CVAL(b,0)&0x3F || CVAL(b,1)&0x80))) { + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); + } +#endif + if (cnt > 3) + readfd(f, b + 3, cnt - 3); + switch (cnt) { + case 3: + num = NVAL3(b, 0); + break; + case 4: + num = NVAL4(b, 0x80); + break; + case 5: + num = NVAL5(b, 0xC0); + break; +#if SIZEOF_INT64 >= 8 + case 6: + num = NVAL6(b, 0xE0); + break; + case 7: + num = NVAL7(b, 0xF0); + break; + case 8: + num = NVAL8(b, 0xF8); + break; + case 9: + num = NVAL8(b+1, 0); + break; #endif + default: + exit_cleanup(RERR_PROTOCOL); /* impossible... */ + } + } return num; } -void read_buf(int f,char *buf,size_t len) +void read_buf(int f, char *buf, size_t len) { readfd(f,buf,len); } -void read_sbuf(int f,char *buf,size_t len) +void read_sbuf(int f, char *buf, size_t len) { readfd(f, buf, len); buf[len] = '\0'; @@ -1033,10 +1115,10 @@ static void sleep_for_bwlimit(int bytes_written) * * This function underlies the multiplexing system. The body of the * application never calls this function directly. */ -static void writefd_unbuffered(int fd,char *buf,size_t len) +static void writefd_unbuffered(int fd, const char *buf, size_t len) { size_t n, total = 0; - fd_set w_fds, r_fds; + fd_set w_fds, r_fds, e_fds; int maxfd, count, cnt, using_r_fds; int defer_save = defer_forwarding_messages; struct timeval tv; @@ -1045,12 +1127,14 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) while (total < len) { FD_ZERO(&w_fds); - FD_SET(fd,&w_fds); + FD_SET(fd, &w_fds); + FD_ZERO(&e_fds); + FD_SET(fd, &e_fds); maxfd = fd; if (msg_fd_in >= 0) { FD_ZERO(&r_fds); - FD_SET(msg_fd_in,&r_fds); + FD_SET(msg_fd_in, &r_fds); if (msg_fd_in > maxfd) maxfd = msg_fd_in; using_r_fds = 1; @@ -1062,7 +1146,7 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) errno = 0; count = select(maxfd + 1, using_r_fds ? &r_fds : NULL, - &w_fds, NULL, &tv); + &w_fds, &e_fds, &tv); if (count <= 0) { if (count < 0 && errno == EBADF) @@ -1071,6 +1155,11 @@ static void writefd_unbuffered(int fd,char *buf,size_t len) continue; } + if (FD_ISSET(fd, &e_fds)) { + rsyserr(FINFO, errno, + "select exception on fd %d", fd); + } + if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) read_msg_fd(); @@ -1144,7 +1233,7 @@ static void msg2sndr_flush(void) * Write an message to a multiplexed stream. If this fails then rsync * exits. **/ -static void mplex_write(enum msgcode code, char *buf, size_t len) +static void mplex_write(enum msgcode code, const char *buf, size_t len) { char buffer[1024]; size_t n = len; @@ -1184,7 +1273,7 @@ void io_flush(int flush_it_all) iobuf_out_cnt = 0; } -static void writefd(int fd,char *buf,size_t len) +static void writefd(int fd, const char *buf, size_t len) { if (fd == msg_fd_out) { rprintf(FERROR, "Internal error: wrong write used in receiver.\n"); @@ -1218,19 +1307,19 @@ static void writefd(int fd,char *buf,size_t len) } } -void write_shortint(int f, int x) +void write_shortint(int f, unsigned short x) { - uchar b[2]; - b[0] = x; - b[1] = x >> 8; - writefd(f, (char *)b, 2); + char b[2]; + b[0] = (char)x; + b[1] = (char)(x >> 8); + writefd(f, b, 2); } -void write_int(int f,int32 x) +void write_int(int f, int32 x) { char b[4]; - SIVAL(b,0,x); - writefd(f,b,4); + SIVAL(b, 0, x); + writefd(f, b, 4); } /* @@ -1239,32 +1328,110 @@ void write_int(int f,int32 x) */ void write_longint(int f, int64 x) { - char b[8]; + char b[12]; - if (x <= 0x7FFFFFFF) { - write_int(f, (int)x); - return; +#if SIZEOF_INT64 < 8 + if (x < 0 || x > 0x7FFFFFFF) { + rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); + exit_cleanup(RERR_UNSUPPORTED); } +#endif + if (protocol_version < 30) { + char * const s = b+4; + SIVAL(s, 0, x); #if SIZEOF_INT64 < 8 - rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n"); - exit_cleanup(RERR_UNSUPPORTED); + writefd(f, s, 4); #else - write_int(f, (int32)0xFFFFFFFF); - SIVAL(b,0,(x&0xFFFFFFFF)); - SIVAL(b,4,((x>>32)&0xFFFFFFFF)); + if (x <= 0x7FFFFFFF && x >= 0) { + writefd(f, s, 4); + return; + } - writefd(f,b,8); + memset(b, 0xFF, 4); + SIVAL(s, 4, x >> 32); + writefd(f, b, 12); + } else if (x < 0) { + goto all_bits; +#endif + } else if (x < ((int32)1<<(3*8-1))) { + b[0] = (char)(x >> 16); + b[1] = (char)(x >> 8); + b[2] = (char)x; + writefd(f, b, 3); + } else if (x < ((int64)1<<(4*8-2))) { + b[0] = (char)((x >> 24) | 0x80); + b[1] = (char)(x >> 16); + b[2] = (char)(x >> 8); + b[3] = (char)x; + writefd(f, b, 4); +#if SIZEOF_INT64 < 8 + } else { + b[0] = 0xC0; + b[1] = (char)(x >> 24); + b[2] = (char)(x >> 16); + b[3] = (char)(x >> 8); + b[4] = (char)x; + writefd(f, b, 5); + } +#else + } else if (x < ((int64)1<<(5*8-3))) { + b[0] = (char)((x >> 32) | 0xC0); + b[1] = (char)(x >> 24); + b[2] = (char)(x >> 16); + b[3] = (char)(x >> 8); + b[4] = (char)x; + writefd(f, b, 5); + } else if (x < ((int64)1<<(6*8-4))) { + b[0] = (char)((x >> 40) | 0xE0); + b[1] = (char)(x >> 32); + b[2] = (char)(x >> 24); + b[3] = (char)(x >> 16); + b[4] = (char)(x >> 8); + b[5] = (char)x; + writefd(f, b, 6); + } else if (x < ((int64)1<<(7*8-5))) { + b[0] = (char)((x >> 48) | 0xF0); + b[1] = (char)(x >> 40); + b[2] = (char)(x >> 32); + b[3] = (char)(x >> 24); + b[4] = (char)(x >> 16); + b[5] = (char)(x >> 8); + b[6] = (char)x; + writefd(f, b, 7); + } else if (x < ((int64)1<<(8*8-6))) { + b[0] = (char)((x >> 56) | 0xF8); + b[1] = (char)(x >> 48); + b[2] = (char)(x >> 40); + b[3] = (char)(x >> 32); + b[4] = (char)(x >> 24); + b[5] = (char)(x >> 16); + b[6] = (char)(x >> 8); + b[7] = (char)x; + writefd(f, b, 8); + } else { + all_bits: + b[0] = (char)0xFC; + b[1] = (char)(x >> 56); + b[2] = (char)(x >> 48); + b[3] = (char)(x >> 40); + b[4] = (char)(x >> 32); + b[5] = (char)(x >> 24); + b[6] = (char)(x >> 16); + b[7] = (char)(x >> 8); + b[8] = (char)x; + writefd(f, b, 9); + } #endif } -void write_buf(int f,char *buf,size_t len) +void write_buf(int f, const char *buf, size_t len) { writefd(f,buf,len); } /** Write a string to the connection */ -void write_sbuf(int f, char *buf) +void write_sbuf(int f, const char *buf) { writefd(f, buf, strlen(buf)); } @@ -1274,7 +1441,7 @@ void write_byte(int f, uchar c) writefd(f, (char *)&c, 1); } -void write_vstring(int f, char *str, int len) +void write_vstring(int f, const char *str, int len) { uchar lenbuf[3], *lb = lenbuf; @@ -1357,7 +1524,7 @@ void io_start_multiplex_in(void) } /** Write an message to the multiplexed data stream. */ -int io_multiplex_write(enum msgcode code, char *buf, size_t len) +int io_multiplex_write(enum msgcode code, const char *buf, size_t len) { if (!io_multiplexing_out) return 0;