Getting Started with CoralQueue

CoralQueue is a ultra-low-latency lock-free and garbage-free queue for inter-thread communication. It can be defined as a batching queue backed up by a circular array (i.e. the ring buffer) filled with pre-allocated transfer objects which uses memory-barriers to synchronize producers and consumers through sequences. Fortunately you don’t have to understand all its intrinsic details to use it. In this article we show how to use CoralQueue to send messages from a producer thread to a consumer thread in a very fast way without producing any garbage.

The Queue

The queue is a circular array with pre-allocated transfer objects. For the example below we use a StringBuilder as the transfer object.

Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, StringBuilder.class);

The code above creates a queue with 1024 pre-allocated StringBuilders. Note that it uses the default constructor of StringBuilder which by default creates a StringBuilder with size 16. That may be too small for our transfer objects and we don’t want the StringBuilder resizing itself during runtime. So to create a bigger StringBuilder we can use a com.coralblocks.coralbits.util.Builder class like below:

Builder<StringBuilder> builder = new Builder<StringBuilder>() {
	@Override
    public StringBuilder newInstance() {
		return new StringBuilder(1024);
    }
};

final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, builder);

Sending Messages

To send a message to the queue, you grab a transfer object from the queue, fill it with your data and call flush() as the code below illustrates:

StringBuilder sb;
while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");
queue.flush();

If the queue is full we just busy spin until a transfer object becomes available. Later we will see how we can also use a WaitStrategy instead of busy spinning.

You can also send messages in batches:

StringBuilder sb;

while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");

while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello again!");

queue.flush();

Polling Messages

To read message from the queue you poll them from a consumer thread, as the code below shows:

long avail;
while((avail = queue.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.poll();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char-by-char instead
}
queue.donePolling();

Again we busy spin if the queue is empty. Later we will see how we can also use a WaitStrategy instead of busy spinning.

Note that we poll in batches, reducing the number of times we have to check for an empty queue through availableToPoll().

Wait Strategies

By default, you should busy-spin when the queue is full or empty respectively. That’s usually the fastest approach but not always the best as you might want to save energy or allow other threads to use the idle processor. CoralQueue comes with many wait strategies that you can use instead of busy-spinning, or you can create your owns by implementing the WaitStrategy interface. Below are some examples of wait strategies that come with CoralQueue:

  • ParkWaitStrategy: park (i.e. sleep) for 1 nanosecond with the option to back off up to a maximum of N nanoseconds. N defaults to 1 million nanoseconds if not specified (1 millisecond).
  • SpinParkWaitStrategy: first busy spins for C cycles (default to 1 million cycles) then it starts to park (i.e. sleep) for 1 nanosecond with the option to back off up to a maximum of N nanoseconds (default 1 million nanoseconds).
  • SpinYieldParkWaitStrategy: busy spins for some cycles, yield for some cycles then starts to sleep for 1 nanosecond with the option to back off up to a maximum of N nanoseconds (defaults to 1 million nanoseconds).

To use a wait strategy, all you have to do is call its block and reset methods instead of busy spinning:

WaitStrategy waitStrategy = new ParkWaitStrategy();
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null) {
    waitStrategy.block();
}
sb.setLength(0);
sb.append("Hello there!");
queue.flush();
waitStrategy.reset(); // you can reset here to save some nanoseconds...

Same thing when polling:

WaitStrategy waitStrategy = new SpinParkWaitStrategy();
long avail;
while((avail = queue.availableToPoll()) == 0) {
    waitStrategy.block();
}
for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.poll();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char-by-char instead
}
queue.donePolling();
waitStrategy.reset(); // you can reset here to save some nanoseconds...

Semi-volatile writes (lazySet)

To squeeze every bit of performance out of CoralQueue, you can use semi-volatile writes when sending and polling messages. Basically, a semi-volatile write is done through the lazySet method from java.util.concurrent.AtomicLong. It is a faster operation for the thread that’s modifying the variable at the expense of the thread that’s interested in knowing about updates in the variable. For example, if you want to minimize the latency in the producer, you should use lazySet. If you want to minimize the message transit time, you should not use lazySet so the consumer is notified as soon as possible about a new message in the queue.

By default, CoralQueue does not use lazySet, but you can easily take control of that by using the methods below:

queue.flush(); // no lazySet by default
queue.flush(true); // use lazySet

queue.donePolling(); // no lazySet by default
queue.donePolling(true); // use lazySet

Complete Example

To put all parts together, we write a simple program that send 10 timestamps to a consumer thread and then exits:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {
	
	public static void main(String[] args) throws InterruptedException {
		
		final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);
		
		Thread consumer = new Thread() {
			
			@Override
			public void run() {
				
				boolean running = true;
				
				while(running) {
					long avail;
					while((avail = queue.availableToPoll()) == 0); // busy spin
					for(int i = 0; i < avail; i++) {
						MutableLong ml = queue.poll();
						if (ml.get() == -1) { // message to flag exit...
							running = false;
							break;
						}
						System.out.println(ml.get());
					}
					queue.donePolling();
				}
			}
			
		};
		
		consumer.start();
		
		MutableLong ml;
		
		for(int i = 0; i < 10; i++) {
			while((ml = queue.nextToDispatch()) == null); // busy spin
			ml.set(System.nanoTime());
			queue.flush();
		}
		
		// send a message to stop consumer...
		while((ml = queue.nextToDispatch()) == null); // busy spin
		ml.set(-1);
		queue.flush();
		
		consumer.join(); // wait for the consumer thread to die...
	}
}

Conclusion

CoralQueue makes the development of ultra-low-latency, lock-free and garbage-free multithreading applications easy by pipelining messages among threads. It also offers batching, semi-volatile writes and wait strategies though a simple API. CoralQueue also provides a multiplexer (multiple-producers to one-consumer), a demultiplexer (one-producer to multiple-consumers) and a mpmc queue (multiple-producers to multiple-consumers).