Multiplexing with CoralQueue

In this article we show how to use CoralQueue's multiplexer (mux), which allows multiple producers to send messages to a single consumer. We then present throughput numbers for several producer counts and CPU core pinning configurations.

The Multiplexer

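Below is an example of using the AtomicMux. Four producer threads each send four messages to a single consumer thread, then send an empty message to signal that they are done: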

package com.coralblocks.coralqueue.sample.mux;

import com.coralblocks.coralbits.util.Builder;
import com.coralblocks.coralqueue.mux.AtomicMux;
import com.coralblocks.coralqueue.mux.Mux;
import com.coralblocks.coralqueue.mux.Producer;

public class SampleWithProducer {
	
	private static final int NUMBER_OF_PRODUCERS = 4;
	
	public static void main(String[] args) throws InterruptedException {
		
		Builder<StringBuilder> builder = new Builder<StringBuilder>() {
			@Override
			public StringBuilder newInstance() {
				return new StringBuilder(1024);
			}
		};
		
		final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>(1024, builder, NUMBER_OF_PRODUCERS);
		
		Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];
		
		for(int i = 0; i < producers.length; i++) {
			
			producers[i] = new Thread(new Runnable() {
				
				private final Producer<StringBuilder> producer = mux.nextProducer();

				@Override
				public void run() {
					
					StringBuilder sb;
					
					for(int j = 0; j < 4; j++) {
						while((sb = producer.nextToDispatch()) == null); // busy spin
						sb.setLength(0);
						sb.append("message ").append(j).append(" from producer ").append(producer.getIndex());
						producer.flush();
					}
					
					// send final message
					while((sb = producer.nextToDispatch()) == null); // busy spin
					sb.setLength(0); // empty string to signal we are done
					producer.flush();
				}
			}, "Producer" + i);
		}
		
		Thread consumer = new Thread(new Runnable() {

			@Override
			public void run() {
				
				boolean running = true;
				int finalMessages = 0;
				
				while(running) {
					
					long avail;
					while((avail = mux.availableToPoll()) == 0); // busy spin
					for(int i = 0; i < avail; i++) {
						StringBuilder sb = mux.poll();
						if (sb.length() == 0) {
							if (++finalMessages == NUMBER_OF_PRODUCERS) {
								// and we are done!
								running = false;
								break;
							}
						} else {
							System.out.println(sb.toString());
						}
					}
					mux.donePolling();
				}
			}
		}, "Consumer");
		
		consumer.start();
		for(int i = 0; i < producers.length; i++) producers[i].start();
		
		consumer.join();
		for(int i = 0; i < producers.length; i++) producers[i].join();
	}
}
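
For comparison, the same many-producers-to-one-consumer pattern, including the empty message used as an end-of-stream signal, can be sketched with JDK classes only. This is not how CoralQueue is implemented and it makes no claim about its internals or performance: it assumes a single shared ArrayBlockingQueue, blocks instead of busy spinning, and allocates a new String per message. The class name JdkMuxSketch and the helper run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class JdkMuxSketch {

    // Hypothetical helper: starts the producers and one consumer, then returns
    // every non-empty message the consumer received.
    static List<String> run(int numberOfProducers, int messagesPerProducer) throws InterruptedException {

        // One bounded queue shared by all producers (a JDK class, not CoralQueue)
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        final List<String> received = new ArrayList<>();

        Thread[] producers = new Thread[numberOfProducers];
        for (int i = 0; i < producers.length; i++) {
            final int index = i;
            producers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < messagesPerProducer; j++) {
                        queue.put("message " + j + " from producer " + index);
                    }
                    queue.put(""); // empty string signals that this producer is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer" + i);
        }

        Thread consumer = new Thread(() -> {
            int finalMessages = 0;
            try {
                // stop once every producer has sent its final (empty) message
                while (finalMessages < numberOfProducers) {
                    String msg = queue.take();
                    if (msg.isEmpty()) {
                        finalMessages++;
                    } else {
                        received.add(msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        consumer.start();
        for (Thread p : producers) p.start();

        consumer.join(); // after join, the consumer's writes to 'received' are visible here
        for (Thread p : producers) p.join();

        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> messages = run(4, 4);
        for (String m : messages) System.out.println(m);
    }
}
```

As in the AtomicMux sample, messages from different producers can interleave in any order, so the consumer counts final messages rather than assuming the first empty message ends the stream.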

Throughput Numbers

The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50GHz.

Two producers pinned to their own cores sending messages to one consumer pinned to its own core:

Results: Iterations: 20 | Avg Time: 613.93 millis | Min Time: 583.798 millis | Max Time: 639.488 millis | Nano Timing Cost: 15.0 nanos
Average time to send 20,000,000 messages per pass in 20 passes: 613,929,765 nanos
Messages per second: 32,577,016

Two producers pinned to the same core with hyper-threading sending messages to one consumer pinned to its own core:

Results: Iterations: 20 | Avg Time: 560.601 millis | Min Time: 535.936 millis | Max Time: 576.715 millis | Nano Timing Cost: 14.0 nanos
Average time to send 20,000,000 messages per pass in 20 passes: 560,601,268 nanos
Messages per second: 35,675,980

Four producers pinned to two cores with hyper-threading sending messages to one consumer pinned to its own core:

Results: Iterations: 20 | Avg Time: 1.061 secs | Min Time: 1.03 secs | Max Time: 1.091 secs | Nano Timing Cost: 14.0 nanos
Average time to send 40,000,000 messages per pass in 20 passes: 1,060,708,245 nanos
Messages per second: 37,710,652
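
The "Messages per second" figures above follow from dividing the number of messages per pass by the average pass time. A minimal sketch of that arithmetic is shown below. The class and method names are hypothetical and are not part of CoralQueue or its benchmark tooling, and the use of truncating integer division is an assumption that happens to reproduce the reported figures.

```java
public class ThroughputCalc {

    // Hypothetical helper: messages per second from a message count and the
    // elapsed time in nanoseconds. Integer division truncates the fraction.
    static long messagesPerSecond(long messages, long elapsedNanos) {
        return messages * 1_000_000_000L / elapsedNanos;
    }

    public static void main(String[] args) {
        // Two producers, each pinned to its own core
        System.out.println(messagesPerSecond(20_000_000L, 613_929_765L));   // prints 32577016
        // Two producers on the same core with hyper-threading
        System.out.println(messagesPerSecond(20_000_000L, 560_601_268L));   // prints 35675980
        // Four producers on two cores with hyper-threading
        System.out.println(messagesPerSecond(40_000_000L, 1_060_708_245L)); // prints 37710652
    }
}
```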

Conclusions

With CoralQueue you can easily send messages from multiple producers to a single consumer. In the configurations tested above, throughput ranged from about 32.6 million to 37.7 million messages per second.