Demultiplexing with CoralQueue for Parallel Processing

In this article we examine how CoralQueue implements a demultiplexer, in other words a one-producer-to-multiple-consumers queue. We also present throughput numbers for the three implementations supported: Atomic, CAS and Modulus.

The Demultiplexer

A demultiplexer is a queue that accepts messages from a single producer and distributes them across a set of consumers so that each message is processed only once on the consumer side. Of course you should only use a demultiplexer (i.e. a demux) if your messages can be processed in parallel and out of order. A demux is very useful to avoid queue contention and speed things up when consuming a message can be slower than producing one. By increasing the number of consumers, each running on its own dedicated CPU core, you add horsepower on the consumer side to compensate for slower consumers.

Below is a simple example of a demux:

package com.coralblocks.coralqueue.sample.demux;

import com.coralblocks.coralqueue.demux.AtomicDemux;
import com.coralblocks.coralqueue.demux.Consumer;
import com.coralblocks.coralqueue.demux.Demux;

public class SampleWithConsumer {

	private static final int NUMBER_OF_CONSUMERS = 4;
	
	public static void main(String[] args) throws InterruptedException {
		
		final Demux<StringBuilder> demux = new AtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);
		
		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		
		for(int i = 0; i < consumers.length; i++) {
			
    		consumers[i] = new Thread("Consumer-" + i) {
    			
    			private final Consumer<StringBuilder> consumer = demux.nextConsumer();
    			
    			@Override
    			public void run() {
    				
    				boolean running = true;
    				
    				while(running) {
    					long avail;
    					while((avail = consumer.availableToPoll()) == 0); // busy spin
    					for(int i = 0; i < avail; i++) {
    						StringBuilder sb = consumer.poll();
    						if (sb == null) break; // mandatory for demuxes!
    						if (sb.length() == 0) {
    							running = false;
    							break; // exit immediately...
    						}
    						System.out.println(sb.toString());
    					}
    					consumer.donePolling();
    				}
    			}
    		};
    		
    		consumers[i].start();
		}
		
		StringBuilder sb;
		
		for(int i = 0; i < 3; i++) {
			while((sb = demux.nextToDispatch()) == null); // busy spin
			sb.setLength(0);
			sb.append("message ").append(i);
			demux.flush();
		}
		
		// send a message to stop consumers...
		for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
			// routing is being used here...
			while((sb = demux.nextToDispatch(i)) == null); // busy spin
			sb.setLength(0);
		}
		
		demux.flush(); // send the batch
		
		for(int i = 0; i < consumers.length; i++) consumers[i].join();
	}
}

NOTE: For demuxes it is mandatory to check whether the object returned by poll() is null and, if it is, break out of the loop. See the null check inside the consumer's polling loop in the example above.

Routing Messages

You can also choose to route a message to a specific consumer. To do that, all you have to do is call nextToDispatch(int consumerIndex) and your message is guaranteed to be processed by that specific consumer. This can be useful to partition certain types of messages to specific consumers so that they are not processed in parallel and out of order. You can also pass a negative number as the consumer index, in which case the demux falls back to sending the message to a random consumer. A sketch of key-based routing is shown below.
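
Below is a minimal sketch of that idea, assuming the same Demux instance and number of consumers from the sample above, with the consumer threads already running. The helper method, the keys and the hash-based partitioning are illustrative assumptions, not part of the CoralQueue API; the only CoralQueue calls used are nextToDispatch(int consumerIndex) and flush(), as shown earlier.

import com.coralblocks.coralqueue.demux.Demux;

public class RoutingSketch {

	// Hypothetical producer-side helper (not part of the CoralQueue API): route every
	// message with the same key to the same consumer index, so messages sharing a key
	// are never processed in parallel or out of order relative to each other.
	static void sendPartitioned(Demux<StringBuilder> demux, int numberOfConsumers, String... keys) {
		StringBuilder sb;
		for (String key : keys) {
			int consumerIndex = Math.abs(key.hashCode() % numberOfConsumers); // same key -> same consumer
			while ((sb = demux.nextToDispatch(consumerIndex)) == null); // busy spin
			sb.setLength(0);
			sb.append(key);
		}
		demux.flush();
	}
}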

Modulus vs Atomic vs CAS

Modulus is the fastest, but it has the disadvantage that a slow consumer can cause the demux queue to fill up and block all consumers. Atomic and CAS do not have this drawback: a slow consumer will not affect the whole system, because messages will simply be processed by the other available consumers. Atomic maintains a queue for each consumer and supports routing. CAS does not support routing, but it has the ability to help a slow consumer by processing messages pending on its queue. Modulus does not support routing either; only Atomic does. A conceptual sketch of the modulus assignment strategy follows below.
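
As a conceptual illustration only (this is not CoralQueue's actual code), the sketch below shows the idea behind the modulus strategy: each message sequence is mapped to a consumer up front via sequence % numberOfConsumers. Because the mapping is fixed, a slow consumer keeps its slots occupied and the queue can eventually fill up, whereas Atomic and CAS hand work to whichever consumer is available.

public class ModulusAssignmentSketch {

	// Conceptual illustration only, not CoralQueue internals: a modulus-style demux
	// fixes the target consumer for each message by taking the message sequence
	// modulo the number of consumers.
	public static void main(String[] args) {
		final int numberOfConsumers = 4;
		for (long sequence = 0; sequence < 8; sequence++) {
			int consumerIndex = (int) (sequence % numberOfConsumers);
			System.out.println("message " + sequence + " -> consumer " + consumerIndex);
		}
	}
}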

Throughput Numbers

The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50GHz.

ATOMIC: One producer pinned to its own core sending to two consumers, each pinned to its own core:

Results: Iterations: 20 | Avg Time: 390.42 millis | Min Time: 386.647 millis | Max Time: 395.172 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 390,420,106 nanos
Messages per second: 25,613,434

ATOMIC: One producer pinned to its own core sending to two consumers, both pinned to the same core through hyper-threading:

Results: Iterations: 20 | Avg Time: 388.403 millis | Min Time: 383.134 millis | Max Time: 392.386 millis | Nano Timing Cost: 15.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 388,402,712 nanos
Messages per second: 25,746,473

ATOMIC: One producer pinned to its own core sending to four consumers, each pair sharing one core through hyper-threading:

Results: Iterations: 20 | Avg Time: 506.881 millis | Min Time: 505.326 millis | Max Time: 509.197 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 506,881,400 nanos
Messages per second: 19,728,480

MODULUS: One producer pinned to its own core sending to two consumers, each pinned to its own core:

Results: Iterations: 20 | Avg Time: 207.266 millis | Min Time: 203.14 millis | Max Time: 209.389 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 207,266,446 nanos
Messages per second: 48,247,076

MODULUS: One producer pinned to its own core sending to two consumers, both pinned to the same core through hyper-threading:

Results: Iterations: 20 | Avg Time: 112.38 millis | Min Time: 111.737 millis | Max Time: 112.703 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 112,380,297 nanos
Messages per second: 88,983,569

MODULUS: One producer pinned to its own core sending to four consumers, each pair sharing one core through hyper-threading:

Results: Iterations: 20 | Avg Time: 133.334 millis | Min Time: 123.943 millis | Max Time: 140.385 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 133,334,460 nanos
Messages per second: 74,999,366

CAS: One producer pinned to its own core sending to two consumers, each pinned to its own core:

Results: Iterations: 20 | Avg Time: 583.802 millis | Min Time: 555.686 millis | Max Time: 588.794 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 583,802,394 nanos
Messages per second: 17,129,083

CAS: One producer pinned to its own core sending to two consumers, both pinned to the same core through hyper-threading:

Results: Iterations: 20 | Avg Time: 396.183 millis | Min Time: 393.696 millis | Max Time: 401.272 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 396,182,573 nanos
Messages per second: 25,240,888

CAS: One producer pinned to its own core sending to four consumers, each pair sharing one core through hyper-threading:

Results: Iterations: 20 | Avg Time: 567.278 millis | Min Time: 565.862 millis | Max Time: 569.079 millis | Nano Timing Cost: 14.0 nanos
Average time to send 10,000,000 messages per pass in 20 passes: 567,277,788 nanos
Messages per second: 17,628,047

Conclusions

If you need more horsepower on the consumer side to keep up with the producer, you can increase the number of consumers and process your messages in parallel by using a demultiplexer. If you need to route messages to a specific consumer, you can also do that with CoralQueue’s demultiplexer.