+ tag = (tag >> 24) - MPLEX_BASE;
+
+ switch (tag) {
+ case MSG_DONE:
+ if (len != 0 || !am_generator) {
+ invalid_msg:
+ rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
+ tag, len, who_am_i(),
+ incremental ? "/incremental" : "");
+ exit_cleanup(RERR_STREAMIO);
+ }
+ done_cnt++;
+ break;
+ case MSG_REDO:
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, 4);
+ if (remove_source_files)
+ decrement_active_files(IVAL(buf,0));
+ flist_ndx_push(&redo_list, IVAL(buf,0));
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 1);
+ break;
+ case MSG_FLIST:
+ if (len != 4 || !am_generator || !incremental)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, 4);
+ /* Read extra file list from receiver. */
+ assert(iobuf_in != NULL);
+ assert(iobuf_f_in == fd);
+ flist = recv_file_list(fd);
+ flist->parent_ndx = IVAL(buf,0);
+ break;
+ case MSG_FLIST_EOF:
+ if (len != 0 || !am_generator || !incremental)
+ goto invalid_msg;
+ flist_eof = 1;
+ break;
+ case MSG_DELETED:
+ if (len >= (int)sizeof buf || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
+ send_msg(MSG_DELETED, buf, len);
+ break;
+ case MSG_SUCCESS:
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
+ if (remove_source_files) {
+ decrement_active_files(IVAL(buf,0));
+ send_msg(MSG_SUCCESS, buf, len);
+ }
+ if (preserve_hard_links)
+ flist_ndx_push(&hlink_list, IVAL(buf,0));
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
+ break;
+ case MSG_NO_SEND:
+ if (len != 4 || !am_generator)
+ goto invalid_msg;
+ readfd_unbuffered(fd, buf, len);
+ if (incremental)
+ decrement_flist_in_progress(IVAL(buf,0), 0);
+ break;
+ case MSG_SOCKERR:
+ case MSG_CLIENT:
+ if (!am_generator)
+ goto invalid_msg;
+ if (tag == MSG_SOCKERR)
+ io_end_multiplex_out();
+ /* FALL THROUGH */
+ case MSG_INFO:
+ case MSG_ERROR:
+ case MSG_LOG:
+ while (len) {
+ n = len;
+ if (n >= sizeof buf)
+ n = sizeof buf - 1;
+ readfd_unbuffered(fd, buf, n);
+ rwrite((enum logcode)tag, buf, n);
+ len -= n;
+ }
+ break;
+ default:
+ rprintf(FERROR, "unknown message %d:%d [%s]\n",
+ tag, len, who_am_i());
+ exit_cleanup(RERR_STREAMIO);
+ }