* called by the generator. */
static void read_msg_fd(void)
{
- char buf[200];
+ char buf[2048];
size_t n;
int fd = msg_fd_in;
int tag, len;
- /* Temporarily disable msg_fd_in. This is needed because we
- * may call a write routine that could try to call us back. */
+ /* Temporarily disable msg_fd_in. This is needed to avoid looping back
+ * to this routine from read_timeout() and writefd_unbuffered(). */
msg_fd_in = -1;
read_loop(fd, buf, 4);
/* Try to push messages off the list onto the wire. If we leave with more
* to do, return 0. On error, return -1. If everything flushed, return 1.
- * This is only called by the receiver. */
+ * This is only active in the receiver. */
int msg_list_push(int flush_it_all)
{
static int written = 0;
{
if (kludge_around_eof)
exit_cleanup(0);
- else {
- rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
- "(%.0f bytes read so far)\n",
- (double)stats.total_read);
- exit_cleanup(RERR_STREAMIO);
- }
+ rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
+ "(%.0f bytes read so far)\n",
+ (double)stats.total_read);
+
+ exit_cleanup(RERR_STREAMIO);
}
/* until we manage to read *something* */
fd_set r_fds, w_fds;
struct timeval tv;
- int fd_count = fd+1;
+ int maxfd = fd;
int count;
FD_ZERO(&r_fds);
+ FD_ZERO(&w_fds);
FD_SET(fd, &r_fds);
if (msg_fd_in >= 0) {
FD_SET(msg_fd_in, &r_fds);
- if (msg_fd_in >= fd_count)
- fd_count = msg_fd_in+1;
+ if (msg_fd_in > maxfd)
+ maxfd = msg_fd_in;
} else if (msg_list_head) {
FD_SET(msg_fd_out, &w_fds);
- if (msg_fd_out >= fd_count)
- fd_count = msg_fd_out+1;
+ if (msg_fd_out > maxfd)
+ maxfd = msg_fd_out;
}
if (io_filesfrom_f_out >= 0) {
int new_fd;
new_fd = -1;
}
} else {
- FD_ZERO(&w_fds);
FD_SET(io_filesfrom_f_out, &w_fds);
new_fd = io_filesfrom_f_out;
}
- if (new_fd >= fd_count)
- fd_count = new_fd+1;
+ if (new_fd > maxfd)
+ maxfd = new_fd;
}
tv.tv_sec = select_timeout;
errno = 0;
- count = select(fd_count, &r_fds,
- io_filesfrom_buflen? &w_fds : NULL,
- NULL, &tv);
+ count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
if (count <= 0) {
- check_timeout();
if (errno == EBADF)
exit_cleanup(RERR_SOCKETIO);
+ check_timeout();
continue;
}
n = read(fd, buf, len);
- if (n > 0) {
- buf += n;
- len -= n;
- ret += n;
- if (io_timeout)
- last_io = time(NULL);
- continue;
- } else if (n == 0) {
- whine_about_eof();
- return -1; /* doesn't return */
- } else if (n < 0) {
+ if (n <= 0) {
+ if (n == 0)
+ whine_about_eof(); /* Doesn't return. */
if (errno == EINTR || errno == EWOULDBLOCK
|| errno == EAGAIN)
continue;
- die_from_readerr(errno);
+ die_from_readerr(errno); /* Doesn't return. */
}
+
+ buf += n;
+ len -= n;
+ ret += n;
+ if (io_timeout)
+ last_io = time(NULL);
}
return ret;
*
* Never returns <= 0.
*/
-static int read_unbuffered(int fd, char *buf, size_t len)
+static int readfd_unbuffered(int fd, char *buf, size_t len)
{
static size_t remaining;
int tag, ret = 0;
bufferSz = 2 * IO_BUFFER_SIZE;
buffer = new_array(char, bufferSz);
if (!buffer)
- out_of_memory("read_unbuffered");
+ out_of_memory("readfd_unbuffered");
}
remaining = read_timeout(fd, buffer, bufferSz);
bufferIdx = 0;
if (!buffer || remaining > bufferSz) {
buffer = realloc_array(buffer, char, remaining);
if (!buffer)
- out_of_memory("read_unbuffered");
+ out_of_memory("readfd_unbuffered");
bufferSz = remaining;
}
read_loop(fd, buffer, remaining);
size_t total = 0;
while (total < N) {
- ret = read_unbuffered(fd, buffer + total, N-total);
+ ret = readfd_unbuffered(fd, buffer + total, N-total);
total += ret;
}
**/
static void writefd_unbuffered(int fd,char *buf,size_t len)
{
- size_t total = 0;
+ size_t n, total = 0;
fd_set w_fds, r_fds;
- int fd_count, count;
+ int maxfd, count, ret;
struct timeval tv;
if (fd == msg_fd_out) {
while (total < len) {
FD_ZERO(&w_fds);
FD_SET(fd,&w_fds);
- fd_count = fd;
+ maxfd = fd;
if (msg_fd_in >= 0) {
FD_ZERO(&r_fds);
FD_SET(msg_fd_in,&r_fds);
- if (msg_fd_in > fd_count)
- fd_count = msg_fd_in;
+ if (msg_fd_in > maxfd)
+ maxfd = msg_fd_in;
}
tv.tv_sec = select_timeout;
tv.tv_usec = 0;
errno = 0;
- count = select(fd_count+1, msg_fd_in >= 0 ? &r_fds : NULL,
+ count = select(maxfd + 1, msg_fd_in >= 0 ? &r_fds : NULL,
&w_fds, NULL, &tv);
if (count <= 0) {
- check_timeout();
- if (errno == EBADF)
+ if (count < 0 && errno == EBADF)
exit_cleanup(RERR_SOCKETIO);
+ check_timeout();
continue;
}
if (msg_fd_in >= 0 && FD_ISSET(msg_fd_in, &r_fds))
read_msg_fd();
- if (FD_ISSET(fd, &w_fds)) {
- int ret;
- size_t n = len-total;
- if (bwlimit && n > bwlimit_writemax)
- n = bwlimit_writemax;
- ret = write(fd,buf+total,n);
+ if (!FD_ISSET(fd, &w_fds))
+ continue;
+
+ n = len - total;
+ if (bwlimit && n > bwlimit_writemax)
+ n = bwlimit_writemax;
+ ret = write(fd, buf + total, n);
+ if (ret <= 0) {
if (ret < 0) {
if (errno == EINTR)
continue;
}
}
- if (ret <= 0) {
- /* Don't try to write errors back
- * across the stream */
- io_multiplexing_close();
- rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
- (long)len, io_write_phase);
- exit_cleanup(RERR_STREAMIO);
- }
+ /* Don't try to write errors back across the stream. */
+ io_multiplexing_close();
+ rsyserr(FERROR, errno,
+ "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
+ (long)len, io_write_phase);
+ exit_cleanup(RERR_STREAMIO);
+ }
- sleep_for_bwlimit(ret);
+ sleep_for_bwlimit(ret);
- total += ret;
+ total += ret;
- if (io_timeout)
- last_io = time(NULL);
- }
+ if (io_timeout)
+ last_io = time(NULL);
}
no_flush--;
SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- if (n > (sizeof buffer - 4)) {
+ if (n > sizeof buffer - 4)
n = sizeof buffer - 4;
- }
memcpy(&buffer[4], buf, n);
writefd_unbuffered(fd, buffer, n+4);
len -= n;
buf += n;
- if (len) {
+ if (len)
writefd_unbuffered(fd, buf, len);
- }
}