Inter-thread communication within CoralReactor

CoralReactor was deliberately built from the ground up to be single-threaded. That means that no thread other than the reactor thread should execute any code or access any data belonging to servers and clients. This not only delivers very fast performance but also allows for much simpler code that does not have to worry about thread synchronization, lock contention, race conditions, deadlocks, thread starvation and many other pitfalls of multithreaded programming. However, there are common scenarios where other threads must interact with the reactor thread. In this article we analyze in detail how this is done without breaking the single-threaded design principle and without creating any garbage.


The Scenario

Below we list the source code of a simple reactor client that increments a counter based on messages it receives from a server.

package com.coralblocks.coralreactor.client.callback;

import static com.coralblocks.corallog.Log.*;

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.AbstractLineTcpClient;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

public class CounterClient extends AbstractLineTcpClient {

	private long total;
	
	public CounterClient(NioReactor nio, String host, int port, Configuration config) {
		super(nio, host, port, config);
	}
	
	@Override
	protected void handleOpened() {
		total = 0;
	}
	
	@Override
	protected void handleMessage(ByteBuffer msg) {
		
		if (!msg.hasRemaining()) return;
		
		char c = (char) msg.get();
		
		if (c == '+') increment();
		if (c == '-') decrement();
	}
	
	public void increment() {
		total++;
		Info.log("Total was incremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public void decrement() {
		total--;
		Info.log("Total was decremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.start();
	}
}

As you can see, when the client receives '+' from the server the counter is incremented, and when it receives '-' the counter is decremented. You can easily simulate this server using netcat, as the screenshot below shows:

[Screenshot: netcat session simulating the server]
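If netcat is not at hand, a few lines of plain Java can play the same role. The sketch below is a stand-in written for illustration and is not part of CoralReactor: the names PlusMinusServerSketch and serveOnce are made up, and it assumes that the line-based client treats each newline-terminated line as one message. It accepts a single connection on the port used by the client above and sends a fixed sequence of pluses and minuses.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: a hypothetical test server, not CoralReactor code.
final class PlusMinusServerSketch {

    /** Accepts one client and sends each command character as its own line. */
    static void serveOnce(ServerSocket server, String commands) throws IOException {
        try (Socket socket = server.accept();
             OutputStream out = socket.getOutputStream()) {
            for (char c : commands.toCharArray()) {
                // Assumption: one newline-terminated line equals one client message.
                out.write((c + "\n").getBytes(StandardCharsets.US_ASCII));
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // 45151 is the port the CounterClient above connects to.
        try (ServerSocket server = new ServerSocket(45151)) {
            serveOnce(server, "+-++--");
        }
    }
}
```

Unlike an interactive netcat session, this sketch closes the connection as soon as the last command is sent, so it is only useful for a quick smoke test.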

After typing some pluses and minuses on the server, you can see the following log messages in the client console:

17:28:47.517089-INFO CounterClient-localhost:45151 Client opened! sequence=1 session=null
17:28:47.566073-INFO NioReactor Reactor started! type=OptimumNioReactor impl=KQueueSelectorImpl
17:28:47.567366-INFO CounterClient-localhost:45151 Connection established!
17:28:47.567400-INFO CounterClient-localhost:45151 Connection opened!
17:28:53.740893-INFO Total was incremented: 1 thread=NioReactor
17:28:55.044887-INFO Total was decremented: 0 thread=NioReactor
17:28:56.169573-INFO Total was incremented: 1 thread=NioReactor
17:28:56.946586-INFO Total was incremented: 2 thread=NioReactor
17:28:58.161256-INFO Total was decremented: 1 thread=NioReactor
17:28:59.229615-INFO Total was decremented: 0 thread=NioReactor

So far so good. Note that each log message includes the name of the thread performing the increment or decrement, in this case the NioReactor thread.

Incrementing from Another Thread

Now suppose that, for some reason, you want a thread other than the reactor thread to increment and decrement the counter. A common scenario is a GUI running on another thread whose actions, for example a button click, need to be communicated to the reactor thread. Below we change the main method of our code to simulate this scenario:

	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.start();
		
		for(int i = 0; i < 100; i++) {
			
			Thread.sleep(2000);
			
			final boolean check = i % 3 == 0;
			
			if (check) client.decrement();
			else client.increment();
		}
	}

When we run this client and type some pluses and minuses on the server, we get the following log output:

17:45:38.069983-INFO CounterClient-localhost:45151 Client opened! sequence=1 session=null
17:45:38.126212-INFO NioReactor Reactor started! type=OptimumNioReactor impl=KQueueSelectorImpl
17:45:38.127543-INFO CounterClient-localhost:45151 Connection established!
17:45:38.127578-INFO CounterClient-localhost:45151 Connection opened!
17:45:39.686658-INFO Total was incremented: 1 thread=NioReactor
17:45:40.127196-INFO Total was decremented: 0 thread=main
17:45:41.035442-INFO Total was decremented: -1 thread=NioReactor
17:45:42.128232-INFO Total was incremented: 0 thread=main
17:45:42.408007-INFO Total was decremented: -1 thread=NioReactor

Note that we have just broken the single-threaded principle of CoralReactor: two threads (main and NioReactor) are now calling the same piece of code and accessing the same variable. From a multithreading point of view, one might be tempted to fix this problem using the synchronized keyword, as below:

	public synchronized void increment() {
		total++;
		Info.log("Total was incremented:", total, "thread=", Thread.currentThread().getName());
	}
	
	public synchronized void decrement() {
		total--;
		Info.log("Total was decremented:", total, "thread=", Thread.currentThread().getName());
	}

Please don’t. That will introduce lock contention on the critical reactor thread and confirm your departure from the single-threaded design principle. Fortunately, CoralReactor can easily restore the single-threaded principle through the use of callbacks.


Callbacks

Instead of having the external thread call code that belongs to the reactor thread, notify the reactor thread that some code execution is pending by passing it a CallbackListener. The reactor thread will then call you back by executing the provided callback listener. See the example below:

	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.start();
		
		for(int i = 0; i < 100; i++) {
			
			Thread.sleep(2000);
			
			final boolean check = i % 3 == 0;
			
			CallbackListener callback = new CallbackListener() {

				@Override
				public void onCallback(long now) {
					if (check) client.decrement();
					else client.increment();
				}
			};
			
			nio.requestCallback(callback);
		}
	}

Now when we run our client we no longer see the main thread, just the NioReactor thread. We have successfully restored the single-threaded design principle, but at a cost: every pass through the loop allocates a new anonymous CallbackListener, so we create garbage on every callback. Fortunately, CoralReactor can avoid that by using an underlying lock-free queue that produces zero garbage, thanks to CoralQueue.
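To see the callback hand-off in isolation, here is a self-contained sketch built only on the JDK. It is not how CoralReactor is implemented: MiniReactor, Callback and Counter are hypothetical names, and a ConcurrentLinkedQueue stands in for whatever the reactor uses internally. The point it illustrates is that the external thread only enqueues a request, while the loop thread is the only one that ever touches the counter.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch only: MiniReactor is a hypothetical stand-in, not CoralReactor code.
final class MiniReactorSketch {

    /** Same shape as the listener in the article: invoked on the loop thread. */
    interface Callback {
        void onCallback(long now);
    }

    /** Toy event loop: a single thread drains a queue of pending callbacks. */
    static final class MiniReactor implements Runnable {

        private final ConcurrentLinkedQueue<Callback> pending = new ConcurrentLinkedQueue<>();
        private final Thread thread = new Thread(this, "MiniReactor");
        private volatile boolean running = true;

        void start() {
            thread.start();
        }

        /** Callable from any thread: it only enqueues, it never runs the callback itself. */
        void requestCallback(Callback callback) {
            pending.add(callback);
        }

        /** Lets the loop finish whatever is already queued, then waits for it to exit. */
        void stop() {
            running = false;
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void run() {
            while (running || !pending.isEmpty()) {
                Callback callback = pending.poll();
                if (callback != null) callback.onCallback(System.nanoTime());
                else Thread.yield();
            }
        }
    }

    /** Plain, unsynchronized state: safe only because one thread ever touches it. */
    static final class Counter {
        long total;
        String lastThread;

        void add(long delta) {
            total += delta;
            lastThread = Thread.currentThread().getName();
        }
    }

    static Counter runDemo() {
        MiniReactor reactor = new MiniReactor();
        Counter counter = new Counter();
        reactor.start();
        for (int i = 0; i < 100; i++) {
            final boolean check = i % 3 == 0;
            // Allocates one capturing lambda per request, like the anonymous class above.
            reactor.requestCallback(now -> counter.add(check ? -1 : +1));
        }
        reactor.stop();
        return counter;
    }

    public static void main(String[] args) {
        Counter counter = runDemo();
        System.out.println("total=" + counter.total + " thread=" + counter.lastThread);
        // prints: total=32 thread=MiniReactor
    }
}
```

Of the 100 requests, 34 are decrements (the multiples of 3 from 0 to 99) and 66 are increments, which is why the total ends at 32. The counter needs no locking because only the loop thread modifies it, and the main thread reads it only after the loop thread has exited.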

Callbacks without Garbage

CoralReactor allows you to use lock-free, ultra-fast and garbage-free queues for callback handling. It manages the underlying queues, one for each callback listener class that you implement. That way no garbage is created and the inter-thread communication latency is kept to a bare minimum, thanks to CoralQueue. Below is a complete example:

	public static class Callback implements CallbackListener {
		
		private boolean check;
		private CounterClient client;
		
		public void reset(CounterClient client, boolean check) {
			this.client = client;
			this.check = check;
		}

		@Override
		public void onCallback(long now) {
			if (check) client.decrement();
			else client.increment();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		
		NioReactor nio = NioReactor.create();
		
		MapConfiguration config = new MapConfiguration();
		
		final CounterClient client = new CounterClient(nio, "localhost", 45151, config);
		client.open();
		
		nio.initCallbackQueue(Callback.class);
		
		nio.start();
		
		for(int i = 0; i < 100; i++) {
			
			Thread.sleep(2000);
			
			final boolean check = i % 3 == 0;
			
			Callback callback = nio.nextCallback(Callback.class);
			callback.reset(client, check);
			
			nio.flushCallbacks(Callback.class);
		}
	}

Note that you must initialize the queue with initCallbackQueue before the reactor is started. Then all you have to do is use your callback listener class to get a callback object with nextCallback, fill it in, and dispatch it with flushCallbacks.
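The general idea behind a garbage-free hand-off can be sketched with a ring of preallocated, reusable callback objects. The code below is an assumption about the technique in general, not a description of how CoralQueue or CoralReactor work internally: CallbackRing, its nextCallback, flush and drain methods, and the Counter class are hypothetical names chosen to mirror the calls in the example above. It supports exactly one producer thread and one consumer thread.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Illustrative sketch only: a hypothetical ring of reusable callbacks, not CoralQueue code.
final class PooledCallbackSketch {

    interface Callback {
        void onCallback(long now);
    }

    /** Single-producer, single-consumer ring whose callback objects are allocated once. */
    static final class CallbackRing<T extends Callback> {

        private final T[] slots;
        private final int mask;
        private final AtomicLong published = new AtomicLong(); // advanced by the producer
        private final AtomicLong consumed = new AtomicLong();  // advanced by the consumer
        private long claimed;                                  // touched by the producer only

        @SuppressWarnings("unchecked")
        CallbackRing(int capacity, Supplier<T> factory) {
            if (Integer.bitCount(capacity) != 1) {
                throw new IllegalArgumentException("capacity must be a power of two");
            }
            slots = (T[]) new Callback[capacity];
            mask = capacity - 1;
            for (int i = 0; i < capacity; i++) slots[i] = factory.get();
        }

        /** Producer: returns a recycled object to fill in, or null while the ring is full. */
        T nextCallback() {
            if (claimed - consumed.get() == slots.length) return null;
            return slots[(int) (claimed++ & mask)];
        }

        /** Producer: publishes every object claimed so far. */
        void flush() {
            published.set(claimed);
        }

        /** Consumer: runs everything published so far and returns how many callbacks ran. */
        int drain(long now) {
            long start = consumed.get();
            long end = published.get();
            for (long seq = start; seq < end; seq++) slots[(int) (seq & mask)].onCallback(now);
            consumed.set(end);
            return (int) (end - start);
        }
    }

    static final class Counter {
        long total;
        String lastThread;

        void add(long delta) {
            total += delta;
            lastThread = Thread.currentThread().getName();
        }
    }

    static final class CounterCallback implements Callback {
        private Counter counter;
        private boolean check;

        void reset(Counter counter, boolean check) {
            this.counter = counter;
            this.check = check;
        }

        @Override
        public void onCallback(long now) {
            counter.add(check ? -1 : +1);
        }
    }

    static Counter runDemo() {
        CallbackRing<CounterCallback> ring = new CallbackRing<>(8, CounterCallback::new);
        Counter counter = new Counter();
        Thread consumer = new Thread(() -> {
            int seen = 0;
            while (seen < 100) seen += ring.drain(System.nanoTime());
        }, "consumer");
        consumer.start();
        for (int i = 0; i < 100; i++) {
            CounterCallback callback;
            while ((callback = ring.nextCallback()) == null) Thread.yield(); // ring is full
            callback.reset(counter, i % 3 == 0);
            ring.flush();
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter;
    }

    public static void main(String[] args) {
        Counter counter = runDemo();
        System.out.println("total=" + counter.total + " thread=" + counter.lastThread);
        // prints: total=32 thread=consumer
    }
}
```

All eight CounterCallback objects are created in the constructor and recycled afterwards, so the loop in runDemo allocates nothing per callback. The two AtomicLong counters are what make the hand-off safe: the producer fills a slot before advancing published, and the consumer advances consumed only after it has finished running the slot.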


Conclusion

CoralReactor was built from the ground up to be single-threaded by design. Therefore, you should never allow multiple threads to share state with the reactor thread. If communication among threads becomes necessary, callbacks should be used so that the reactor thread is the only one executing the code. By using CoralQueue under the hood, CoralReactor allows for ultra-fast, lock-free and garbage-free callback handling.