Handling Socket Lagging during Write Operations

A common problem when working with non-blocking sockets is that a client may lag when its send rate is too high. In other words, the client pushes out messages faster than its network card can send them and/or the other side’s network card can receive them. This causes the underlying socket write buffer at the OS/kernel level to fill up. In this article we explain how CoralReactor handles this complex scenario in a simple way so that you don’t have to worry about it.

Socket Lagging

A socket might lag for one or more of the following reasons:

  1. The underlying socket send buffer (the native SO_SNDBUF option) is too small. You can change the size of this buffer from Java with the setSendBufferSize(int) method of java.net.Socket. See the FAQ for how to change this size directly through CoralReactor. By default, CoralReactor tries to set it to 512 KB.
  2. The receiving party is reading and processing the messages too slowly.
  3. The sending party is writing the messages too fast.
  4. The network is slow.
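Reason 1 can be explored with plain Java, independently of CoralReactor. The sketch below (the class name SendBufferSizeSketch is hypothetical) asks for a 512 KB send buffer through Socket.setSendBufferSize and reads back what the OS actually granted. SO_SNDBUF is only a hint: the kernel may round, double or cap the value (on Linux, for instance, the request is capped by net.core.wmem_max and then doubled), so the effective size can differ from the requested one.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Hypothetical helper using only java.net; it does not call CoralReactor.
public class SendBufferSizeSketch {

    // Requests a SO_SNDBUF size and returns the size the OS reports back.
    static int requestSendBuffer(int requestedBytes) {
        try (Socket socket = new Socket()) {          // an unconnected socket is enough to set options
            socket.setSendBufferSize(requestedBytes); // only a hint to the kernel
            return socket.getSendBufferSize();        // may be rounded, doubled or capped by the OS
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int requested = 512 * 1024; // the size CoralReactor tries to use by default
        System.out.println("requested=" + requested + " effective=" + requestSendBuffer(requested));
    }
}
```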

In this article we focus on the third reason above, which is the most common.

Application Write Buffer

CoralReactor maintains a write buffer at the application level: when CoralReactor writes a message, it actually writes it to an internal java.nio.ByteBuffer named writeBuffer. Later, when you call flush(), CoralReactor tries to transfer the bytes from this internal buffer to the underlying socket buffer. Having this intermediate write buffer allows the application to check the available space before it actually tries to write a message. The same check is not possible with the underlying socket buffer: the application cannot know how much space is available in it without actually trying to write to it.
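The difference between the two buffers can be illustrated with java.nio alone. In the sketch below, which assumes nothing about CoralReactor's internals (AppWriteBufferSketch and FullAfterNChannel are hypothetical names), the free space of the application buffer is read straight from ByteBuffer.remaining(), while the simulated socket only reveals how much it could take through the return value of write(). The flush uses the usual flip()/write()/compact() cycle, so any bytes the channel refused stay in the buffer for the next attempt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch of an application-level write buffer; not CoralReactor's code.
public class AppWriteBufferSketch {

    // Simulated non-blocking socket: accepts at most 'room' bytes, then returns 0 like a full kernel buffer.
    static final class FullAfterNChannel implements WritableByteChannel {
        private int room;
        FullAfterNChannel(int room) { this.room = room; }
        @Override public int write(ByteBuffer src) {
            int n = Math.min(room, src.remaining());
            src.position(src.position() + n); // consume n bytes
            room -= n;
            return n;
        }
        void free(int bytes) { room += bytes; } // the remote side read some data
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    private final ByteBuffer writeBuffer;

    AppWriteBufferSketch(int capacity) { writeBuffer = ByteBuffer.allocate(capacity); }

    // The free space is known up front, without touching the socket.
    boolean canFit(int length) { return writeBuffer.remaining() >= length; }

    void put(byte[] msg) { writeBuffer.put(msg); }

    // Transfers as much as the channel accepts; unsent bytes stay in the buffer.
    int flush(WritableByteChannel channel) {
        writeBuffer.flip(); // switch to reading mode
        try {
            return channel.write(writeBuffer); // the only way to learn how much the socket could take
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            writeBuffer.compact(); // keep leftovers, switch back to writing mode
        }
    }

    int pendingBytes() { return writeBuffer.position(); }

    public static void main(String[] args) {
        AppWriteBufferSketch app = new AppWriteBufferSketch(1024);
        FullAfterNChannel socket = new FullAfterNChannel(300);
        app.put(new byte[400]);
        System.out.println("fits another 700? " + app.canFit(700)); // false: only 624 bytes left
        int sent = app.flush(socket);
        System.out.println("sent=" + sent + " pending=" + app.pendingBytes()); // sent=300 pending=100
    }
}
```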

Sending Messages

CoralReactor provides two methods to send messages:

  • write(): writes a full message to the application writeBuffer without flushing it. Before writing, CoralReactor checks whether the writeBuffer has enough space for the whole message. If it does not (i.e. the writeBuffer is nearly full), CoralReactor automatically issues a flush() to free up space. When you are done writing messages, you should call flush() to send them.
  • send(): calls write() and then immediately calls flush() only if the message was successfully written to the writeBuffer, in other words, if write() returned true. Read below for more details on how you can handle this return value.
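The write()/send() contract above can be modeled with plain byte counts. The following is a hypothetical sketch, not CoralReactor's implementation: WriteSendSketch tracks how many bytes sit in the writeBuffer and how much room a simulated kernel send buffer has left, performs the automatic flush when a message does not fit, and lets send() flush only after a successful write().

```java
// Hypothetical model of the write()/send() contract; sizes are byte counts only.
public class WriteSendSketch {
    private final int capacity; // size of the application writeBuffer
    private int buffered;       // bytes currently waiting in the writeBuffer
    private int socketRoom;     // free bytes in the simulated kernel send buffer

    WriteSendSketch(int capacity, int socketRoom) {
        this.capacity = capacity;
        this.socketRoom = socketRoom;
    }

    // Writes the whole message or nothing; may flush automatically to make room.
    boolean write(int length) {
        if (capacity - buffered < length) flush();      // automatic flush to free up space
        if (capacity - buffered < length) return false; // still no room: nothing is written
        buffered += length;
        return true;
    }

    // Moves as many buffered bytes as the simulated socket accepts.
    void flush() {
        int n = Math.min(buffered, socketRoom);
        buffered -= n;
        socketRoom -= n;
    }

    // write() followed by flush(), but only if the write succeeded.
    boolean send(int length) {
        if (!write(length)) return false;
        flush();
        return true;
    }

    int buffered() { return buffered; }

    public static void main(String[] args) {
        WriteSendSketch conn = new WriteSendSketch(100, 150);
        System.out.println(conn.write(60)); // true: fits directly
        System.out.println(conn.write(60)); // true: the automatic flush made room first
        System.out.println(conn.send(60));  // true: written, then only partially flushed
        System.out.println(conn.write(80)); // false: the socket is full, nothing written
        System.out.println("buffered=" + conn.buffered()); // buffered=30
    }
}
```

Note that send() returns true even when its flush() is partial: the return value only says whether the message made it into the writeBuffer.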

Avoiding Partial Writes

CoralReactor’s write(...) and send(...) methods return a boolean indicating whether or not the message was written to the application’s internal writeBuffer. If the underlying socket buffer gets full, the internal writeBuffer will eventually fill up as well. As explained before, CoralReactor can check how much space is available in the writeBuffer. Therefore, for simplicity’s sake, CoralReactor does not write partial messages to its internal writeBuffer: it either writes the whole message and returns true, or writes nothing at all and returns false. If a write or send operation returns false, the application can save the message and retry later, when the method handleFreedWriteBufferSpace(int freeSpace) is triggered (see below).
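One way to save the message and retry later is an ordered pending queue. The sketch below is hypothetical: Conn stands in for a client whose write() is all-or-nothing, string length stands in for message size in bytes, and onFreedWriteBufferSpace plays the role of handleFreedWriteBufferSpace(int). Once a message is queued, later messages are queued behind it so the peer still receives them in order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: saves rejected messages and retries them in order.
public class RetryQueueSketch {

    // Stand-in for the connection: all-or-nothing writes into a fixed-size buffer.
    static final class Conn {
        final int capacity;
        int used;
        final List<String> accepted = new ArrayList<>();
        Conn(int capacity) { this.capacity = capacity; }
        boolean write(String msg) {
            if (capacity - used < msg.length()) return false; // no partial writes
            used += msg.length();
            accepted.add(msg);
            return true;
        }
        // Simulates a flush that frees every byte; returns the freed space.
        int drainAll() { int freed = used; used = 0; return freed; }
    }

    private final Conn conn;
    private final ArrayDeque<String> pending = new ArrayDeque<>();

    RetryQueueSketch(Conn conn) { this.conn = conn; }

    void sendOrQueue(String msg) {
        // Preserve ordering: once something is queued, later messages wait behind it.
        if (!pending.isEmpty() || !conn.write(msg)) pending.addLast(msg);
    }

    // Plays the role of handleFreedWriteBufferSpace(int freeSpace).
    // freeSpace is not needed here because write() itself checks the available space.
    void onFreedWriteBufferSpace(int freeSpace) {
        while (!pending.isEmpty() && conn.write(pending.peekFirst())) pending.pollFirst();
    }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        Conn conn = new Conn(10);
        RetryQueueSketch app = new RetryQueueSketch(conn);
        for (String m : new String[] {"aaaa", "bbbb", "cccc", "dddd"}) app.sendOrQueue(m);
        System.out.println(conn.accepted + " pending=" + app.pendingCount()); // [aaaa, bbbb] pending=2
        app.onFreedWriteBufferSpace(conn.drainAll());
        System.out.println(conn.accepted + " pending=" + app.pendingCount()); // [aaaa, bbbb, cccc, dddd] pending=0
    }
}
```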

Handling a Full Write Buffer

When the client is writing too fast, there is a high chance that the application writeBuffer will fill up. When that happens, write() or send() will return false, indicating that the message was not written. Under the hood, CoralReactor keeps trying to flush the writeBuffer to free up space, using the OP_WRITE selector operation. When such a flush succeeds, it triggers the method handleFreedWriteBufferSpace(int freeSpace). A client can therefore choose to queue messages and wait until this method is triggered to retry sending them. Other clients might choose to save the message to disk, drop it or even disconnect. Another option is to increase the size of the writeBuffer and/or the underlying socket buffer. However, because these sizes cannot be infinite, a client can always write fast enough to fill the buffers.
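Flushing through OP_WRITE is a standard java.nio pattern. The sketch below is not CoralReactor's implementation: it uses a java.nio.channels.Pipe as a stand-in for the socket (so it runs without a network), and flushWithOpWrite is a hypothetical name. It writes until the non-blocking channel stops accepting bytes, registers interest in OP_WRITE, lets the simulated remote reader drain the pipe, and resumes flushing each time the selector reports the channel writable. Each successful flush is the point where a reactor would trigger a callback like handleFreedWriteBufferSpace.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch, not CoralReactor's code: a Pipe stands in for the TCP socket.
public class OpWriteFlushSketch {

    // Returns {bytesWritten, freedSpaceEvents}.
    static int[] flushWithOpWrite(int bytesToSend) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();           // our side of the "socket"
                 Pipe.SourceChannel source = pipe.source()) {   // the slow "remote" reader
                sink.configureBlocking(false);
                source.configureBlocking(false);

                ByteBuffer writeBuffer = ByteBuffer.allocate(bytesToSend); // application-level buffer
                writeBuffer.put(new byte[bytesToSend]).flip();             // fill it, then switch to reading mode

                int written = sink.write(writeBuffer); // non-blocking: stops early when the pipe is full
                int events = 0;
                if (writeBuffer.hasRemaining()) {      // lagging: ask to be told when writable again
                    SelectionKey key = sink.register(selector, SelectionKey.OP_WRITE);
                    ByteBuffer remote = ByteBuffer.allocate(64 * 1024);
                    while (writeBuffer.hasRemaining()) {
                        while (source.read(remote) > 0) remote.clear(); // remote side catches up
                        selector.select();                              // blocks until OP_WRITE is ready
                        boolean writable = key.isWritable();
                        selector.selectedKeys().clear();
                        int n = writable ? sink.write(writeBuffer) : 0;
                        if (n > 0) {
                            written += n;
                            events++; // here a reactor would call a handleFreedWriteBufferSpace-like hook
                        }
                    }
                    key.interestOps(0); // fully flushed: stop asking for OP_WRITE
                }
                return new int[] {written, events};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = flushWithOpWrite(1 << 20);
        System.out.println("bytesWritten=" + r[0] + " freedSpaceEvents=" + r[1]);
    }
}
```

The OP_WRITE interest is registered only while lagging and cleared afterwards on purpose: an idle socket is almost always writable, so a permanent OP_WRITE interest would make the selector return immediately over and over.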

Turning off disconnectOnFullWriteBuffer

By default, the client config option disconnectOnFullWriteBuffer is turned on. This means that if write() or send() ever returns false, an alert message is logged and the client is disconnected. If you choose to handle socket lagging yourself as explained in this article, you should pass a client config that sets this option to false.

Source Code Example

Below is a simple throughput test that always overflows the writeBuffer and uses handleFreedWriteBufferSpace to keep pushing messages as space becomes available in the writeBuffer:

package com.coralblocks.coralreactor.client.bench.throughput;

import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralreactor.client.AbstractLineTcpClient;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;

public class ThroughputTcpBatchedClient extends AbstractLineTcpClient {
	
	// java -DmessagesToSend=5000000 -server -verbose:gc -Xbootclasspath/p:/home/coralblocks/workspace/CoralReactor-boot-jdk7/target/coralreactor-boot-jdk7.jar -cp target/coralreactor-all.jar:lib/jna-3.5.1.jar -DnioReactorProcToBind=3 com.coralblocks.coralreactor.client.bench.throughput.ThroughputTcpBatchedClient
	
	private final int messagesToSend;
	private final int messageSize;
	private final byte[] msg;
	
	private int msgCount;
	private long start;

	public ThroughputTcpBatchedClient(NioReactor nio, String host, int port, Configuration config) {
		super(nio, host, port, config);
		this.messagesToSend = config.getInt("messagesToSend");
		this.messageSize = config.getInt("messageSize");
		this.msg = new byte[messageSize];
		for(int i = 0; i < msg.length; i++) {
			msg[i] = (byte) ('0' + (i % 10)); // fill the message with some data...
		}
	}
	
	@Override
	protected void handleConnectionOpened() {
		msgCount = 0;
		start = System.nanoTime();
		sendMessages();
	}
	
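	@Override
	protected void handleFreedWriteBufferSpace(int freeSpace) {
		// Triggered by CoralReactor after a flush frees space in the writeBuffer.
		// Finish only when every message was written AND the writeBuffer is completely empty.
		if (msgCount == messagesToSend && freeSpace == writeBuffer.capacity()) {
			printResults();
			disconnect();
			return;
		}
		
		// Otherwise, resume pushing messages into the freed space.
		if (msgCount < messagesToSend) sendMessages();
	}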
	
	private void sendMessages() {
		while(write(msg)) { // <=== this will overflow the writeBuffer
			if (++msgCount == messagesToSend) {
				flush();
				if (!isLagging()) {
					// we only want to finish the test when everything was flushed to the underlying socket buffer, in other words,
					// do not finish here if lagging (finish instead on handleFreedWriteBufferSpace)
					printResults();
					disconnect();
				}
				break;
			}
		}
	}
	
	private void printResults() {
		long totalTime = System.nanoTime() - start;
		long latency = totalTime / msgCount;
		long ops = msgCount * 1000000000L / totalTime;
		System.out.println("Done sending messages! messagesSent=" 
			+ msgCount + " avgLatencyPerMsg=" + latency + " nanos throughput=" + ops + " msgs/sec");
	}
	
	public static void main(String[] args) {
		
		NioReactor nio = NioReactor.create();
		
		int messagesToSend = SystemUtils.getInt("messagesToSend", 5000000);
		int messageSize = SystemUtils.getInt("messageSize", 256);
		
		MapConfiguration config = new MapConfiguration();
		config.add("messagesToSend", messagesToSend);
		config.add("messageSize", messageSize);
		config.add("disconnectOnFullWriteBuffer", false);
		
		Client client = new ThroughputTcpBatchedClient(nio, "localhost", 45451, config);
		client.open();
		nio.start();
	}
}

Conclusion

CoralReactor handles all the complexity of socket lagging for write operations under the hood for you. All you have to do is set disconnectOnFullWriteBuffer to false and implement the method handleFreedWriteBufferSpace(int) to continue sending messages as space becomes available. The write() and send() methods return false to indicate that the write operation is lagging and the message could not be written. For simplicity, CoralReactor does not write partial messages to the writeBuffer, which allows the application to queue messages and resend them later when handleFreedWriteBufferSpace is triggered.