+
+ if ((ff_xb.len = s - sob) == 0)
+ ff_lastchar = '\0';
+ else {
+ /* Handle a partial string specially, saving any incomplete chars. */
+ flags &= ~ICB_INCLUDE_INCOMPLETE;
+ if (iconvbufs(ic_send, &ff_xb, &iobuf.out, flags) < 0) {
+ if (errno == E2BIG)
+ exit_cleanup(RERR_PROTOCOL); /* impossible? */
+ if (ff_xb.pos)
+ memmove(ff_xb.buf, ff_xb.buf + ff_xb.pos, ff_xb.len);
+ }
+ ff_lastchar = 'x'; /* Anything non-zero. */
+ }
+ } else
+#endif
+
+ if (len) {
+ char *f = ff_xb.buf + ff_xb.pos;
+ char *t = ff_xb.buf;
+ char *eob = f + len;
+ /* Eliminate any multi-'\0' runs. */
+ while (f != eob) {
+ if (!(*t++ = *f++)) {
+ while (f != eob && *f == '\0')
+ f++;
+ }
+ }
+ ff_lastchar = f[-1];
+ if ((len = t - ff_xb.buf) != 0) {
+ /* This will not circle back to perform_io() because we only get
+ * called when there is plenty of room in the output buffer. */
+ write_buf(iobuf.out_fd, ff_xb.buf, len);
+ }
+ }
+}
+
+/* Perform buffered input and output until specified conditions are met. When
+ * given a "needed" read requirement, we'll return without doing any I/O if the
+ * iobuf.in bytes are already available. When reading, we'll read as many
+ * bytes as we can into the buffer, and return as soon as we meet the minimum
+ * read requirement. When given a "needed" write requirement, we'll return
+ * without doing any I/O if that many bytes will fit in the output buffer (we
+ * check either iobuf.out or iobuf.msg, depending on the flags). When writing,
+ * we write out as much as we can, and return as soon as the given free-space
+ * requirement is available.
+ *
+ * The iobuf.out and iobuf.msg buffers are circular, so some writes into them
+ * will need to be split when the data needs to wrap around to the start. In
+ * order to help make this easier for some operations (such as the use of
+ * SIVAL() into the buffer) the buffers MUST have 4 bytes of overflow space at
+ * the end that is not not counted in the "size". The iobuf.in buffer is not
+ * (currently) circular. To facilitate the handling of MSG_DATA bytes as they
+ * are read-from/written-into the buffers, see the three raw_* iobuf vars.
+ *
+ * When writing, we flush data in the following priority order:
+ *
+ * 1. Finish writing any in-progress MSG_DATA sequence from iobuf.out.
+ *
+ * 2. Write out all the messages from the message buf (if iobuf.msg is active).
+ * Yes, this means that a PIO_NEED_OUTROOM call will completely flush any
+ * messages before getting to the iobuf.out flushing (except for rule 1).
+ *
+ * 3. Write out the raw data from iobuf.out, possibly filling in the multiplexed
+ * MSG_DATA header that was pre-allocated (when output is multiplexed).
+ *
+ * TODO: items for possible future work:
+ *
+ * - Make this routine able to read the generator-to-receiver batch flow?
+ *
+ * - Make the input buffer circular?
+ *
+ * Unlike the old routines that this replaces, it is OK to read ahead as far as
+ * we can because the read_a_msg() routine now reads its bytes out of the input
+ * buffer. In the old days, only raw data was in the input buffer, and any
+ * unused raw data in the buf would prevent the reading of socket data. */
+static char *perform_io(size_t needed, int flags)
+{
+ fd_set r_fds, e_fds, w_fds;
+ struct timeval tv;
+ int cnt, max_fd;
+ size_t empty_buf_len = 0;
+ xbuf *out;
+ char *data;
+
+ if (iobuf.in.len == 0 && iobuf.in.pos != 0) {
+ if (iobuf.raw_input_ends_before)
+ iobuf.raw_input_ends_before -= iobuf.in.pos;
+ iobuf.in.pos = 0;
+ }
+
+ switch (flags & PIO_NEED_FLAGS) {
+ case PIO_NEED_INPUT:
+ if (DEBUG_GTE(IO, 3)) {
+ rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n",
+ who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : "");
+ }
+
+ /* Make sure the input buffer is big enough to hold "needed" bytes.
+ * Also make sure it will fit in the free space at the end, or
+ * else we need to shift some bytes. */
+ if (needed && iobuf.in.size < needed) {
+ if (!(iobuf.in.buf = realloc_array(iobuf.in.buf, char, needed)))
+ out_of_memory("perform_io");
+ if (DEBUG_GTE(IO, 4)) {
+ rprintf(FINFO, "[%s] resized input buffer from %ld to %ld bytes.\n",
+ who_am_i(), (long)iobuf.in.size, (long)needed);
+ }
+ iobuf.in.size = needed;
+ }
+ if (iobuf.in.size - iobuf.in.pos < needed
+ || (iobuf.in.len < needed && iobuf.in.len < 1024
+ && iobuf.in.size - (iobuf.in.pos + iobuf.in.len) < 1024)) {
+ memmove(iobuf.in.buf, iobuf.in.buf + iobuf.in.pos, iobuf.in.len);
+ if (DEBUG_GTE(IO, 4)) {
+ rprintf(FINFO,
+ "[%s] moved %ld bytes from %ld to 0 in the input buffer (size=%ld, needed=%ld).\n",
+ who_am_i(), (long)iobuf.in.len, (long)iobuf.in.pos, (long)iobuf.in.size, (long)needed);
+ }
+ if (iobuf.raw_input_ends_before)
+ iobuf.raw_input_ends_before -= iobuf.in.pos;
+ iobuf.in.pos = 0;