The Simplicity of CoralReactor

CoralReactor is a powerful, easy-to-use, ultra-low-latency Java library for network communication with zero garbage creation and minimal variance. What stands out most about CoralReactor, however, is its simplicity. In this article we will walk through some example clients and servers to get you started with CoralReactor.

Discard Server

A discard server accepts clients and drops any data received from them, without sending any message back. Below is the CoralReactor implementation:

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.server.AbstractTcpServer;
import com.coralblocks.coralreactor.server.Server;
import com.coralblocks.coralreactor.server.ServerClient;

public class DiscardServer extends AbstractTcpServer {
	
	public DiscardServer(NioReactor nio, int port) {
	    super(nio, port);
    }

	public DiscardServer(NioReactor nio, int port, Configuration config) {
	    super(nio, port, config);
    }
	
	@Override
    protected void handleBuffer(Client client, ByteBuffer buf) {
	    
		buf.position(buf.limit()); // mark every byte as consumed without actually reading it
    }

	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();		
		Server server = new DiscardServer(nio, 54321);
		server.open();
		nio.start();
	}
}

You can see from the class declaration above that we are extending AbstractTcpServer. CoralReactor comes with a variety of base classes that you can use to easily implement all sorts of UDP and TCP clients and servers with minimum code and effort. The AbstractTcpServer class requires the implementation of only one method: handleBuffer(Client client, ByteBuffer buf).

As we will see later, the purpose of the handleBuffer method is to parse a message from the byte buffer, but our simple DISCARD server just advances the byte buffer position to its limit, pretending to have read everything. Note that if you don’t consume the byte buffer by advancing its position, bytes will accumulate, the buffer will eventually fill up and an exception will be raised. Bytes are allowed to accumulate because the buffer might contain a partial message that requires another reading cycle to be completed. When performing non-blocking i/o, bytes are read from the network as they become available, so the byte buffer may well contain a partial message. When that happens, you just return from handleBuffer without reading or processing the partial message. CoralReactor then compacts the buffer and another reading cycle begins to read more data from the network and complete the message.
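The reading cycle described above can be imitated with nothing but java.nio. The sketch below is not CoralReactor code: the PartialReadDemo class and its methods are hypothetical, and the fixed 4-byte message length is an assumption chosen for illustration. It shows a handler that leaves a partial message untouched and a compact() call that carries the leftover bytes into the next cycle.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class, not part of CoralReactor.
class PartialReadDemo {

    static final int MESSAGE_LENGTH = 4; // assumed fixed-length protocol

    private final ByteBuffer readBuffer = ByteBuffer.allocate(16);
    private int messagesSeen = 0;
    private int lastValue = 0;

    // Simulates one reading cycle: bytes arrive, the handler runs, the buffer is compacted.
    void onBytesFromNetwork(byte[] chunk) {
        readBuffer.put(chunk);    // append after any leftover bytes (BufferOverflowException if full)
        readBuffer.flip();        // switch to reading mode
        handleBuffer(readBuffer);
        readBuffer.compact();     // move unread bytes to the front, back to writing mode
    }

    // Consumes only complete messages; a partial message is left in place.
    private void handleBuffer(ByteBuffer buf) {
        while (buf.remaining() >= MESSAGE_LENGTH) {
            lastValue = buf.getInt();
            messagesSeen++;
        }
    }

    int messagesSeen() { return messagesSeen; }
    int lastValue() { return lastValue; }
    int pendingBytes() { return readBuffer.position(); }
}
```

Feeding this class two bytes and then three more delivers one complete message on the second call and leaves one byte pending for the next cycle.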

Notice how easy it is to create and open the server in the main method. By default, the server binds to 0.0.0.0 but that can be changed by passing a configuration in the constructor. For example, to bind the server to 192.168.1.20 instead, you would do:


	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();	
		Configuration config = new MapConfiguration();
		config.overwriteDefault("bindAddress", "192.168.1.20");	
		Server server = new DiscardServer(nio, 54321, config);
		server.open();
		nio.start();
	}

Print Server

The DISCARD server is too boring because it does not do anything. Let’s change the handleBuffer method to print the data it receives to stdout:

	@Override
    protected void handleBuffer(Client client, ByteBuffer buf) {
		
		while(buf.hasRemaining()) {
			char c = (char) buf.get();
			System.out.print(c);
		}
		System.out.flush();
    }

Now you can telnet to port 54321 and see something happening on the server side.

Echo Server

We can easily change the handleBuffer method to echo back everything it receives:

	@Override
    protected void handleBuffer(Client client, ByteBuffer buf) {
		
		client.send(buf); // send = write + flush
    }

Notice that we are using the send method of the Client interface, which is equivalent to calling client.write(buf) and then client.flush().

Echo Line Server

To make things more interesting, let’s implement an ECHO server that works with ascii characters delimited by the newline character (‘\n’); in other words, it considers each line received from a client to be a new message to be processed. For that we will inherit from AbstractLineTcpServer and override the handleMessage method:

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.AbstractLineTcpServer;
import com.coralblocks.coralreactor.server.Server;
import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralbits.util.ByteArrayUtils;

public class EchoLineServer extends AbstractLineTcpServer {
	
	private final byte[] message = new byte[32];
	
	public EchoLineServer(NioReactor nio, int port) {
	    super(nio, port);
    }

	@Override
    protected void handleMessage(Client client, ByteBuffer msg) {
		
		if (ByteBufferUtils.equals(msg, "bye")) { // check if message == "bye"
			client.send("Goodbye!"); // send "Goodbye!"
			client.close(); // adios
		} else {
			client.send(msg); // write and flush message
		}
    }
	
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();
		Server server = new EchoLineServer(nio, 54321);
		server.open();
		nio.start();
	}
}

The first thing you should notice is that now we are overriding the method handleMessage instead of handleBuffer. Why? That’s because the AbstractLineTcpServer class implements handleBuffer to parse lines from the byte buffer and considers them protocol messages. When handleMessage is called, you can be sure that its byte buffer contains a single message. Also, because the handleMessage method does not care how the message was parsed, the newline character is not included in the byte buffer representing the message. We will soon see an example of how to implement the handleBuffer method to parse protocol messages, but for now you can rely on the AbstractLineTcpServer class to do this job for you and provide you with a clean line message.

Things get interesting when you process the message inside the handleMessage method. As mentioned in the introduction, CoralReactor produces zero garbage, so why would you produce garbage yourself if you don’t have to? Real-time programming without generating garbage and GC overhead is outside the scope of this article, but we will stick to best practices for our ECHOLINE server. Instead of creating a String from the message byte buffer, we compare its contents directly against the expected text with ByteBufferUtils. The same result can be had by reading the contents into the preallocated message byte array and comparing with ByteArrayUtils. This is not just faster, it also leaves zero garbage behind. Techniques like that (and other tricks) allow CoralReactor to be a ZERO garbage and super-fast library. You can refer to its white paper for more details about the zero garbage feature. For more information about ByteBufferUtils, ByteArrayUtils and other real-time utility classes offered by Coral Blocks, refer to CoralBits.
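To make the idea concrete, here is a minimal sketch of a garbage-free comparison between a byte buffer and ascii text. It is not the implementation of ByteBufferUtils.equals, whose internals are not shown in this article; the AsciiCompare class and its equalsAscii method are hypothetical names, and the text is assumed to contain ascii characters only.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not the CoralBits implementation.
class AsciiCompare {

    // Compares the remaining bytes of buf with an ascii text. It allocates nothing
    // and uses absolute gets, so the buffer position and limit are left untouched.
    static boolean equalsAscii(ByteBuffer buf, CharSequence text) {
        if (buf.remaining() != text.length()) return false;
        int start = buf.position();
        for (int i = 0; i < text.length(); i++) {
            if (buf.get(start + i) != (byte) text.charAt(i)) return false;
        }
        return true;
    }
}
```

Because no String, byte array or iterator is created, the comparison can run on every message without giving the garbage collector anything to do.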

Notice that when you send a message back to a client through the send method, you do not have to include the newline character. All the protocol level work is being done for you by the AbstractLineTcpServer class.
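The division of labor between handleBuffer and handleMessage in a line protocol could look roughly like the sketch below. It is an assumption about the general technique, not the source of AbstractLineTcpServer; the LineFramer class and its Handler interface are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical line framer, for illustration only.
class LineFramer {

    interface Handler { void handleMessage(ByteBuffer msg); }

    // Delivers every complete '\n'-terminated line in buf (without the '\n')
    // and leaves a trailing partial line unread for the next reading cycle.
    static void handleBuffer(ByteBuffer buf, Handler handler) {
        int endLimit = buf.limit();
        int start = buf.position();
        for (int i = start; i < endLimit; i++) {
            if (buf.get(i) != '\n') continue;
            buf.limit(i).position(start);        // expose exactly one line
            handler.handleMessage(buf);
            start = i + 1;                        // skip the delimiter
            buf.limit(endLimit).position(start);  // restore the pointers
        }
    }
}
```

The handler sees one line at a time through the same buffer instance, so no copy is made, and whatever remains after the last delimiter stays in the buffer until more bytes arrive.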

Single Threaded

You might be asking yourself whether the message byte array is safe to share among multiple clients, since there appears to be only one instance of it. The answer is a solid yes! CoralReactor, by design and on purpose, is single-threaded; in other words, all clients and all network i/o are handled inside the same super-fast, isolated (i.e. pinned), non-blocking reactor thread. This not only provides super-fast performance but also allows for much simpler code that does not have to worry about thread synchronization, lock contention, race conditions, deadlocks, thread starvation and many other pitfalls of multithreaded programming.

Time Server

To continue with our server examples, we will now implement a TIME server that sends a 32-bit integer with the current time, expressed in seconds since January 1, 1900, to a connecting client and then closes the connection.

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.AbstractTcpServer;
import com.coralblocks.coralreactor.server.Server;

public class TimeServer extends AbstractTcpServer {
	
	private final ByteBuffer outBuffer = ByteBuffer.allocate(4);

	public TimeServer(NioReactor nio, int port) {
	    super(nio, port);
    }

	@Override
    protected void handleConnectionOpened(Client client) {
		
		int time = (int) (System.currentTimeMillis() / 1000L + 2208988800L);
		
		outBuffer.clear();
		outBuffer.putInt(time);
		outBuffer.flip();

		client.send(outBuffer); // write and flush
		client.close();
    }
	
	@Override
    protected void handleBuffer(Client client, ByteBuffer buf) {
		
		throw new IllegalStateException("This should never be called!");
    }
	
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();		
		Server server = new TimeServer(nio, 54321);
		server.open();
		nio.start();
	}
}

This time we do all the work inside the handleConnectionOpened method, which is called when a new client connects to the server. Note that we still have to override the abstract method handleBuffer, but this method will never be called; because we are closing the client connection in handleConnectionOpened, the client will never have a chance to send anything. It connects, gets a time integer and the server hangs up on it. The base classes of CoralReactor provide all the callbacks you need to track state, such as the handleConnectionOpened method. The other handle methods are: handleConnectionEstablished, handleConnectionTerminated, handleBatchProcessed, handleFlushed, handleReadTimeout, handleEventTimeout, handleCallback, handleSessionStarted, handleSessionEnded and handleReadException.

Time Client

To test our TIME server, we can use the UNIX rdate command:

$ rdate -o <port> -p <host>

However, let’s take this opportunity to implement our first client. This will also give us the chance to revisit the handleBuffer method and parse a TIME protocol message.

import java.nio.ByteBuffer;
import java.util.Date;

import com.coralblocks.coralreactor.client.AbstractTcpClient;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;

public class TimeClient extends AbstractTcpClient {
	
	private static final int MESSAGE_LENGTH = 4;

	public TimeClient(NioReactor nio, String host, int port) {
	    super(nio, host, port);
    }
	
	@Override
    protected void handleBuffer(ByteBuffer buf) {
		
		while(isConnectionOpen() && buf.remaining() >= MESSAGE_LENGTH) { // do we have a full msg?
			int endLimit = buf.limit();
			int msgLimit = buf.position() + MESSAGE_LENGTH;
			buf.limit(msgLimit); // parse/mark the message
			onMessage(buf); // here is a clear protocol message
			buf.limit(endLimit).position(msgLimit);
		}
    }
	
	@Override
	protected void handleMessage(ByteBuffer msg) {

		long t = (((long) msg.getInt()) & 0xffffffffL); // unsigned int
		long time = (t - 2208988800L) * 1000L;
		// the garbage below can be easily avoided by using the class
		// com.coralblocks.coralreactor.util.CheapDateTimeFormatter
		// which returns a char array with the date and time
		Date d = new Date(time);
		System.out.println(d);
	}
	
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();
		Client client = new TimeClient(nio, "localhost", 54321);
		client.open();		
		nio.start();
	}
}

The goal of the handleBuffer method is to parse a protocol message and call onMessage with it. When it does that, the handleMessage method callback is triggered and the client has a chance to do whatever it wants with the received message. Some best practices about the implementation of handleBuffer are:

  • If for any reason a message causes a client to disconnect, we want to exit the while loop. That’s why we check if the client is still connected on every iteration with the method isConnectionOpen.
  • It does not make sense to try to parse partial messages, so we also exit the while loop if the byte buffer has fewer bytes remaining than the message length.
  • We always save the endLimit and the msgLimit so the byte buffer is adjusted correctly for the next iteration no matter what the handleMessage callback does to the byte buffer pointers.
  • As mentioned before, the onMessage method must be called with a byte buffer containing exactly one clear protocol message.
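The third practice in the list above is easy to verify in isolation. The sketch below repeats the framing loop with plain java.nio so it can run without CoralReactor; the FixedLengthFramer class and its Handler interface are hypothetical, and the connection check is left out because there is no connection here.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone version of the framing loop, without CoralReactor.
class FixedLengthFramer {

    interface Handler { void handleMessage(ByteBuffer msg); }

    // Delivers each complete message to the handler and returns how many were delivered.
    static int frame(ByteBuffer buf, int messageLength, Handler handler) {
        int count = 0;
        while (buf.remaining() >= messageLength) {
            int endLimit = buf.limit();
            int msgLimit = buf.position() + messageLength;
            buf.limit(msgLimit);                     // expose exactly one message
            handler.handleMessage(buf);              // the callback may move the pointers freely
            buf.limit(endLimit).position(msgLimit);  // restore, whatever the callback did
            count++;
        }
        return count;
    }
}
```

Even a badly behaved callback that calls clear() on the buffer cannot derail the loop, because both pointers are reset from the saved values after every message.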

Now for the handleMessage method, we just get the integer from the byte buffer, do the math and print the date and time. You can do whatever you want with the byte buffer inside the handleMessage method because handleBuffer guarantees its integrity by saving its pointers.
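The math mentioned above is worth spelling out, since it involves an epoch shift and an unsigned 32-bit value. The sketch below isolates the arithmetic used by the TIME server and client in this article; the TimeProtocolMath class and its method names are hypothetical.

```java
// Hypothetical helper isolating the arithmetic of the TIME examples.
class TimeProtocolMath {

    // Seconds between 1900-01-01 (TIME protocol epoch) and 1970-01-01 (Unix epoch).
    static final long EPOCH_OFFSET_SECONDS = 2208988800L;

    // Encodes Unix milliseconds as the 32-bit value the server writes.
    static int encode(long unixMillis) {
        return (int) (unixMillis / 1000L + EPOCH_OFFSET_SECONDS); // keeps the low 32 bits
    }

    // Decodes the 32-bit value back to Unix milliseconds (one-second resolution).
    static long decode(int wireValue) {
        long secondsSince1900 = wireValue & 0xffffffffL; // treat the int as unsigned
        return (secondsSince1900 - EPOCH_OFFSET_SECONDS) * 1000L;
    }
}
```

The mask matters because today’s value is larger than Integer.MAX_VALUE, so the int that travels on the wire is negative when read as a signed Java int. Without the mask the decoded date would land decades in the past.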

Notice that coding a client is very similar to coding a server. One difference is that the handle callback methods do not have a client argument because they are called from the client itself; in other words, the client you’ll be using is the this reference. Another small difference is that when you create a client in the main method, you must pass the host or ip to which it will be connecting in the constructor.

Time Beat Server

You may have noticed that our TimeClient is ready to receive and process not just one TIME message but as many as the server wants to send. Therefore, let’s implement a TIMEBEAT server that sends the time every second to connected clients.

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.AbstractTcpServer;
import com.coralblocks.coralreactor.server.Server;

public class TimeBeatServer extends AbstractTcpServer {
	
	private final static int BEAT_PERIOD = 1000; // 1s...
	
	private final ByteBuffer outBuffer = ByteBuffer.allocate(4);

	public TimeBeatServer(NioReactor nio, int port) {
	    super(nio, port);
    }
	
	@Override
	protected void handleConnectionOpened(Client client) {
		sendTime(client);
	}

    @Override
    protected void handleEventTimeout(Client client, long now, int timeout) {
    	sendTime(client);
    }
    
    private void sendTime(Client client) {
		
		int time = (int) (System.currentTimeMillis() / 1000L + 2208988800L);
		
		outBuffer.clear();
		outBuffer.putInt(time);
		outBuffer.flip();

		client.send(outBuffer); // write and flush
		
		client.setEventTimeout(BEAT_PERIOD); // send again in 1 sec
    }
    
	@Override
    protected void handleBuffer(Client client, ByteBuffer buf) {

		// client should not be sending anything
		buf.position(buf.limit()); // ignore!
    }
	
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();
		Server server = new TimeBeatServer(nio, 54321);
		server.open();		
		nio.start();
	}
}

Now instead of closing the client in handleConnectionOpened, we send the time and schedule a timeout event one second later. When that second has passed, the method handleEventTimeout gets called, we send the time again and schedule another timeout event one second later. We keep doing that in a loop until the client disconnects. The client is still not supposed to be sending anything, so if it does we just ignore it inside the handleBuffer method, like we did for the DISCARD server.

As you can see, CoralReactor can schedule future events with millisecond precision that get triggered inside the reactor thread like any other i/o operation, in other words, it is fast, precise and thread-safe.
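To see why timers that fire inside the loop thread need no locking, consider the sketch below. It is an assumption about the general technique, not a description of how CoralReactor implements setEventTimeout; the TimerLoop class is hypothetical, and unlike CoralReactor it allocates an object per scheduled timer.

```java
import java.util.PriorityQueue;

// Hypothetical single-threaded timer queue, for illustration only.
class TimerLoop {

    private static final class Timer implements Comparable<Timer> {
        final long deadline;
        final Runnable task;
        Timer(long deadline, Runnable task) { this.deadline = deadline; this.task = task; }
        @Override public int compareTo(Timer other) { return Long.compare(deadline, other.deadline); }
    }

    private final PriorityQueue<Timer> timers = new PriorityQueue<>();

    // Schedules task to run once, delayMillis after now.
    void schedule(long now, long delayMillis, Runnable task) {
        timers.add(new Timer(now + delayMillis, task));
    }

    // Meant to be called on every pass of the loop, in the same thread that handles i/o:
    // runs every timer whose deadline has passed and returns how many ran.
    int fireDue(long now) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek().deadline <= now) {
            timers.poll().task.run(); // the task may schedule the next timeout
            fired++;
        }
        return fired;
    }
}
```

Since schedule and fireDue are only ever called from one thread, the queue needs no synchronization, and a task that reschedules itself produces the same periodic beat as the TIMEBEAT server.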

Conclusion

In this article, we saw how to create clients and servers using CoralReactor. We learned how we can parse protocol messages, schedule future events, handle callbacks and other cool and simple tricks for coding your own ultra-fast and garbage-free clients and servers. CoralReactor comes with many base classes that you can use to implement your own protocols and although it has many powerful features, its main goal is to be simple to use with a straightforward API that shields the developer from any complexity commonly associated with the reactor pattern.