Multiple-Producers to Multiple-Consumers Queue

In this article we give an example of how to use the MpmcQueue so you can transfer messages between any number of producers and consumers through a lock-less concurrent queue.

The Sample Code

Without further ado we list a sample code below:

package com.coralblocks.coralqueue.sample.mpmc;

import java.util.concurrent.atomic.AtomicLong;

import com.coralblocks.coralqueue.mpmc.Consumer;
import com.coralblocks.coralqueue.mpmc.MpmcQueue;
import com.coralblocks.coralqueue.mpmc.Producer;

public class Sample {

	private static final int NUMBER_OF_PRODUCERS = 4;
	private static final int NUMBER_OF_CONSUMERS = 2;
	
	public static void main(String[] args) throws InterruptedException {
		
		final MpmcQueue<StringBuilder> mpmc = new MpmcQueue<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_PRODUCERS, NUMBER_OF_CONSUMERS);
		
		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		
		for(int i = 0; i < consumers.length; i++) {
			
    		consumers[i] = new Thread("Consumer-" + i) {
    			
    			private final Consumer<StringBuilder> consumer = mpmc.nextConsumer();
    			
    			@Override
    			public void run() {
    				
    				boolean running = true;
    				
    				while(running) {
    					long avail;
    					while((avail = consumer.availableToPoll()) == 0); // busy spin
    					for(int i = 0; i < avail; i++) {
    						StringBuilder sb = consumer.poll();
    						if (sb == null) break; // mandatory for mpmc!
    						if (sb.length() == 0) {
								running = false;
								break; // exit immediately...
    						}
    						System.out.println(sb.toString() + " got by consumer " + consumer.getIndex());
    					}
    					consumer.donePolling();
    				}
    			}
    		};
    		
    		consumers[i].start();
		}
		
		Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];
		
		final AtomicLong counter = new AtomicLong(0);
		
		for(int i = 0; i < producers.length; i++) {
			
    		producers[i] = new Thread("Producer-" + i) {
    			
    			private final Producer<StringBuilder> producer = mpmc.nextProducer();
    			
    			@Override
    			public void run() {

    				StringBuilder sb;
    				
    				for(int i = 0; i < 3; i++) {
    					while((sb = producer.nextToDispatch()) == null); // busy spin
    					long msgNumber = counter.getAndIncrement();
    					sb.setLength(0);
    					sb.append("message ").append(msgNumber);
    					System.out.println("sending message " + msgNumber + " from producer " + producer.getIndex());
    					producer.flush();
    				}
    			}
    		};
    		
    		producers[i].start();
		}
		
		for(int i = 0; i < producers.length; i++) producers[i].join();
		
		Thread.sleep(4000);
		
		Producer<StringBuilder> p = mpmc.getProducer(0);
	
		for(int i = 0; i < consumers.length; i++) {
			StringBuilder sb;
			// send a message to stop consumers...
			// routing is being used here...
			while((sb = p.nextToDispatch(i)) == null); // busy spin
			sb.setLength(0);
		}
		
		p.flush();
		
		for(int i = 0; i < consumers.length; i++) consumers[i].join();
	}
}


Routing Messages

You can also choose to route a message to a specific consumer. To do that, all you have to do is call nextToDispatch(int consumerIndex) and you can be sure that your message will be processed by the specific consumer. That can be useful to partition certain types of messages to specific consumers and avoid processing them in parallel and out of order. You can also pass a negative number as the consumer index and the mpmc queue will fall back to sending the message to a random consumer.

Conclusion

With CoralQueue you can easily build an architecture where multiple producers send messages to multiple consumers in a lock-free, super fast way.