const char phase_unknown[] = "unknown";
int select_timeout = SELECT_TIMEOUT;
+int ignore_timeout = 0;
int batch_fd = -1;
int batch_gen_fd = -1;
{
time_t t;
- if (!io_timeout)
+ if (!io_timeout || ignore_timeout)
return;
if (!last_io) {
read_loop(fd, buf, len);
io_multiplex_write(MSG_DELETED, buf, len);
break;
+ case MSG_SUCCESS:
+ if (len != 4 || !am_generator) {
+ rprintf(FERROR, "invalid message %d:%d\n", tag, len);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, buf, len);
+ io_multiplex_write(MSG_SUCCESS, buf, len);
+ break;
case MSG_INFO:
case MSG_ERROR:
case MSG_LOG:
}
+void maybe_send_keepalive(int allowed_lull, int ndx)
+{
+ if (time(NULL) - last_io >= allowed_lull) {
+ if (!iobuf_out || !iobuf_out_cnt) {
+ if (protocol_version < 29)
+ return; /* there's nothing we can do */
+ write_int(sock_f_out, ndx);
+ write_shortint(sock_f_out, ITEM_IS_NEW);
+ }
+ if (iobuf_out)
+ io_flush(NORMAL_FLUSH);
+ }
+}
+
+
/**
* Continue trying to read len bytes - don't return until len has been
* read.
log_delete(line, S_IFREG);
remaining = 0;
break;
+ case MSG_SUCCESS:
+ if (remaining != 4) {
+ rprintf(FERROR, "invalid multi-message %d:%ld\n",
+ tag, (long)remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, remaining);
+ successful_send(IVAL(line, 0));
+ remaining = 0;
+ break;
case MSG_INFO:
case MSG_ERROR:
if (remaining >= sizeof line) {
* to grab any messages they sent before they died. */
while (fd == sock_f_out && io_multiplexing_in) {
io_timeout = select_timeout = 30;
+ ignore_timeout = 0;
readfd_unbuffered(sock_f_in, io_filesfrom_buf,
sizeof io_filesfrom_buf);
}