CoralReactor vs Netty Performance Comparison

In this article we compare CoralReactor and Netty in terms of performance to show that CoralReactor is 10 times faster and produces zero garbage. (Note: If you are looking for an API comparison instead, you can refer to the article The Simplicity of CoralReactor which presents the same examples from the Netty 5.x User Guide)

The Test Details

In our benchmark test we have two separate JVMs running on the same machine and communicating through TCP over loopback. The details are outlined below:

  • The first JVM runs the client, the second JVM runs the server.
  • The client connects to the server and sends a 256-byte message to the server.
  • The first 8 bytes of the message hold the timestamp recorded by the client at the moment the message was sent.
  • The server receives the message, reads the timestamp, reads the remaining 248 bytes and calculates the latency from client to server (one-way latency).
  • The server then echoes back the message to the client.
  • The client receives the echo and sends the next message with a new timestamp.
  • To warm up, we send 1 million messages. Then we send another 1 million messages and benchmark the latencies.

CoralReactor Results

Messages: 1,000,000
Message Size: 256 bytes
Avg Time: 2.061 micros
Min Time: 1.931 micros
Max Time: 66.968 micros
Garbage creation: zero
75% = [avg: 2.041 micros, max: 2.078 micros]
90% = [avg: 2.049 micros, max: 2.106 micros]
99% = [avg: 2.056 micros, max: 2.173 micros]
99.9% = [avg: 2.058 micros, max: 2.758 micros]
99.99% = [avg: 2.06 micros, max: 5.714 micros]
99.999% = [avg: 2.06 micros, max: 6.683 micros]

Netty Results

Messages: 1,000,000
Avg Time: 21.167 micros
Min Time: 5.529 micros
Max Time: 10.264 millis
Garbage creation: the more messages you send the more garbage is created
75% = [avg: 20.082 micros, max: 20.667 micros]
90% = [avg: 20.272 micros, max: 25.595 micros]
99% = [avg: 21.02 micros, max: 29.672 micros]
99.9% = [avg: 21.121 micros, max: 39.088 micros]
99.99% = [avg: 21.141 micros, max: 55.728 micros]
99.999% = [avg: 21.146 micros, max: 91.416 micros]

Conclusion

  • CoralReactor has an average latency 10 times smaller than Netty (2 micros vs. 21 micros).
  • CoralReactor produces zero garbage. Netty produces garbage in proportion to the number of messages sent.
  • CoralReactor has lower variance than Netty, with a max latency of 67 micros versus 10 millis for Netty.
  • At the 99.99th percentile, CoralReactor shows an average latency of 2.06 micros with a max latency of 5.714 micros. For the same percentile, Netty shows an average latency of 21.141 micros with a max latency of 55.728 micros.

CoralReactor Code

Client:

package com.coralblocks.coralreactor.client.bench.oneway;

import static com.coralblocks.corallog.Log.*;

import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.AbstractFixedSizeTcpClient;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralbits.util.SystemUtils;

public class BenchmarkTcpClient extends AbstractFixedSizeTcpClient {
	
	private int count = 0;
	private boolean warmingUp = false;
	private boolean benchmarking = false;
	private final ByteBuffer tsMsg; 
	private long tsSent;
	private final byte[] readArray;
	private final int messages;
	private final int msgSize;

	public BenchmarkTcpClient(NioReactor nio, String host, int port) {
		
		super(nio, host, port);
		
		this.msgSize = SystemUtils.getInt("msgSize", 256);
		this.messages = SystemUtils.getInt("messages", 1000000);
		
		this.tsMsg = ByteBuffer.allocateDirect(msgSize);
		this.readArray = new byte[msgSize];
		
		tsMsg.clear();
		for(int i = 0; i < msgSize; i++) {
			tsMsg.put((byte) 'x');
		}
		tsMsg.flip();
	}
	
	@Override
	protected void handleConnectionOpened() {

		this.warmingUp = true;
		this.benchmarking = false;
		this.count = 0;

		sendMsg(-1); // very first message, so the other side knows we are starting...
	}
	
	@Override
	protected final int getMessageFixedSize() {
		return msgSize;
	}
	
	@Override
	public void handleMessage(ByteBuffer msg) {
		
		long tsReceived = msg.getLong();
		
		msg.get(readArray, 0, msg.remaining()); // read fully
			
		if (tsReceived != tsSent) {
			Error.log(name, "Bad timestap received:", "tsSent=", tsSent, "tsReceived=", tsReceived);
			close();
			return;
		}
		
		if (warmingUp) {

			if (++count == messages) { // done warming up...
				
				Info.log(name, "Finished warming up!", "messages=", count);
				
				this.warmingUp = false;
				this.benchmarking = true;
				this.count = 0;
				
				sendMsg(System.nanoTime()); // first testing message
				
			} else {

				sendMsg(0);
			}
				
		} else if (benchmarking) {
			
			if (++count == messages) {
			
				Info.log(name, "Finished sending messages!", "messages=", count);
				
				// send the last message to tell the client we are done...
				sendMsg(-2);
				close();
				
			} else {
				
				sendMsg(System.nanoTime());
			}
		}
	}
	
	private final void sendMsg(long value) {
		// add the timestamp in the first 8 bytes...
		tsMsg.position(0);
		tsMsg.putLong(value);
		tsMsg.position(0);
		send(tsMsg);
		tsSent = value; // save to check echo msg...
	}

	public static void main(String[] args) throws Exception {

		NioReactor nio = NioReactor.create();
		
		String destAddress = SystemUtils.getString("destAddress", "localhost");
		int destPort = SystemUtils.getInt("destPort", 8080);
		
		final BenchmarkTcpClient client = new BenchmarkTcpClient(nio, destAddress, destPort);
		client.open();
		nio.start();
	}
}

Server:

package com.coralblocks.coralreactor.server.bench;

import static com.coralblocks.corallog.Log.*;

import java.io.IOException;
import java.nio.ByteBuffer;

import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.AbstractFixedSizeTcpServer;
import com.coralblocks.coralreactor.server.Server;
import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralbits.bench.Benchmarker;

/**
 * Echo server used to measure one-way (client to server) latency.
 *
 * <p>Every message starts with an 8-byte {@code long} written by the client:
 * <ul>
 *   <li>a positive value is the client's {@code System.nanoTime()} and is measured,</li>
 *   <li>{@code 0} is a warm-up message (echoed, not measured),</li>
 *   <li>{@code -1} marks the first message of a run (echoed, not measured),</li>
 *   <li>{@code -2} marks the last message: the server closes and prints the results,</li>
 *   <li>any other negative value is an error and closes the server.</li>
 * </ul>
 *
 * <p>The message size is obtained through {@code SystemUtils.getInt("msgSize", 256)}
 * and must match the one used by the client.
 */
public class EchoTcpServer extends AbstractFixedSizeTcpServer {

	/** Size of the leading timestamp field. */
	private static final int TIMESTAMP_SIZE = 8;

	/**
	 * Smallest usable message: handleMessage() reads one long and echoBack()
	 * writes a second long right after it.
	 */
	private static final int MIN_MSG_SIZE = 2 * TIMESTAMP_SIZE;

	private final byte[] readArray; // scratch area the payload is drained into
	private final Benchmarker bench = Benchmarker.create();
	private final int msgSize;

	/**
	 * @param nio the reactor that drives this server
	 * @param port the port to listen on
	 * @throws IOException declared by the superclass constructor
	 * @throws IllegalArgumentException if the configured message size is below 16 bytes
	 */
	public EchoTcpServer(NioReactor nio, int port) throws IOException {

		super(nio, port);

		this.msgSize = SystemUtils.getInt("msgSize", 256);

		if (msgSize < MIN_MSG_SIZE) {
			throw new IllegalArgumentException("msgSize must be at least " + MIN_MSG_SIZE + " bytes: " + msgSize);
		}

		// sized from msgSize (it used to be a fixed 2 KB array that overflowed for larger messages)
		this.readArray = new byte[msgSize - TIMESTAMP_SIZE];
	}

	/**
	 * @return the size in bytes of every message exchanged
	 */
	@Override
	public final int getMessageFixedSize() {
		return msgSize;
	}

	/**
	 * Reads the timestamp and the payload of one message, records the latency
	 * for benchmarked messages and echoes the message back to the sender.
	 *
	 * @param client the connection the message arrived on
	 * @param msg the message, positioned at its first byte
	 */
	@Override
	public void handleMessage(Client client, ByteBuffer msg) {

		int pos = msg.position();

		long tsReceived = msg.getLong();

		msg.get(readArray, 0, msgSize - TIMESTAMP_SIZE); // read fully

		if (tsReceived > 0) {
			bench.measure(System.nanoTime() - tsReceived);
		} else if (tsReceived == -1) {
			// first message
		} else if (tsReceived == -2) {
			// last message
			close();
			printResults();
			return;
		} else if (tsReceived < 0) {
			Error.log(name, "Received bad timestamp:", tsReceived);
			close();
			return;
		}

		// rewind to the start of the message so it can be sent back as received
		msg.position(pos);
		echoBack(client, msg);
	}

	// Sends buf back to the client after stamping the server's nanoTime into the
	// 8 bytes that follow the client timestamp (the client timestamp itself is untouched).
	private void echoBack(Client client, ByteBuffer buf) {

		int pos = buf.position();
		buf.position(pos + TIMESTAMP_SIZE);
		buf.putLong(System.nanoTime());
		buf.position(pos);

		client.send(buf);
	}

	// Prints the collected latencies to stdout.
	private void printResults() {
		StringBuilder results = new StringBuilder();
		results.append("results=");
		results.append(bench.results());
		System.out.println(results.toString());
	}

	/**
	 * Starts the server on the port obtained through
	 * {@code SystemUtils.getInt("serverPort", 8080)}.
	 */
	public static void main(String[] args) throws Exception {

		NioReactor nio = NioReactor.create();

		int listeningPort = SystemUtils.getInt("serverPort", 8080);

		Server server = new EchoTcpServer(nio, listeningPort);
		server.open();
		nio.start();
	}
}

Netty Code

Client:

package com.coralblocks.nettybenchmarks.bench;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.IOException;
import java.nio.ByteBuffer;

import com.coralblocks.nettybenchmarks.util.SystemUtils;

/**
 * Netty version of the ping-pong benchmark client.
 *
 * <p>It keeps exactly one message in flight: a message is sent, its echo is
 * awaited, and only then is the next message sent. The first 8 bytes of every
 * message carry a {@code long}: {@code -1} for the very first message,
 * {@code 0} during warm-up, {@code System.nanoTime()} while benchmarking and
 * {@code -2} for the final message. The rest of the message is filler.
 *
 * <p>NOTE(review): there is no frame decoder in the pipeline, so
 * {@link #channelRead} only handles the complete messages contained in a single
 * read; bytes of a message split across two reads are dropped. That holds for
 * one small message in flight over loopback, but a length-based frame decoder
 * would be needed for anything else.
 */
public class BenchmarkTcpClient extends ChannelHandlerAdapter {

	private static final String DEFAULT_HOST = "localhost";
	private static final int DEFAULT_PORT = 8080; // same default as the echo server

	private int count = 0;
	private boolean warmingUp = false;
	private boolean benchmarking = false;
	private long tsSent; // value written in the last message sent, checked against its echo
	private final byte[] readArray; // scratch area the echoed payload is drained into
	private final int messages;
	private final int msgSize;

	/**
	 * Reads the message size and the number of messages per phase through
	 * {@code SystemUtils.getInt("msgSize", 256)} and
	 * {@code SystemUtils.getInt("messages", 1000000)}.
	 */
	public BenchmarkTcpClient() throws IOException {
		super();

		this.msgSize = SystemUtils.getInt("msgSize", 256);
		this.messages = SystemUtils.getInt("messages", 1000000);
		this.readArray = new byte[msgSize];
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// Close the connection when an exception is raised.
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Resets the state to the beginning of the warm-up phase and sends the opening message.
	 */
	@Override
	public void channelActive(final ChannelHandlerContext ctx) {

		this.warmingUp = true;
		this.benchmarking = false;
		this.count = 0;

		sendMsg(-1, ctx); // very first message, so the other side knows we are starting...
	}

	/**
	 * Processes the messages contained in one inbound buffer and then releases it.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {

		ByteBuf in = (ByteBuf) msg;

		try {
			handleBuffer(ctx, in.nioBuffer());
		} finally {
			// netty uses reference count: release even when handling throws, or the buffer leaks
			in.release();
		}
	}

	// Splits buf into msgSize-byte messages and handles each one with the limit narrowed to it.
	private void handleBuffer(ChannelHandlerContext ctx, ByteBuffer buf) {

		while (buf.remaining() >= msgSize) {
			int pos = buf.position();
			int lim = buf.limit();
			buf.limit(pos + msgSize);
			handleMessage(ctx, buf);
			buf.limit(lim).position(pos + msgSize);
		}
	}

	// Validates one echoed message and sends whatever comes next in the current phase.
	private void handleMessage(ChannelHandlerContext ctx, ByteBuffer buf) {

		long tsReceived = buf.getLong();

		buf.get(readArray, 0, buf.remaining()); // read fully

		if (tsReceived != tsSent) {
			System.err.println("Bad timestap received: tsSent=" + tsSent + " tsReceived=" + tsReceived);
			ctx.close();
			return;
		}

		if (warmingUp) {

			if (++count == messages) { // done warming up...

				System.out.println("Finished warming up! messages=" + count);

				this.warmingUp = false;
				this.benchmarking = true;
				this.count = 0;

				sendMsg(System.nanoTime(), ctx); // first testing message

			} else {

				sendMsg(0, ctx);
			}

		} else if (benchmarking) {

			if (++count == messages) {

				System.out.println("Finished sending messages! messages=" + count);

				// send the last message to tell the server we are done...
				sendMsg(-2, ctx);
				ctx.close();

			} else {

				sendMsg(System.nanoTime(), ctx);
			}
		}
	}

	// Builds a fresh msgSize-byte message (value followed by filler) and writes it out.
	private void sendMsg(long value, ChannelHandlerContext ctx) {

		ByteBuf tsMsg = ctx.alloc().directBuffer(msgSize);

		tsMsg.writeLong(value);

		for (int i = 0; i < msgSize - 8; i++) {
			tsMsg.writeByte((byte) 'x');
		}

		ctx.writeAndFlush(tsMsg);

		tsSent = value; // save to check echo msg...
	}

	/**
	 * Connects to the echo server and blocks until the connection is closed.
	 *
	 * @param args optional {@code [host [port]]}; defaults to {@code localhost} and {@code 8080}
	 */
	public static void main(String[] args) throws Exception {

		String host = args.length > 0 ? args[0] : DEFAULT_HOST;
		int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			Bootstrap b = new Bootstrap(); // (1)
			b.group(workerGroup); // (2)
			b.channel(NioSocketChannel.class); // (3)
			b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
			b.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new BenchmarkTcpClient());
				}
			});

			// Start the client.
			ChannelFuture f = b.connect(host, port).sync(); // (5)

			// Wait until the connection is closed.
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}

Server:

package com.coralblocks.nettybenchmarks.bench;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.io.IOException;
import java.nio.ByteBuffer;

import com.coralblocks.nettybenchmarks.util.Benchmarker;
import com.coralblocks.nettybenchmarks.util.SystemUtils;

/**
 * Netty version of the echo server used to measure one-way (client to server) latency.
 *
 * <p>Every message starts with an 8-byte {@code long} written by the client:
 * a positive value is the client's {@code System.nanoTime()} and is measured,
 * {@code 0} and {@code -1} are echoed without being measured, {@code -2} ends
 * the run (the connection is closed and the results are printed) and any other
 * negative value is treated as an error.
 *
 * <p>NOTE(review): there is no frame decoder in the pipeline, so only the
 * complete messages contained in a single read are inspected; a message split
 * across two reads is echoed but never measured. A length-based frame decoder
 * would be needed to handle fragmentation properly.
 */
public class EchoTcpServer extends ChannelHandlerAdapter {

	/** Size of the leading timestamp field. */
	private static final int TIMESTAMP_SIZE = 8;

	private final byte[] readArray; // scratch area the payload is drained into

	private final Benchmarker bench = Benchmarker.create();

	private final int msgSize;

	/**
	 * Reads the message size through {@code SystemUtils.getInt("msgSize", 256)}.
	 *
	 * @throws IllegalArgumentException if the message size cannot hold the 8-byte timestamp
	 */
	public EchoTcpServer() throws IOException {
		super();
		this.msgSize = SystemUtils.getInt("msgSize", 256);

		if (msgSize < TIMESTAMP_SIZE) {
			throw new IllegalArgumentException("msgSize must be at least " + TIMESTAMP_SIZE + " bytes: " + msgSize);
		}

		// sized from msgSize (it used to be a fixed 2 KB array that overflowed for larger messages)
		this.readArray = new byte[msgSize - TIMESTAMP_SIZE];
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		// Close the connection when an exception is raised.
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Inspects the messages contained in one inbound buffer and echoes the
	 * buffer back exactly once, unless the connection is being closed.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {

		ByteBuf in = (ByteBuf) msg;
		boolean echo = false;

		try {
			echo = handleBuffer(ctx, in.nioBuffer());
		} finally {
			if (!echo) {
				// not handed to the pipeline (closing, or handling threw): release it here or it leaks
				in.release();
			}
		}

		if (echo) {
			// a single write per read: the pipeline takes ownership and releases the buffer once written
			ctx.writeAndFlush(in);
		}
	}

	// Handles every complete message in buf; returns false as soon as one of them
	// closed the connection, in which case nothing must be echoed.
	private boolean handleBuffer(ChannelHandlerContext ctx, ByteBuffer buf) {

		while (buf.remaining() >= msgSize) {
			int pos = buf.position();
			int lim = buf.limit();
			buf.limit(pos + msgSize);
			if (!handleMessage(ctx, buf)) {
				return false;
			}
			buf.limit(lim).position(pos + msgSize);
		}

		return true;
	}

	// Reads one message and records its latency when it carries a client timestamp.
	// Returns true when the message should be echoed, false when the connection was closed.
	private boolean handleMessage(ChannelHandlerContext ctx, ByteBuffer buf) {

		long tsReceived = buf.getLong();

		buf.get(readArray, 0, buf.remaining()); // read fully

		if (tsReceived > 0) {
			bench.measure(System.nanoTime() - tsReceived);
		} else if (tsReceived == -1) {
			// first message
		} else if (tsReceived == -2) {
			// last message
			ctx.close();
			printResults();
			return false;
		} else if (tsReceived < 0) {
			System.err.println("Received bad timestamp: " + tsReceived);
			ctx.close();
			return false;
		}

		return true;
	}

	// Prints the collected latencies to stdout.
	private void printResults() {
		StringBuilder results = new StringBuilder();
		results.append("results=");
		results.append(bench.results());
		System.out.println(results);
	}

	/**
	 * Binds the server and blocks until the server channel is closed.
	 *
	 * @param args optional {@code [port]}; defaults to {@code 8080}
	 */
	public static void main(String[] args) throws Exception {

		int port;
		if (args.length > 0) {
			port = Integer.parseInt(args[0]);
		} else {
			port = 8080;
		}

		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
			        .childHandler(new ChannelInitializer<SocketChannel>() {
				                @Override
				                public void initChannel(SocketChannel ch) throws Exception {
					                ch.pipeline().addLast(new EchoTcpServer());
				                }
			                }).option(ChannelOption.SO_BACKLOG, 128)
			        .childOption(ChannelOption.SO_KEEPALIVE, true);

			ChannelFuture f = b.bind(port).sync(); // (7)
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}