X-Git-Url: https://mattmccutchen.net/rsync/rsync.git/blobdiff_plain/8b3e60523aa3d86583a1df54c81c9d480ea19740..ce827c3e50cd168027a555ec961b6636e5b63e59:/io.c diff --git a/io.c b/io.c index 1752d86f..c9d990ad 100644 --- a/io.c +++ b/io.c @@ -4,7 +4,7 @@ * Copyright (C) 1996-2001 Andrew Tridgell * Copyright (C) 1996 Paul Mackerras * Copyright (C) 2001, 2002 Martin Pool - * Copyright (C) 2003-2008 Wayne Davison + * Copyright (C) 2003-2009 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -29,6 +29,7 @@ #include "rsync.h" #include "ifuncs.h" +#include "inums.h" /** If no timeout is specified then use a 60 second select timeout */ #define SELECT_TIMEOUT 60 @@ -44,6 +45,8 @@ extern int inc_recurse; extern int io_error; extern int eol_nulls; extern int flist_eof; +extern int file_total; +extern int file_old_total; extern int list_only; extern int read_batch; extern int protect_args; @@ -123,16 +126,7 @@ static void writefd(int fd, const char *buf, size_t len); static void writefd_unbuffered(int fd, const char *buf, size_t len); static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert); -struct flist_ndx_item { - struct flist_ndx_item *next; - int ndx; -}; - -struct flist_ndx_list { - struct flist_ndx_item *head, *tail; -}; - -static struct flist_ndx_list redo_list, hlink_list; +static flist_ndx_list redo_list, hlink_list; struct msg_list_item { struct msg_list_item *next; @@ -146,39 +140,6 @@ struct msg_list { static struct msg_list msg_queue; -static void flist_ndx_push(struct flist_ndx_list *lp, int ndx) -{ - struct flist_ndx_item *item; - - if (!(item = new(struct flist_ndx_item))) - out_of_memory("flist_ndx_push"); - item->next = NULL; - item->ndx = ndx; - if (lp->tail) - lp->tail->next = item; - else - lp->head = item; - lp->tail = item; -} - -static int flist_ndx_pop(struct flist_ndx_list *lp) -{ - struct flist_ndx_item *next; - int ndx; - - if (!lp->head) - return -1; - - ndx = lp->head->ndx; - next = lp->head->next; - free(lp->head); - lp->head = next; - if (!next) - lp->tail = NULL; - - return ndx; -} - static void got_flist_entry_status(enum festatus status, const char *buf) { int ndx = IVAL(buf, 0); @@ -205,6 +166,11 @@ static void got_flist_entry_status(enum festatus status, const char *buf) } break; case FES_REDO: + if (read_batch) { + if (inc_recurse) + flist->in_progress++; + break; + } if (inc_recurse) flist->to_redo++; flist_ndx_push(&redo_list, ndx); @@ -483,9 +449,14 @@ static void read_msg_fd(void) * this, sender-side deletions were mostly happening at the end. */ void increment_active_files(int ndx, int itemizing, enum logcode code) { - /* TODO: tune these limits? */ - while (active_filecnt >= (active_bytecnt >= 128*1024 ? 10 : 50)) { + while (1) { + /* TODO: tune these limits? */ + int limit = active_bytecnt >= 128*1024 ? 10 : 50; + if (active_filecnt < limit) + break; check_for_finished_files(itemizing, code, 0); + if (active_filecnt < limit) + break; if (iobuf_out_cnt) io_flush(NORMAL_FLUSH); else @@ -626,7 +597,7 @@ static void whine_about_eof(int fd) rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed " "(%s bytes received so far) [%s]\n", - big_num(stats.total_read, 0), who_am_i()); + big_num(stats.total_read), who_am_i()); exit_cleanup(RERR_STREAMIO); } @@ -676,7 +647,12 @@ static int read_timeout(int fd, char *buf, size_t len) maxfd = new_fd; } - tv.tv_sec = select_timeout; + if (am_sender && inc_recurse && !flist_eof && !defer_forwarding_messages && !cnt + && file_total - file_old_total < MAX_FILECNT_LOOKAHEAD + && file_total - file_old_total >= MIN_FILECNT_LOOKAHEAD) + tv.tv_sec = 0; + else + tv.tv_sec = select_timeout; tv.tv_usec = 0; errno = 0; @@ -688,7 +664,10 @@ static int read_timeout(int fd, char *buf, size_t len) defer_forwarding_messages = 0; exit_cleanup(RERR_SOCKETIO); } - check_timeout(); + if (am_sender && tv.tv_sec == 0) + send_extra_file_list(sock_f_out, -1); + else + check_timeout(); continue; } @@ -1080,6 +1059,13 @@ static int readfd_unbuffered(int fd, char *buf, size_t len) send_msg_int(MSG_IO_ERROR, IVAL(line, 0)); io_error |= IVAL(line, 0); break; + case MSG_DEL_STATS: + if (msg_bytes) + goto invalid_msg; + read_del_stats(fd); + if (am_sender && am_server) + write_del_stats(sock_f_out); + break; case MSG_DELETED: if (msg_bytes >= sizeof line) goto overflow; @@ -1447,6 +1433,22 @@ static void sleep_for_bwlimit(int bytes_written) total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); } +static const char *what_fd_is(int fd) +{ + static char buf[20]; + + if (fd == sock_f_out) + return "socket"; + else if (fd == msg_fd_out) + return "message fd"; + else if (fd == batch_fd) + return "batch file"; + else { + snprintf(buf, sizeof buf, "fd %d", fd); + return buf; + } +} + /* Write len bytes to the file descriptor fd, looping as necessary to get * the job done and also (in certain circumstances) reading any data on * msg_fd_in to avoid deadlock. @@ -1525,8 +1527,8 @@ static void writefd_unbuffered(int fd, const char *buf, size_t len) if (am_server && fd == msg_fd_out) exit_cleanup(RERR_STREAMIO); rsyserr(FERROR, errno, - "writefd_unbuffered failed to write %ld bytes [%s]", - (long)len, who_am_i()); + "writefd_unbuffered failed to write %ld bytes to %s [%s]", + (long)len, what_fd_is(fd), who_am_i()); /* If the other side is sending us error messages, try * to grab any messages they sent before they died. */ while (!am_server && fd == sock_f_out && io_multiplexing_in) { @@ -1584,10 +1586,8 @@ static void writefd(int fd, const char *buf, size_t len) if (fd == sock_f_out) stats.total_written += len; - if (fd == write_batch_monitor_out) { - if ((size_t)write(batch_fd, buf, len) != len) - exit_cleanup(RERR_FILEIO); - } + if (fd == write_batch_monitor_out) + writefd_unbuffered(batch_fd, buf, len); if (!iobuf_out || fd != iobuf_f_out) { writefd_unbuffered(fd, buf, len);