Multicasting with CoralQueue through a Splitter

In this article we show how to use CoralQueue to multicast/broadcast the same message to multiple consumers so each consumer receives and processes all messages. We also present throughput numbers for different configurations, each one using a different set of cpu cores.

The Splitter

Below an example of how to use the Splitter:

package com.coralblocks.coralqueue.sample.splitter;

import com.coralblocks.coralqueue.splitter.AtomicSplitter;
import com.coralblocks.coralqueue.splitter.Consumer;
import com.coralblocks.coralqueue.splitter.Splitter;

public class SampleWithConsumer {

	private static final int NUMBER_OF_CONSUMERS = 4;
	
	public static void main(String[] args) throws InterruptedException {
		
		final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);
		
		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		
		for(int i = 0; i < consumers.length; i++) {
			
    		consumers[i] = new Thread("Consumer-" + i) {
    			
    			private final Consumer<StringBuilder> consumer = splitter.nextConsumer();
    			
    			@Override
    			public void run() {
    				
    				boolean running = true;
    				
    				while(running) {
    					long avail;
    					while((avail = consumer.availableToPoll()) == 0); // busy spin
    					for(int i = 0; i < avail; i++) {
    						StringBuilder sb = consumer.poll();
    						if (sb == null) break; // mandatory for splitters!
    						if (sb.length() == 0) {
    							running = false;
    							break; // exit immediately...
    						}
    						System.out.println("got " + sb.toString() + " at consumer " + consumer.getIndex());
    					}
    					consumer.donePolling();
    				}
    			}
    		};
    		
    		consumers[i].start();
		}
		
		StringBuilder sb;
		
		for(int i = 0; i < 3; i++) {
			while((sb = splitter.nextToDispatch()) == null); // busy spin
			sb.setLength(0);
			sb.append("message ").append(i);
			splitter.flush();
		}
		
		// send a message to stop consumers...
		while((sb = splitter.nextToDispatch()) == null); // busy spin
		sb.setLength(0);
		splitter.flush();
		
		for(int i = 0; i < consumers.length; i++) consumers[i].join();
	}
}

Throughput Numbers

The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.

One producer pinned to its own core sending to two consumers, each pinned to its own core:

Results: Iterations: 20 | Avg Time: 206.235 millis | Min Time: 203.929 millis | Max Time: 207.908 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 206,235,222 nanos
Messages per second: 48,488,322

One producer pinned to its own core sending to two consumers, each pinned to the same core through hyper-threading:

Results: Iterations: 20 | Avg Time: 217.354 millis | Min Time: 216.239 millis | Max Time: 218.286 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 217,353,789 nanos
Messages per second: 46,007,939

One producer pinned to its own core sending to four consumers each two using one core through hyper-threading:

Results: Iterations: 20 | Avg Time: 225.742 millis | Min Time: 224.252 millis | Max Time: 228.717 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 225,742,165 nanos
Messages per second: 44,298,325

Conclusions

CoralQueue can multicast messages to a set of consumers at an approximate rate of 45 million messages per second. It is very simple to use the Splitter to broadcast messages from a single producer to multiple consumers.