X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/e4c877cf7082ddcb4a979fa7a8b252f95a34eb62..9b49704959f4b7ecb482f3d1f87ba2735e1d7ed1:/io.c diff --git a/io.c b/io.c index a7603ff8..3021ac72 100644 --- a/io.c +++ b/io.c @@ -7,8 +7,9 @@ * Copyright (C) 2003-2007 Wayne Davison * * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -16,8 +17,7 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA. + * with this program; if not, visit the http://fsf.org website. */ /* Rsync provides its own multiplexing system, which is used to send @@ -53,6 +53,9 @@ extern int preserve_hard_links; extern char *filesfrom_host; extern struct stats stats; extern struct file_list *cur_flist, *first_flist; +#ifdef ICONV_OPTION +extern iconv_t ic_send, ic_recv; +#endif const char phase_unknown[] = "unknown"; int ignore_timeout = 0; @@ -91,6 +94,9 @@ static int write_batch_monitor_out = -1; static int io_filesfrom_f_in = -1; static int io_filesfrom_f_out = -1; static char io_filesfrom_buf[2048]; +#ifdef ICONV_OPTION +static char iconv_buf[sizeof io_filesfrom_buf / 2]; +#endif static char *io_filesfrom_bp; static char io_filesfrom_lastchar; static int io_filesfrom_buflen; @@ -106,11 +112,12 @@ static char int_byte_extra[64] = { 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */ }; +enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND }; + static void readfd(int fd, char *buffer, size_t N); static void writefd(int fd, const char *buf, size_t len); static void writefd_unbuffered(int fd, const char *buf, size_t len); -static void decrement_active_files(int ndx); -static void decrement_flist_in_progress(int ndx, int redo); +static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert); struct flist_ndx_item { struct flist_ndx_item *next; @@ -125,7 +132,7 @@ static struct flist_ndx_list redo_list, hlink_list; struct msg_list_item { struct msg_list_item *next; - int len; + char convert; char buf[1]; }; @@ -168,6 +175,42 @@ static int flist_ndx_pop(struct flist_ndx_list *lp) return ndx; } +static void got_flist_entry_status(enum festatus status, const char *buf) +{ + int ndx = IVAL(buf, 0); + struct file_list *flist = flist_for_ndx(ndx); + + assert(flist != NULL); + assert(ndx >= flist->ndx_start); + + if (remove_source_files) { + active_filecnt--; + active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]); + } + + if (inc_recurse) + flist->in_progress--; + + switch (status) { + case FES_SUCCESS: + if (remove_source_files) + send_msg(MSG_SUCCESS, buf, 4, 0); + if (preserve_hard_links) { + struct file_struct *file = flist->files[ndx - flist->ndx_start]; + if (F_IS_HLINKED(file)) + flist_ndx_push(&hlink_list, ndx); + } + break; + case FES_REDO: + if (inc_recurse) + flist->to_redo++; + flist_ndx_push(&redo_list, ndx); + break; + case FES_NO_SEND: + break; + } +} + static void check_timeout(void) { time_t t; @@ -229,7 +272,7 @@ void set_msg_fd_out(int fd) } /* Add a message to the pending MSG_* list. */ -static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len) +static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert) { struct msg_list_item *m; int sz = len + 4 + sizeof m[0] - 1; @@ -237,7 +280,7 @@ static void msg_list_add(struct msg_list *lst, int code, const char *buf, int le if (!(m = (struct msg_list_item *)new_array(char, sz))) out_of_memory("msg_list_add"); m->next = NULL; - m->len = len + 4; + m->convert = convert; SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len); memcpy(m->buf + 4, buf, len); if (lst->tail) @@ -252,21 +295,25 @@ static void msg_flush(void) if (am_generator) { while (msg_queue.head && io_multiplexing_out) { struct msg_list_item *m = msg_queue.head; + int len = IVAL(m->buf, 0) & 0xFFFFFF; + int tag = *((uchar*)m->buf+3) - MPLEX_BASE; if (!(msg_queue.head = m->next)) msg_queue.tail = NULL; - stats.total_written += m->len; + stats.total_written += len + 4; defer_forwarding_messages++; - writefd_unbuffered(sock_f_out, m->buf, m->len); + mplex_write(sock_f_out, tag, m->buf + 4, len, m->convert); defer_forwarding_messages--; free(m); } } else { while (msg_queue.head) { struct msg_list_item *m = msg_queue.head; + int len = IVAL(m->buf, 0) & 0xFFFFFF; + int tag = *((uchar*)m->buf+3) - MPLEX_BASE; if (!(msg_queue.head = m->next)) msg_queue.tail = NULL; defer_forwarding_messages++; - writefd_unbuffered(msg_fd_out, m->buf, m->len); + mplex_write(msg_fd_out, tag, m->buf + 4, len, m->convert); defer_forwarding_messages--; free(m); } @@ -316,11 +363,7 @@ static void read_msg_fd(void) if (len != 4 || !am_generator) goto invalid_msg; readfd(fd, buf, 4); - if (remove_source_files) - decrement_active_files(IVAL(buf,0)); - flist_ndx_push(&redo_list, IVAL(buf,0)); - if (inc_recurse) - decrement_flist_in_progress(IVAL(buf,0), 1); + got_flist_entry_status(FES_REDO, buf); break; case MSG_FLIST: if (len != 4 || !am_generator || !inc_recurse) @@ -329,6 +372,10 @@ static void read_msg_fd(void) /* Read extra file list from receiver. */ assert(iobuf_in != NULL); assert(iobuf_f_in == fd); + if (verbose > 3) { + rprintf(FINFO, "[%s] receiving flist for dir %d\n", + who_am_i(), IVAL(buf,0)); + } flist = recv_file_list(fd); flist->parent_ndx = IVAL(buf,0); break; @@ -341,27 +388,19 @@ static void read_msg_fd(void) if (len >= (int)sizeof buf || !am_generator) goto invalid_msg; readfd(fd, buf, len); - send_msg(MSG_DELETED, buf, len); + send_msg(MSG_DELETED, buf, len, 1); break; case MSG_SUCCESS: if (len != 4 || !am_generator) goto invalid_msg; - readfd(fd, buf, len); - if (remove_source_files) { - decrement_active_files(IVAL(buf,0)); - send_msg(MSG_SUCCESS, buf, len); - } - if (preserve_hard_links) - flist_ndx_push(&hlink_list, IVAL(buf,0)); - if (inc_recurse) - decrement_flist_in_progress(IVAL(buf,0), 0); + readfd(fd, buf, 4); + got_flist_entry_status(FES_SUCCESS, buf); break; case MSG_NO_SEND: if (len != 4 || !am_generator) goto invalid_msg; - readfd(fd, buf, len); - if (inc_recurse) - decrement_flist_in_progress(IVAL(buf,0), 0); + readfd(fd, buf, 4); + got_flist_entry_status(FES_NO_SEND, buf); break; case MSG_SOCKERR: case MSG_CLIENT: @@ -378,7 +417,7 @@ static void read_msg_fd(void) if (n >= sizeof buf) n = sizeof buf - 1; readfd(fd, buf, n); - rwrite((enum logcode)tag, buf, n); + rwrite((enum logcode)tag, buf, n, !am_generator); len -= n; } break; @@ -412,49 +451,20 @@ void increment_active_files(int ndx, int itemizing, enum logcode code) active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]); } -static void decrement_active_files(int ndx) -{ - struct file_list *flist = flist_for_ndx(ndx); - assert(flist != NULL); - active_filecnt--; - active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]); -} - -static void decrement_flist_in_progress(int ndx, int redo) -{ - struct file_list *flist = cur_flist ? cur_flist : first_flist; - - while (ndx < flist->ndx_start) { - if (flist == first_flist) { - invalid_ndx: - rprintf(FERROR, - "Invalid file index: %d (%d - %d) [%s]\n", - ndx, first_flist->ndx_start, - first_flist->prev->ndx_start + first_flist->prev->count - 1, - who_am_i()); - exit_cleanup(RERR_PROTOCOL); - } - flist = flist->prev; - } - while (ndx >= flist->ndx_start + flist->count) { - if (!(flist = flist->next)) - goto invalid_ndx; - } - - flist->in_progress--; - if (redo) - flist->to_redo++; -} - /* Write an message to a multiplexed stream. If this fails, rsync exits. */ -static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len) +static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert) { - char buffer[1024]; + char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */ size_t n = len; SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len); - if (n > sizeof buffer - 4) +#ifdef ICONV_OPTION + if (convert && ic_send == (iconv_t)-1) +#endif + convert = 0; + + if (convert || n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */ n = 0; else memcpy(buffer + 4, buf, n); @@ -464,6 +474,28 @@ static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len) len -= n; buf += n; +#ifdef ICONV_OPTION + if (convert) { + iconv(ic_send, NULL, 0, NULL, 0); + defer_forwarding_messages++; + while (len) { + ICONV_CONST char *ibuf = (ICONV_CONST char *)buf; + char *obuf = buffer; + size_t ocnt = sizeof buffer; + while (len && iconv(ic_send, &ibuf,&len, + &obuf,&ocnt) == (size_t)-1) { + if (errno == E2BIG || !ocnt) + break; + *obuf++ = *ibuf++; + ocnt--, len--; + } + n = obuf - buffer; + writefd_unbuffered(fd, buffer, n); + } + if (!--defer_forwarding_messages) + msg_flush(); + } else +#endif if (len) { defer_forwarding_messages++; writefd_unbuffered(fd, buf, len); @@ -472,20 +504,20 @@ static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len) } } -int send_msg(enum msgcode code, const char *buf, int len) +int send_msg(enum msgcode code, const char *buf, int len, int convert) { if (msg_fd_out < 0) { if (!defer_forwarding_messages) - return io_multiplex_write(code, buf, len); + return io_multiplex_write(code, buf, len, convert); if (!io_multiplexing_out) return 0; - msg_list_add(&msg_queue, code, buf, len); + msg_list_add(&msg_queue, code, buf, len, convert); return 1; } if (flist_forward_from >= 0) - msg_list_add(&msg_queue, code, buf, len); + msg_list_add(&msg_queue, code, buf, len, convert); else - mplex_write(msg_fd_out, code, buf, len); + mplex_write(msg_fd_out, code, buf, len, convert); return 1; } @@ -493,7 +525,7 @@ void send_msg_int(enum msgcode code, int num) { char numbuf[4]; SIVAL(numbuf, 0, num); - send_msg(code, numbuf, 4); + send_msg(code, numbuf, 4, 0); } void wait_for_receiver(void) @@ -635,7 +667,7 @@ static int read_timeout(int fd, char *buf, size_t len) io_filesfrom_bp = io_filesfrom_buf; else io_filesfrom_bp += l; - } else { + } else if (errno != EINTR) { /* XXX should we complain? */ io_filesfrom_f_out = -1; } @@ -646,11 +678,13 @@ static int read_timeout(int fd, char *buf, size_t len) io_filesfrom_buf, sizeof io_filesfrom_buf); if (l <= 0) { - /* Send end-of-file marker */ - io_filesfrom_buf[0] = '\0'; - io_filesfrom_buf[1] = '\0'; - io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1; - io_filesfrom_f_in = -1; + if (l == 0 || errno != EINTR) { + /* Send end-of-file marker */ + io_filesfrom_buf[0] = '\0'; + io_filesfrom_buf[1] = '\0'; + io_filesfrom_buflen = io_filesfrom_lastchar? 2 : 1; + io_filesfrom_f_in = -1; + } } else { if (!eol_nulls) { char *s = io_filesfrom_buf + l; @@ -719,10 +753,8 @@ static int read_timeout(int fd, char *buf, size_t len) return cnt; } -/** - * Read a line into the "fname" buffer (which must be at least MAXPATHLEN - * characters long). - */ +/* Read a line into the "fname" buffer (which must be at least MAXPATHLEN + * characters long). */ int read_filesfrom_line(int fd, char *fname) { char ch, *s, *eob = fname + MAXPATHLEN - 1; @@ -746,10 +778,8 @@ int read_filesfrom_line(int fd, char *fname) tv.tv_usec = 0; if (!select(fd+1, &r_fds, NULL, &e_fds, &tv)) check_timeout(); - if (FD_ISSET(fd, &e_fds)) { - rsyserr(FINFO, errno, - "select exception on fd %d", fd); - } + /*if (FD_ISSET(fd, &e_fds)) + rprintf(FINFO, "select exception on fd %d\n", fd); */ continue; } if (cnt != 1) @@ -833,9 +863,9 @@ void maybe_send_keepalive(void) if (protocol_version < 29) return; /* there's nothing we can do */ if (protocol_version >= 30) - send_msg(MSG_NOOP, "", 0); + send_msg(MSG_NOOP, "", 0, 0); else { - write_int(sock_f_out, cur_flist->count); + write_int(sock_f_out, cur_flist->used); write_shortint(sock_f_out, ITEM_IS_NEW); } } @@ -932,7 +962,30 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) case MSG_DELETED: if (msg_bytes >= sizeof line) goto overflow; - read_loop(fd, line, msg_bytes); +#ifdef ICONV_OPTION + if (ic_recv != (iconv_t)-1) { + ICONV_CONST char *ibuf; + char *obuf = line; + size_t icnt, ocnt = sizeof line - 1; + int add_null = 0; + iconv(ic_send, NULL, 0, NULL, 0); + while (msg_bytes) { + icnt = msg_bytes > sizeof iconv_buf + ? sizeof iconv_buf : msg_bytes; + read_loop(fd, iconv_buf, icnt); + if (!(msg_bytes -= icnt) && !iconv_buf[icnt-1]) + icnt--, add_null = 1; + ibuf = (ICONV_CONST char *)iconv_buf; + if (iconv(ic_send, &ibuf,&icnt, + &obuf,&ocnt) == (size_t)-1) + goto overflow; // XXX + } + if (add_null) + *obuf++ = '\0'; + msg_bytes = obuf - line; + } else +#endif + read_loop(fd, line, msg_bytes); /* A directory name was sent with the trailing null */ if (msg_bytes > 0 && !line[msg_bytes-1]) log_delete(line, S_IFDIR); @@ -967,7 +1020,7 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) exit_cleanup(RERR_STREAMIO); } read_loop(fd, line, msg_bytes); - rwrite((enum logcode)tag, line, msg_bytes); + rwrite((enum logcode)tag, line, msg_bytes, 1); break; default: rprintf(FERROR, "unexpected tag %d [%s]\n", @@ -1305,10 +1358,8 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) continue; } - if (FD_ISSET(fd, &e_fds)) { - rsyserr(FINFO, errno, - "select exception on fd %d", fd); - } + /*if (FD_ISSET(fd, &e_fds)) + rprintf(FINFO, "select exception on fd %d\n", fd); */ if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds)) read_msg_fd(); @@ -1372,7 +1423,7 @@ void io_flush(int flush_it_all) return; if (io_multiplexing_out) - mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt); + mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0); else writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt); iobuf_out_cnt = 0; @@ -1684,13 +1735,13 @@ void io_start_multiplex_in(void) } /** Write an message to the multiplexed data stream. */ -int io_multiplex_write(enum msgcode code, const char *buf, size_t len) +int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int convert) { if (!io_multiplexing_out) return 0; io_flush(NORMAL_FLUSH); stats.total_written += (len+4); - mplex_write(sock_f_out, code, buf, len); + mplex_write(sock_f_out, code, buf, len, convert); return 1; }