In this article we show how to use CoralQueue to multicast/broadcast the same message to multiple consumers so each consumer receives and processes all messages. We also present throughput numbers for different configurations, each one using a different set of cpu cores.
The Splitter
Below an example of how to use the Splitter:
package com.coralblocks.coralqueue.sample.splitter;
import com.coralblocks.coralqueue.splitter.AtomicSplitter;
import com.coralblocks.coralqueue.splitter.Consumer;
import com.coralblocks.coralqueue.splitter.Splitter;
public class SampleWithConsumer {
private static final int NUMBER_OF_CONSUMERS = 4;
public static void main(String[] args) throws InterruptedException {
final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);
Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
for(int i = 0; i < consumers.length; i++) {
consumers[i] = new Thread("Consumer-" + i) {
private final Consumer<StringBuilder> consumer = splitter.nextConsumer();
@Override
public void run() {
boolean running = true;
while(running) {
long avail;
while((avail = consumer.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
StringBuilder sb = consumer.poll();
if (sb == null) break; // mandatory for splitters!
if (sb.length() == 0) {
running = false;
break; // exit immediately...
}
System.out.println("got " + sb.toString() + " at consumer " + consumer.getIndex());
}
consumer.donePolling();
}
}
};
consumers[i].start();
}
StringBuilder sb;
for(int i = 0; i < 3; i++) {
while((sb = splitter.nextToDispatch()) == null); // busy spin
sb.setLength(0);
sb.append("message ").append(i);
splitter.flush();
}
// send a message to stop consumers...
while((sb = splitter.nextToDispatch()) == null); // busy spin
sb.setLength(0);
splitter.flush();
for(int i = 0; i < consumers.length; i++) consumers[i].join();
}
}
Throughput Numbers
The machine used to run the benchmark tests was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.
One producer pinned to its own core sending to two consumers, each pinned to its own core:
Results: Iterations: 20 | Avg Time: 206.235 millis | Min Time: 203.929 millis | Max Time: 207.908 millis | Nano Timing Cost: 14.0 nanos Average time to send 10,000,000 messages per pass in 20 passes: 206,235,222 nanos Messages per second: 48,488,322
One producer pinned to its own core sending to two consumers, each pinned to the same core through hyper-threading:
Results: Iterations: 20 | Avg Time: 217.354 millis | Min Time: 216.239 millis | Max Time: 218.286 millis | Nano Timing Cost: 14.0 nanos Average time to send 10,000,000 messages per pass in 20 passes: 217,353,789 nanos Messages per second: 46,007,939
One producer pinned to its own core sending to four consumers each two using one core through hyper-threading:
Results: Iterations: 20 | Avg Time: 225.742 millis | Min Time: 224.252 millis | Max Time: 228.717 millis | Nano Timing Cost: 14.0 nanos Average time to send 10,000,000 messages per pass in 20 passes: 225,742,165 nanos Messages per second: 44,298,325
Conclusions
CoralQueue can multicast messages to a set of consumers at an approximate rate of 45 million messages per second. It is very simple to use the Splitter to broadcast messages from a single producer to multiple consumers.