+ read_loop(fd, line, 4);
+ tag = IVAL(line, 0);
+
+ msg_bytes = tag & 0xFFFFFF;
+ tag = (tag >> 24) - MPLEX_BASE;
+
+ no_flush++;
+
+ switch (tag) {
+ case MSG_DATA:
+ if (msg_bytes > iobuf_in_siz) {
+ if (!(iobuf_in = realloc_array(iobuf_in, char, msg_bytes)))
+ out_of_memory("read_a_msg");
+ iobuf_in_siz = msg_bytes;
+ }
+ read_loop(fd, iobuf_in, msg_bytes);
+ iobuf_in_remaining = msg_bytes;
+ iobuf_in_ndx = 0;
+ break;
+ case MSG_DONE:
+ if (msg_bytes > 1 || !am_generator)
+ goto invalid_msg;
+ if (msg_bytes) {
+ read_loop(fd, line, 1);
+ stats.total_read = read_varlong(fd, 3);
+ }
+ msgdone_cnt++;
+ break;
+ case MSG_REDO:
+ if (msg_bytes != 4 || !am_generator)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ got_flist_entry_status(FES_REDO, line);
+ break;
+ case MSG_FLIST:
+ if (msg_bytes != 4 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ /* Read extra file list from receiver. */
+ if (DEBUG_GTE(FLIST, 2)) {
+ rprintf(FINFO, "[%s] receiving flist for dir %d\n",
+ who_am_i(), IVAL(line, 0));
+ }
+ flist_parent = IVAL(line, 0);
+ break;
+ case MSG_FLIST_EOF:
+ if (msg_bytes != 0 || !am_generator || !inc_recurse)
+ goto invalid_msg;
+ flist_eof = 1;
+ break;
+ case MSG_IO_ERROR:
+ if (msg_bytes != 4 || am_sender)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ io_error |= IVAL(line, 0);
+ if (!am_generator)
+ send_msg_int(MSG_IO_ERROR, IVAL(line, 0));
+ break;
+ case MSG_NOOP:
+ if (am_sender)
+ maybe_send_keepalive();
+ break;
+ case MSG_DEL_STATS:
+ if (msg_bytes)
+ goto invalid_msg;
+ read_del_stats(fd);
+ if (am_sender && am_server)
+ write_del_stats(sock_f_out);
+ break;
+ case MSG_DELETED:
+ if (msg_bytes >= sizeof line)
+ goto overflow;
+ if (am_generator) {
+ read_loop(fd, line, msg_bytes);
+ send_msg(MSG_DELETED, line, msg_bytes, 1);
+ break;
+ }
+#ifdef ICONV_OPTION
+ if (ic_recv != (iconv_t)-1) {
+ xbuf outbuf, inbuf;
+ char ibuf[512];
+ int add_null = 0;
+
+ INIT_CONST_XBUF(outbuf, line);
+ INIT_XBUF(inbuf, ibuf, 0, (size_t)-1);
+
+ while (msg_bytes) {
+ inbuf.len = msg_bytes > sizeof ibuf
+ ? sizeof ibuf : msg_bytes;
+ read_loop(fd, inbuf.buf, inbuf.len);
+ if (!(msg_bytes -= inbuf.len)
+ && !ibuf[inbuf.len-1])
+ inbuf.len--, add_null = 1;
+ if (iconvbufs(ic_send, &inbuf, &outbuf,
+ ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
+ goto overflow;
+ }
+ if (add_null) {
+ if (outbuf.len == outbuf.size)
+ goto overflow;
+ outbuf.buf[outbuf.len++] = '\0';
+ }
+ msg_bytes = outbuf.len;
+ } else
+#endif
+ read_loop(fd, line, msg_bytes);
+ /* A directory name was sent with the trailing null */
+ if (msg_bytes > 0 && !line[msg_bytes-1])
+ log_delete(line, S_IFDIR);
+ else {
+ line[msg_bytes] = '\0';
+ log_delete(line, S_IFREG);
+ }
+ break;
+ case MSG_SUCCESS:
+ if (msg_bytes != 4) {
+ invalid_msg:
+ rprintf(FERROR, "invalid multi-message %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, 4);
+ if (am_generator)
+ got_flist_entry_status(FES_SUCCESS, line);
+ else
+ successful_send(IVAL(line, 0));
+ break;
+ case MSG_NO_SEND:
+ if (msg_bytes != 4)
+ goto invalid_msg;
+ read_loop(fd, line, 4);
+ if (am_generator)
+ got_flist_entry_status(FES_NO_SEND, line);
+ else
+ send_msg_int(MSG_NO_SEND, IVAL(line, 0));
+ break;
+ case MSG_ERROR_SOCKET:
+ case MSG_ERROR_UTF8:
+ case MSG_CLIENT:
+ case MSG_LOG:
+ if (!am_generator)
+ goto invalid_msg;
+ if (tag == MSG_ERROR_SOCKET)
+ io_end_multiplex_out();
+ /* FALL THROUGH */
+ case MSG_INFO:
+ case MSG_ERROR:
+ case MSG_ERROR_XFER:
+ case MSG_WARNING:
+ if (msg_bytes >= sizeof line) {
+ overflow:
+ rprintf(FERROR,
+ "multiplexing overflow %d:%lu [%s%s]\n",
+ tag, (unsigned long)msg_bytes, who_am_i(),
+ inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ read_loop(fd, line, msg_bytes);
+ rwrite((enum logcode)tag, line, msg_bytes, !am_generator);
+ if (first_message) {
+ if (list_only && !am_sender && tag == 1) {
+ line[msg_bytes] = '\0';
+ check_for_d_option_error(line);
+ }
+ first_message = 0;
+ }
+ break;
+ default:
+ rprintf(FERROR, "unexpected tag %d [%s%s]\n",
+ tag, who_am_i(), inc_recurse ? "/inc" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+
+ msg_fd_in = save_msg_fd_in;
+ no_flush--;
+
+ if (flist_parent >= 0) {
+ struct file_list *flist = recv_file_list(fd);
+ flist->parent_ndx = flist_parent;
+#ifdef SUPPORT_HARD_LINKS
+ if (preserve_hard_links)
+ match_hard_links(flist);
+#endif
+ }
+}
+
+/* Read from the file descriptor, handling multiplexing, and return the
+ * number of bytes read. Never returns <= 0. */