-static void msg2sndr_flush(void)
-{
- while (msg2sndr.head && io_multiplexing_out) {
- struct msg_list_item *m = msg2sndr.head;
- if (!(msg2sndr.head = m->next))
- msg2sndr.tail = NULL;
- stats.total_written += m->len;
- defer_forwarding_messages = 1;
- writefd_unbuffered(sock_f_out, m->buf, m->len);
- defer_forwarding_messages = 0;
- free(m);
+static void msg_flush(void)
+{
+ if (am_generator) {
+ while (msg_queue.head && io_multiplexing_out) {
+ struct msg_list_item *m = msg_queue.head;
+ int len = IVAL(m->buf, 0) & 0xFFFFFF;
+ int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
+ if (!(msg_queue.head = m->next))
+ msg_queue.tail = NULL;
+ stats.total_written += len + 4;
+ defer_forwarding_messages++;
+ mplex_write(sock_f_out, tag, m->buf + 4, len, m->convert);
+ defer_forwarding_messages--;
+ free(m);
+ }
+ } else {
+ while (msg_queue.head) {
+ struct msg_list_item *m = msg_queue.head;
+ int len = IVAL(m->buf, 0) & 0xFFFFFF;
+ int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
+ if (!(msg_queue.head = m->next))
+ msg_queue.tail = NULL;
+ defer_forwarding_messages++;
+ mplex_write(msg_fd_out, tag, m->buf + 4, len, m->convert);
+ defer_forwarding_messages--;
+ free(m);
+ }