- int ret;
- int total=0;
-
- while (total < N)
- {
- if (read_buffer_len > 0) {
- ret = MIN(read_buffer_len,N-total);
- memcpy(buffer+total,read_buffer_p,ret);
- read_buffer_p += ret;
- read_buffer_len -= ret;
- } else {
- if ((ret = read(fd,buffer + total,N - total)) == -1)
- return -1;
- }
-
- if (ret <= 0)
- return total;
- total += ret;
- }
- return total;
-}
-
-
-int read_int(int f)
-{
- int ret;
- char b[4];
- if ((ret=readfd(f,b,4)) != 4) {
- if (verbose > 1)
- fprintf(stderr,"Error reading %d bytes : %s\n",
- 4,ret==-1?strerror(errno):"EOF");
- exit(1);
- }
- total_read += 4;
- return IVAL(b,0);
-}
-
-void read_buf(int f,char *buf,int len)
-{
- int ret;
- if ((ret=readfd(f,buf,len)) != len) {
- if (verbose > 1)
- fprintf(stderr,"Error reading %d bytes : %s\n",
- len,ret==-1?strerror(errno):"EOF");
- exit(1);
- }
- total_read += len;
+ static size_t remaining;
+ int tag, ret = 0;
+ char line[1024];
+
+ if (!io_multiplexing_in || fd != multiplex_in_fd)
+ return read_timeout(fd, buf, len);
+
+ while (ret == 0) {
+ if (remaining) {
+ len = MIN(len, remaining);
+ read_loop(fd, buf, len);
+ remaining -= len;
+ ret = len;
+ continue;
+ }
+
+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ remaining = tag & 0xFFFFFF;
+ tag = tag >> 24;
+
+ if (tag == MPLEX_BASE)
+ continue;
+
+ tag -= MPLEX_BASE;
+
+ if (tag != FERROR && tag != FINFO) {
+ rprintf(FERROR, "unexpected tag %d\n", tag);
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ if (remaining > sizeof(line) - 1) {
+ rprintf(FERROR, "multiplexing overflow %d\n\n",
+ remaining);
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ read_loop(fd, line, remaining);
+ line[remaining] = 0;
+
+ rprintf((enum logcode) tag, "%s", line);
+ remaining = 0;
+ }
+
+ return ret;
+}
+
+
+
+/**
+ * Do a buffered read from @p fd. Don't return until all @p n bytes
+ * have been read. If all @p n can't be read then exit with an
+ * error.
+ **/
+static void readfd (int fd, char *buffer, size_t N)
+{
+ int ret;
+ size_t total=0;
+
+ while (total < N) {
+ io_flush();
+
+ ret = read_unbuffered (fd, buffer + total, N-total);
+ total += ret;
+ }
+
+ stats.total_read += total;
+}
+
+
+int32 read_int(int f)
+{
+ char b[4];
+ int32 ret;
+
+ readfd(f,b,4);
+ ret = IVAL(b,0);
+ if (ret == (int32)0xffffffff) return -1;
+ return ret;
+}
+
+int64 read_longint(int f)
+{
+ extern int remote_version;
+ int64 ret;
+ char b[8];
+ ret = read_int(f);
+
+ if ((int32)ret != (int32)0xffffffff) {
+ return ret;
+ }
+
+#ifdef NO_INT64
+ rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
+ exit_cleanup(RERR_UNSUPPORTED);
+#else
+ if (remote_version >= 16) {
+ readfd(f,b,8);
+ ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
+ }
+#endif
+
+ return ret;
+}
+
+void read_buf(int f,char *buf,size_t len)
+{
+ readfd(f,buf,len);
+}
+
+void read_sbuf(int f,char *buf,size_t len)
+{
+ read_buf (f,buf,len);
+ buf[len] = 0;