CoralQueue (Straight to the Point)

Below are the basics of CoralQueue:


Queue

One producer sending messages to one consumer. One thread sending to another thread.

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

/**
 * Basic queue example: the main thread plays the producer and a single spawned
 * thread plays the consumer, both talking through one {@link Queue}.
 */
public class Sample {

	/** Value published by the producer to tell the consumer to stop. */
	private static final int EXIT_FLAG = -1;

	/** Number of timestamp messages published before the exit flag. */
	private static final int MESSAGES_TO_SEND = 10;

	/**
	 * Starts the consumer thread, publishes ten {@code System.nanoTime()} readings
	 * followed by the exit flag, then waits for the consumer thread to terminate.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining the consumer
	 */
	public static void main(String[] args) throws InterruptedException {

		final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

		Thread consumer = new Thread() {
			@Override
			public void run() {
				consume(queue);
			}
		};
		consumer.start();

		int sent = 0;
		while (sent < MESSAGES_TO_SEND) {
			// the slot is claimed first, the timestamp is taken afterwards
			claimSlot(queue).set(System.nanoTime());
			queue.flush();
			sent++;
		}

		// tell the consumer that there is nothing more to come
		claimSlot(queue).set(EXIT_FLAG);
		queue.flush();

		consumer.join(); // block until the consumer thread has finished
	}

	/**
	 * Busy-spins until the queue hands out an object the producer can fill.
	 *
	 * @param queue the queue to dispatch through
	 * @return the next object to populate, never {@code null}
	 */
	private static MutableLong claimSlot(Queue<MutableLong> queue) {
		MutableLong slot = queue.nextToDispatch();
		while (slot == null) {
			slot = queue.nextToDispatch(); // busy spin
		}
		return slot;
	}

	/**
	 * Consumer loop: polls the queue in batches and prints every value until the
	 * exit flag shows up. {@code donePolling()} is called after each batch,
	 * including the one cut short by the exit flag.
	 *
	 * @param queue the queue to poll from
	 */
	private static void consume(Queue<MutableLong> queue) {
		boolean exitSeen = false;
		do {
			long batch = queue.availableToPoll();
			while (batch == 0) {
				batch = queue.availableToPoll(); // busy spin
			}
			for (long polled = 0; polled < batch && !exitSeen; polled++) {
				long value = queue.poll().get();
				if (value == EXIT_FLAG) {
					exitSeen = true; // stop right away, nothing is printed for the flag
				} else {
					System.out.println(value);
				}
			}
			queue.donePolling();
		} while (!exitSeen);
	}
}


Demultiplexer

One producer distributing messages to multiple consumers. Each message is processed only once by a random consumer. One thread sending to many threads. Great for parallel processing. Note that you can also route a message to a specific consumer (see the `demux.nextToDispatch(i)` call, line 60 of the listing below).

package com.coralblocks.coralqueue.sample.demux;

import com.coralblocks.coralqueue.demux.AtomicDemux;
import com.coralblocks.coralqueue.demux.Consumer;
import com.coralblocks.coralqueue.demux.Demux;

/**
 * Demultiplexer example: the main thread dispatches messages through a
 * {@link Demux} to {@link #NUMBER_OF_CONSUMERS} consumer threads and then sends
 * each of them an empty message as a stop signal.
 */
public class SampleWithConsumer {

	private static final int NUMBER_OF_CONSUMERS = 4;

	/**
	 * Starts the consumer threads, dispatches three messages, routes one empty
	 * (stop) message to every consumer, and joins all consumer threads.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining
	 */
	public static void main(String[] args) throws InterruptedException {

		final Demux<StringBuilder> demux = new AtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);

		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		for (int c = 0; c < consumers.length; c++) {
			consumers[c] = newConsumerThread(demux.nextConsumer(), "Consumer-" + c);
			consumers[c].start();
		}

		for (int n = 0; n < 3; n++) {
			StringBuilder msg = demux.nextToDispatch();
			while (msg == null) {
				msg = demux.nextToDispatch(); // busy spin
			}
			msg.setLength(0);
			msg.append("message ").append(n);
			demux.flush();
		}

		// one empty message per consumer, routed by consumer index, flushed as a single batch
		for (int target = 0; target < NUMBER_OF_CONSUMERS; target++) {
			StringBuilder stop;
			do {
				stop = demux.nextToDispatch(target); // busy spin
			} while (stop == null);
			stop.setLength(0);
		}
		demux.flush();

		for (Thread t : consumers) {
			t.join();
		}
	}

	/**
	 * Builds (but does not start) a thread that drains the given consumer until
	 * an empty message is received.
	 *
	 * @param consumer the demux consumer this thread polls
	 * @param name the thread name
	 * @return the new, not yet started thread
	 */
	private static Thread newConsumerThread(final Consumer<StringBuilder> consumer, String name) {
		return new Thread(new Runnable() {
			@Override
			public void run() {
				boolean stopped = false;
				while (!stopped) {
					long batch;
					do {
						batch = consumer.availableToPoll(); // busy spin
					} while (batch == 0);
					stopped = drain(consumer, batch);
					consumer.donePolling();
				}
			}
		}, name);
	}

	/**
	 * Polls up to {@code batch} messages and prints each non-empty one.
	 *
	 * @param consumer the consumer to poll
	 * @param batch the number of messages reported as available
	 * @return {@code true} if an empty (stop) message was polled, {@code false} otherwise
	 */
	private static boolean drain(Consumer<StringBuilder> consumer, long batch) {
		for (long polled = 0; polled < batch; polled++) {
			StringBuilder msg = consumer.poll();
			if (msg == null) {
				return false; // null check is mandatory for demuxes; end this batch
			}
			if (msg.length() == 0) {
				return true; // stop message: exit immediately
			}
			System.out.println(msg.toString());
		}
		return false;
	}
}


Multiplexer

Multiple producers pipelining (i.e. sending) messages to a single consumer. Many threads sending to one thread. Great for gathering results from parallel processing.

package com.coralblocks.coralqueue.sample.mux;

import com.coralblocks.coralbits.util.Builder;
import com.coralblocks.coralqueue.mux.AtomicMux;
import com.coralblocks.coralqueue.mux.Mux;
import com.coralblocks.coralqueue.mux.Producer;

/**
 * Multiplexer example: {@link #NUMBER_OF_PRODUCERS} producer threads send
 * messages through a {@link Mux} to one consumer thread. Every producer ends
 * with an empty message; the consumer stops once it has seen one per producer.
 */
public class SampleWithProducer {

	private static final int NUMBER_OF_PRODUCERS = 4;

	/**
	 * Creates the mux and all threads, starts the consumer before the producers,
	 * and then joins the consumer followed by the producers.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining
	 */
	public static void main(String[] args) throws InterruptedException {

		Builder<StringBuilder> builder = new Builder<StringBuilder>() {
			@Override
			public StringBuilder newInstance() {
				return new StringBuilder(1024);
			}
		};

		final Mux<StringBuilder> mux = new AtomicMux<StringBuilder>(1024, builder, NUMBER_OF_PRODUCERS);

		Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];
		for (int p = 0; p < producers.length; p++) {
			producers[p] = new Thread(new ProducerTask(mux.nextProducer()), "Producer" + p);
		}

		Thread consumer = new Thread(new ConsumerTask(mux), "Consumer");

		consumer.start();
		for (Thread producer : producers) {
			producer.start();
		}

		consumer.join();
		for (Thread producer : producers) {
			producer.join();
		}
	}

	/** Sends four numbered messages followed by one empty "done" message. */
	private static final class ProducerTask implements Runnable {

		private final Producer<StringBuilder> producer;

		ProducerTask(Producer<StringBuilder> producer) {
			this.producer = producer;
		}

		@Override
		public void run() {
			for (int seq = 0; seq < 4; seq++) {
				StringBuilder msg = claim();
				msg.setLength(0);
				msg.append("message ").append(seq).append(" from producer ").append(producer.getIndex());
				producer.flush();
			}

			// an empty string signals that this producer is done
			claim().setLength(0);
			producer.flush();
		}

		/** Busy-spins until the producer hands out an object to fill. */
		private StringBuilder claim() {
			StringBuilder slot;
			do {
				slot = producer.nextToDispatch(); // busy spin
			} while (slot == null);
			return slot;
		}
	}

	/** Prints every non-empty message until each producer's empty message has been polled. */
	private static final class ConsumerTask implements Runnable {

		private final Mux<StringBuilder> mux;

		ConsumerTask(Mux<StringBuilder> mux) {
			this.mux = mux;
		}

		@Override
		public void run() {
			int finished = 0; // number of empty (final) messages seen so far

			while (finished < NUMBER_OF_PRODUCERS) {
				long batch = mux.availableToPoll();
				while (batch == 0) {
					batch = mux.availableToPoll(); // busy spin
				}
				for (long polled = 0; polled < batch; polled++) {
					StringBuilder msg = mux.poll();
					if (msg.length() != 0) {
						System.out.println(msg.toString());
						continue;
					}
					if (++finished == NUMBER_OF_PRODUCERS) {
						break; // last producer reported in: stop right away
					}
				}
				mux.donePolling();
			}
		}
	}
}


Splitter / Broadcaster

One producer sending messages to multiple consumers. Each message is processed by each and every consumer. One thread sending to many threads. Great for broadcasting messages to multiple threads.

package com.coralblocks.coralqueue.sample.splitter;

import com.coralblocks.coralqueue.splitter.AtomicSplitter;
import com.coralblocks.coralqueue.splitter.Consumer;
import com.coralblocks.coralqueue.splitter.Splitter;

/**
 * Splitter example: the main thread sends messages through a {@link Splitter}
 * to {@link #NUMBER_OF_CONSUMERS} consumer threads, then sends one empty message
 * as the stop signal and waits for all consumers to finish.
 */
public class SampleWithConsumer {

	private static final int NUMBER_OF_CONSUMERS = 4;

	/**
	 * Starts the consumer threads, sends three messages plus one empty stop
	 * message, and joins all consumer threads.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining
	 */
	public static void main(String[] args) throws InterruptedException {

		final Splitter<StringBuilder> splitter = new AtomicSplitter<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);

		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		for (int c = 0; c < consumers.length; c++) {
			Thread worker = new ConsumerThread("Consumer-" + c, splitter.nextConsumer());
			consumers[c] = worker;
			worker.start();
		}

		int seq = 0;
		while (seq < 3) {
			StringBuilder msg = claim(splitter);
			msg.setLength(0);
			msg.append("message ").append(seq);
			splitter.flush();
			seq++;
		}

		// a single empty message is sent to stop the consumers
		claim(splitter).setLength(0);
		splitter.flush();

		for (Thread worker : consumers) {
			worker.join();
		}
	}

	/**
	 * Busy-spins until the splitter hands out an object to fill.
	 *
	 * @param splitter the splitter to dispatch through
	 * @return the next object to populate, never {@code null}
	 */
	private static StringBuilder claim(Splitter<StringBuilder> splitter) {
		StringBuilder slot = splitter.nextToDispatch();
		while (slot == null) {
			slot = splitter.nextToDispatch(); // busy spin
		}
		return slot;
	}

	/** Thread that prints every message polled from its consumer until an empty one arrives. */
	private static final class ConsumerThread extends Thread {

		private final Consumer<StringBuilder> consumer;

		ConsumerThread(String name, Consumer<StringBuilder> consumer) {
			super(name);
			this.consumer = consumer;
		}

		@Override
		public void run() {
			boolean stopped = false;
			do {
				long batch;
				do {
					batch = consumer.availableToPoll(); // busy spin
				} while (batch == 0);

				long polled = 0;
				while (polled < batch) {
					StringBuilder msg = consumer.poll();
					if (msg == null) {
						break; // null check is mandatory for splitters
					}
					if (msg.length() == 0) {
						stopped = true; // stop message: exit immediately
						break;
					}
					System.out.println("got " + msg.toString() + " at consumer " + consumer.getIndex());
					polled++;
				}
				consumer.donePolling();
			} while (!stopped);
		}
	}
}


MpmcQueue

Multiple producers sending messages to multiple consumers. You can choose any number of producers and consumers. You can also route a message to a specific consumer (see the `p.nextToDispatch(i)` call, line 90 of the listing below).

package com.coralblocks.coralqueue.sample.mpmc;

import java.util.concurrent.atomic.AtomicLong;

import com.coralblocks.coralqueue.mpmc.Consumer;
import com.coralblocks.coralqueue.mpmc.MpmcQueue;
import com.coralblocks.coralqueue.mpmc.Producer;

/**
 * MPMC example: {@link #NUMBER_OF_PRODUCERS} producer threads send numbered
 * messages through an {@link MpmcQueue} to {@link #NUMBER_OF_CONSUMERS} consumer
 * threads. After the producers finish, the main thread routes one empty (stop)
 * message to each consumer.
 */
public class Sample {

	private static final int NUMBER_OF_PRODUCERS = 4;
	private static final int NUMBER_OF_CONSUMERS = 2;

	/**
	 * Starts consumers and producers, waits for the producers, sleeps four
	 * seconds, sends the stop messages through producer 0, and joins the consumers.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted while joining or sleeping
	 */
	public static void main(String[] args) throws InterruptedException {

		final MpmcQueue<StringBuilder> mpmc = new MpmcQueue<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_PRODUCERS, NUMBER_OF_CONSUMERS);

		Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];
		for (int c = 0; c < consumers.length; c++) {
			consumers[c] = new ConsumerThread("Consumer-" + c, mpmc.nextConsumer());
			consumers[c].start();
		}

		Thread[] producers = new Thread[NUMBER_OF_PRODUCERS];

		// shared across producers so that message numbers are unique
		final AtomicLong counter = new AtomicLong(0);

		for (int p = 0; p < producers.length; p++) {
			producers[p] = new ProducerThread("Producer-" + p, mpmc.nextProducer(), counter);
			producers[p].start();
		}

		for (Thread producer : producers) {
			producer.join();
		}

		Thread.sleep(4000);

		Producer<StringBuilder> stopper = mpmc.getProducer(0);

		// one empty message per consumer, routed by consumer index, flushed as a single batch
		for (int target = 0; target < consumers.length; target++) {
			StringBuilder stop = stopper.nextToDispatch(target);
			while (stop == null) {
				stop = stopper.nextToDispatch(target); // busy spin
			}
			stop.setLength(0);
		}
		stopper.flush();

		for (Thread consumer : consumers) {
			consumer.join();
		}
	}

	/** Thread that prints every message polled from its consumer until an empty one arrives. */
	private static final class ConsumerThread extends Thread {

		private final Consumer<StringBuilder> consumer;

		ConsumerThread(String name, Consumer<StringBuilder> consumer) {
			super(name);
			this.consumer = consumer;
		}

		@Override
		public void run() {
			boolean stopped = false;
			while (!stopped) {
				long batch = consumer.availableToPoll();
				while (batch == 0) {
					batch = consumer.availableToPoll(); // busy spin
				}
				for (long polled = 0; polled < batch; polled++) {
					StringBuilder msg = consumer.poll();
					if (msg == null) {
						break; // null check is mandatory for mpmc
					}
					if (msg.length() == 0) {
						stopped = true; // stop message: exit immediately
						break;
					}
					System.out.println(msg.toString() + " got by consumer " + consumer.getIndex());
				}
				consumer.donePolling();
			}
		}
	}

	/** Thread that sends three messages, each numbered from the shared counter. */
	private static final class ProducerThread extends Thread {

		private final Producer<StringBuilder> producer;
		private final AtomicLong counter;

		ProducerThread(String name, Producer<StringBuilder> producer, AtomicLong counter) {
			super(name);
			this.producer = producer;
			this.counter = counter;
		}

		@Override
		public void run() {
			int remaining = 3;
			while (remaining-- > 0) {
				StringBuilder msg;
				do {
					msg = producer.nextToDispatch(); // busy spin
				} while (msg == null);

				// the number is drawn only after an object has been obtained
				long msgNumber = counter.getAndIncrement();
				msg.setLength(0);
				msg.append("message ").append(msgNumber);
				System.out.println("sending message " + msgNumber + " from producer " + producer.getIndex());
				producer.flush();
			}
		}
	}
}