The Diamond Queue (Demultiplexer to set of threads to Multiplexer)

In this article we present the DiamondQueue, which is nothing more than thread A sending a batch of requests through a demultiplexer to a fixed set of worker threads, which then send their results through a multiplexer to thread B. It is important to note that thread A and thread B can be the same thread. The DiamondQueue also supports lanes to enforce message order when needed: messages dispatched to the same lane keep their order.

Below is a simple example of how to use the DiamondQueue:

package com.coralblocks.coralqueue.test;

import java.util.Random;

import com.coralblocks.coralbits.util.ThreadUtils;
import com.coralblocks.coralqueue.diamond.AtomicDiamondQueue;
import com.coralblocks.coralqueue.diamond.DiamondQueue;
import com.coralblocks.coralqueue.diamond.Input;
import com.coralblocks.coralqueue.diamond.Output;
import com.coralblocks.coralqueue.diamond.Worker;

/**
 * Example round trip through a {@link DiamondQueue}: the main thread dispatches
 * {@link AddWorker} requests to two worker threads and then polls the results back.
 * <p>
 * Two scenarios are run back to back: one where the lane is left to the queue
 * (lane {@code -1}, no ordering) and one where the lane is chosen explicitly
 * (same lane =&gt; same order).
 */
public class TestDiamond {
	
	private static final Random RAND = new Random();
	
	/** Number of messages sent (and then awaited) in each scenario. */
	private static final int TOTAL_NUMBER_OF_MSG_TO_SEND = 8;
	
	/** Lane value that lets the queue choose a random free lane. */
	private static final int ANY_LANE = -1;
	
	/**
	 * Worker that adds {@link #x} and {@link #y} and stores the sum in {@link #result}.
	 */
	public static class AddWorker extends Worker {
		
		public int x;
		public int y;
		public int result;
		
		/**
		 * Sleeps for a random amount in the range 0..9 (presumably milliseconds,
		 * to simulate work of varying duration), then computes {@code x + y}.
		 *
		 * @return always {@code true}, i.e. the execution was successful
		 */
		@Override
		public boolean execute() {
			ThreadUtils.sleep(RAND.nextInt(10));
			this.result = x + y;
			return true; // successful!
		}
	}
	
	/**
	 * Runs both scenarios against a two-worker diamond queue.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		
		DiamondQueue<AddWorker> diamond = new AtomicDiamondQueue<AddWorker>(
				512 /* capacity per worker thread lane */, 
				AddWorker.class,  /* the worker class being used */
				2 /* number of worker threads */,
				new int[] { 1, 2 } /* proc_ids to bind worker threads */ );
		
		Input<AddWorker> input = diamond.getInput();
		Output<AddWorker> output = diamond.getOutput();
		
		// start all worker threads...
		diamond.start(false); // false = non-daemon thread...
		
		// The workers are non-daemon threads, so they must be stopped even when
		// something below throws; otherwise the JVM would never exit.
		try {
			
			// FIRST SCENARIO: No order
			
			for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
				dispatch(input, ANY_LANE);
			}
			
			input.flush(true); // send the messages down the diamond...
			
			receiveAll(output, TOTAL_NUMBER_OF_MSG_TO_SEND);
			
			System.out.println();
			
			// SECOND SCENARIO: Ordered
			
			for(int i = 0; i < TOTAL_NUMBER_OF_MSG_TO_SEND; i++) {
				// lanes guarantee order... (same lane => same order)
				dispatch(input, i % diamond.getNumberOfWorkerThreads());
			}
			
			input.flush(true); // send the messages down the diamond...
			
			receiveAll(output, TOTAL_NUMBER_OF_MSG_TO_SEND);
			
		} finally {
			
			diamond.stop(); // stop all worker threads...
		}
	}
	
	/**
	 * Claims the next worker object on the given lane, fills it with two random
	 * operands in the range 0..49 and logs what is being sent. The message only
	 * goes down the diamond once the caller flushes the input.
	 *
	 * @param input the sending side of the diamond queue
	 * @param lane  the lane to dispatch on, or {@code -1} for a random free lane
	 */
	private static void dispatch(Input<AddWorker> input, int lane) {
		
		AddWorker aw = null;
		
		while((aw = input.nextToDispatch(lane)) == null); // busy spin wait strategy...
		
		aw.x = RAND.nextInt(50);
		aw.y = RAND.nextInt(50);
		
		System.out.println("===> Sending: lane=" + lane + " x=" + aw.x + " y=" + aw.y);
	}
	
	/**
	 * Busy-spins on the output until at least {@code expected} results have been
	 * polled, reporting each one as it arrives.
	 *
	 * @param output   the receiving side of the diamond queue
	 * @param expected the number of results to wait for
	 */
	private static void receiveAll(Output<AddWorker> output, int expected) {
		
		int messagesReceived = 0;
		
		while(messagesReceived < expected) {
			
			long avail = output.availableToPoll();
			
			if (avail <= 0) {
				continue; // busy spin wait strategy...
			}
			
			for(long i = 0; i < avail; i++) {
				report(output.poll());
				messagesReceived++;
			}
			
			// signal that the batch polled above has been fully processed
			output.donePolling(true);
		}
	}
	
	/**
	 * Prints one received result: the sum on success, or the exception (plus its
	 * stack trace when one is available) on failure.
	 *
	 * @param aw the worker object polled from the output
	 */
	private static void report(AddWorker aw) {
		
		if (aw.wasSuccessful()) {
			
			System.out.println("===> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.result);
			
		} else {
			
			System.out.println("XXXX> Receiving: x=" + aw.x + " y=" + aw.y + " result=" + aw.getException());
			if (aw.getException() != null) aw.getException().printStackTrace();
		}
	}
}

Output:

===> Sending: lane=-1 x=18 y=30
===> Sending: lane=-1 x=49 y=5
===> Sending: lane=-1 x=47 y=15
===> Sending: lane=-1 x=35 y=38
===> Sending: lane=-1 x=26 y=6
===> Sending: lane=-1 x=17 y=34
===> Sending: lane=-1 x=15 y=8
===> Sending: lane=-1 x=22 y=10
===> Receiving: x=49 y=5 result=54
===> Receiving: x=35 y=38 result=73
===> Receiving: x=17 y=34 result=51
===> Receiving: x=22 y=10 result=32
===> Receiving: x=18 y=30 result=48
===> Receiving: x=47 y=15 result=62
===> Receiving: x=26 y=6 result=32
===> Receiving: x=15 y=8 result=23

===> Sending: lane=0 x=43 y=44
===> Sending: lane=1 x=8 y=19
===> Sending: lane=0 x=14 y=25
===> Sending: lane=1 x=6 y=7
===> Sending: lane=0 x=28 y=11
===> Sending: lane=1 x=8 y=12
===> Sending: lane=0 x=40 y=26
===> Sending: lane=1 x=34 y=3
===> Receiving: x=43 y=44 result=87
===> Receiving: x=14 y=25 result=39
===> Receiving: x=28 y=11 result=39
===> Receiving: x=40 y=26 result=66
===> Receiving: x=8 y=19 result=27
===> Receiving: x=6 y=7 result=13
===> Receiving: x=8 y=12 result=20
===> Receiving: x=34 y=3 result=37

The DiamondQueue interface for reference:

package com.coralblocks.coralqueue.diamond;

/**
 * A queue shaped like a diamond: one side dispatches worker objects to a fixed
 * set of worker threads, and the other side collects them after execution.
 *
 * @param <E> the type of {@link Worker} travelling through the queue
 */
public interface DiamondQueue<E extends Worker> {
	
	/**
	 * Returns the sending side of the queue, used to dispatch worker objects
	 * to the worker threads.
	 *
	 * @return the input of this diamond queue
	 */
	Input<E> getInput();
	
	/**
	 * Returns the receiving side of the queue, used to poll worker objects
	 * after the worker threads have executed them.
	 *
	 * @return the output of this diamond queue
	 */
	Output<E> getOutput();
	
	/**
	 * Returns the number of worker threads of this queue.
	 *
	 * @return the number of worker threads
	 */
	int getNumberOfWorkerThreads();
	
	/**
	 * Starts all worker threads.
	 *
	 * @param daemon {@code true} to start the worker threads as daemon threads,
	 *               {@code false} to start them as non-daemon threads
	 */
	void start(boolean daemon);
	
	/**
	 * Stops all worker threads.
	 */
	void stop();
	
}