- if (read_buffer_len < N)
- read_check(buffer_f_in);
-
- while (total < N)
- {
- if (read_buffer_len > 0 && buffer_f_in == fd) {
- ret = MIN(read_buffer_len,N-total);
- memcpy(buffer+total,read_buffer_p,ret);
- read_buffer_p += ret;
- read_buffer_len -= ret;
- } else {
- while ((ret = read(fd,buffer + total,N - total)) == -1) {
- fd_set fds;
-
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- return -1;
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
- select(fd+1, &fds, NULL, NULL, NULL);
+ buf += n;
+ len -= n;
+ }
+}
+
+
+/**
+ * Read up to @p len bytes from @p fd into @p buf and return the
+ * number of bytes stored.
+ *
+ * A descriptor other than multiplex_in_fd is handed straight to
+ * read_timeout().  For multiplex_in_fd the data is staged in a static
+ * buffer whose state (fill level, read index, capacity) persists
+ * across calls:
+ *
+ *  - while io_multiplexing_in is false, an empty staging buffer is
+ *    refilled with a single read_timeout() call;
+ *  - whenever nothing is staged inside the loop, a 4-byte message
+ *    header is read with read_loop(); the low 24 bits of the header
+ *    value give the payload length and the bits above them the tag.
+ *    MPLEX_BASE payloads are staged as data, FERROR/FINFO payloads
+ *    are printed through rprintf(), and any other tag, or a message
+ *    longer than the local line buffer, is reported and passed to
+ *    exit_cleanup(RERR_STREAMIO).
+ *
+ * io_flush() is called before returning whenever the staging buffer
+ * has been drained.
+ *
+ * Never returns <= 0.  NOTE(review): on the staged path the result is
+ * MIN(len, remaining), so this relies on @p len being non-zero, and on
+ * the pass-through path it relies on read_timeout() -- confirm both.
+ */
+ static int read_unbuffered(int fd, char *buf, size_t len)
+{
+ static size_t remaining; /* bytes left in buffer; also holds the length of the message being parsed */
+ int tag, ret = 0;
+ char line[1024]; /* scratch for the 4-byte header and for FERROR/FINFO text */
+ static char *buffer; /* staging buffer, allocated on first use */
+ static size_t bufferIdx = 0; /* offset of the next unread byte in buffer */
+ static size_t bufferSz; /* capacity of buffer; set whenever it is (re)allocated */
+ /* Only multiplex_in_fd goes through the staging logic below. */
+ if (fd != multiplex_in_fd)
+ return read_timeout(fd, buf, len);
+ /* Multiplexing off and nothing staged: refill straight from the fd. */
+ if (!io_multiplexing_in && remaining == 0) {
+ if (!buffer) {
+ bufferSz = 2 * IO_BUFFER_SIZE;
+ buffer = new_array(char, bufferSz);
+ if (!buffer) out_of_memory("read_unbuffered");
+ }
+ remaining = read_timeout(fd, buffer, bufferSz);
+ bufferIdx = 0;
+ }
+ /* ret stays 0 until staged bytes have been copied out to the caller. */
+ while (ret == 0) {
+ if (remaining) { /* serve the caller from the staging buffer */
+ len = MIN(len, remaining);
+ memcpy(buf, buffer + bufferIdx, len);
+ bufferIdx += len;
+ remaining -= len;
+ ret = len;
+ break;
+ }
+ /* Nothing staged: read the next 4-byte message header. */
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0); /* presumably decodes a 32-bit integer from line -- confirm */
+ /* Low 24 bits are the payload length, the bits above are the tag. */
+ remaining = tag & 0xFFFFFF;
+ tag = tag >> 24;
+ /* Data message: stage the payload; the next iteration copies it out. */
+ if (tag == MPLEX_BASE) {
+ if (!buffer || remaining > bufferSz) { /* allocate, or grow to fit this payload */
+ buffer = realloc_array(buffer, char, remaining); /* NOTE(review): with buffer NULL and remaining 0 this asks for a
+  * zero-length array; check realloc_array() cannot return NULL then */
+ if (!buffer) out_of_memory("read_unbuffered");
+ bufferSz = remaining;
+ }
+ read_loop(fd, buffer, remaining);
+ bufferIdx = 0;
+ continue;
+ }
+ /* Anything else must be a log message; the offset from MPLEX_BASE is its log code. */
+ tag -= MPLEX_BASE;
+
+ if (tag != FERROR && tag != FINFO) {
+ rprintf(FERROR, "unexpected tag %d\n", tag);
+ exit_cleanup(RERR_STREAMIO); /* presumably does not return -- confirm */
+ }
+ /* The text must fit in line with room for the NUL stored below. */
+ if (remaining > sizeof line - 1) {
+ rprintf(FERROR, "multiplexing overflow %ld\n\n",
+ (long)remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ read_loop(fd, line, remaining);
+ line[remaining] = 0;
+
+ rprintf((enum logcode) tag, "%s", line);
+ remaining = 0; /* message consumed; nothing is staged, so the loop reads another header */
+ }
+ /* Staging buffer drained: call io_flush() (presumably pushes out pending output -- confirm). */
+ if (remaining == 0)
+ io_flush();
+
+ return ret;
+}
+
+
+
+/**
+ * Do a buffered read from @p fd. Don't return until all @p N bytes
+ * have been read. If all @p N bytes can't be read then exit with an
+ * error.
+ **/
+static void readfd(int fd, char *buffer, size_t N)
+{
+ int ret;
+ size_t total=0;
+
+ while (total < N) {
+ ret = read_unbuffered(fd, buffer + total, N-total);
+ total += ret;