const char phase_unknown[] = "unknown";
int select_timeout = SELECT_TIMEOUT;
int batch_fd = -1;
+int batch_gen_fd = -1;
/**
* The connection might be dropped at some point; perhaps because the
exit_cleanup(0);
rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
- "(%.0f bytes read so far)\n",
- (double)stats.total_read);
+ "(%.0f bytes received so far) [%s]\n",
+ (double)stats.total_read, who_am_i());
exit_cleanup(RERR_STREAMIO);
}
/* Don't write errors on a dead socket. */
if (fd == sock_f_in)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno, "read error");
exit_cleanup(RERR_STREAMIO);
}
if ((int32)ret != (int32)0xffffffff)
return ret;
-#ifdef NO_INT64
- rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
-#else
+#ifdef INT64_IS_OFF_T
+ if (sizeof (int64) < 8) {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
readfd(f,b,8);
ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
-#endif
return ret;
}
void read_sbuf(int f,char *buf,size_t len)
{
- read_buf(f,buf,len);
+ readfd(f, buf, len); /* buf[len] is written right after this call, so buf must hold at least len + 1 bytes */
buf[len] = 0;
}
unsigned char read_byte(int f)
{
unsigned char c;
- read_buf(f, (char *)&c, 1);
+ readfd(f, (char *)&c, 1); /* one-byte request stored straight into c, which is the return value */
return c;
}
/* Don't try to write errors back across the stream. */
if (fd == sock_f_out)
- io_multiplexing_close();
+ close_multiplexing_out();
rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes: phase \"%s\"",
- (long)len, io_write_phase);
+ "writefd_unbuffered failed to write %ld bytes: phase \"%s\" [%s]",
+ (long)len, io_write_phase, who_am_i());
+ /* If the other side is sending us error messages, try
+ * to grab any messages they sent before they died. */
+ while (fd == sock_f_out && io_multiplexing_in) {
+ io_timeout = 30;
+ readfd_unbuffered(sock_f_in, io_filesfrom_buf,
+ sizeof io_filesfrom_buf);
+ }
exit_cleanup(RERR_STREAMIO);
}
return;
}
-#ifdef NO_INT64
- rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
-#else
+#ifdef INT64_IS_OFF_T
+ if (sizeof (int64) < 8) {
+ rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+ }
+#endif
+
write_int(f, (int32)0xFFFFFFFF);
SIVAL(b,0,(x&0xFFFFFFFF));
SIVAL(b,4,((x>>32)&0xFFFFFFFF));
writefd(f,b,8);
-#endif
}
void write_buf(int f,char *buf,size_t len)
}
/** Write a string to the connection */
-static void write_sbuf(int f,char *buf)
+void write_sbuf(int f, char *buf) /* `static` dropped: the function now has external linkage */
{
- write_buf(f, buf, strlen(buf));
+ writefd(f, buf, strlen(buf)); /* length is strlen(buf): buf must be NUL-terminated, and the terminator is not included */
}
-
void write_byte(int f,unsigned char c)
{
- write_buf(f,(char *)&c,1);
+ writefd(f, (char *)&c, 1); /* passes the address of the by-value parameter c with a length of 1 */
}
return 1;
}
+void close_multiplexing_in(void) /* input-side counterpart of close_multiplexing_out() */
+{
+ io_multiplexing_in = 0; /* clear the input-multiplexing flag */
+}
+
/** Stop output multiplexing. */
-void io_multiplexing_close(void)
+void close_multiplexing_out(void) /* new name for io_multiplexing_close(); the body is unchanged */
{
io_multiplexing_out = 0;
}
void start_write_batch(int fd)
{
+ write_stream_flags(batch_fd);
+
/* Some communication has already taken place, but we don't
* enable batch writing until here so that we can write a
* canonical record of the communication even though the