CoralSequencer’s structured data serialization framework

CoralSequencer uses its own binary and garbage-free serialization framework to read and write its internal messages. For your application messages, you are free to use any serialization library or binary data model you choose. The fact that CoralSequencer is message agnostic gives you total flexibility in that decision. But you can also consider using CoralSequencer’s native serialization framework described in this article.

To define the schema of a message you simply inherit from AbstractProto and define the message data fields. Then you proceed to implement the required methods from AbstractProto. Below a self-explanatory example:

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.util.DoubleUtils;
import com.coralblocks.coralsequencer.protocol.AbstractProto;
import com.coralblocks.coralsequencer.protocol.FixedChars;
import com.coralblocks.coralsequencer.protocol.ProtoException;

public class OrderNew extends AbstractProto {
	
	private static final int SYMBOL_LENGTH = 8;

	public static final char 	TYPE 	= 'O';
	public static final char 	SUBTYPE = 'N';
	
	public		FixedChars		symbol = new FixedChars(SYMBOL_LENGTH);
	public		char			side;
	public		long			size;
	public 		long			price;
	public		long			myTimestamp;
	public		long			splitTimestamp;
	public		boolean			isLastChild;
	
	@Override
	public final char getType() {
		return TYPE;
	}
	
	@Override
	public final char getSubtype() {
		return SUBTYPE;
	}
	
	@Override
	public final int getLength() {
		
		return 2 /* Type + Subtype */
			   + SYMBOL_LENGTH /* symbol */
			   + 1  /* side */
			   + 8  /* size */
			   + 8  /* price */
			   + 8  /* myTimestamp */
			   + 8  /* splitTimestamp */
			   + 1; /* isLastChild */
	}

	@Override
    public final void read(ByteBuffer buf) throws ProtoException {

		symbol			=	read(buf, symbol);
		side			=	read(buf, side);
		size			=	read(buf, size);
		price			=	read(buf, price);
		myTimestamp		= 	read(buf, myTimestamp);
		splitTimestamp	= 	read(buf, splitTimestamp);
		isLastChild		=	read(buf, isLastChild);
    }

	@Override
    public final void write(ByteBuffer buf) {

		write(buf, TYPE);
		write(buf, SUBTYPE);
		write(buf, symbol);
		write(buf, side);
		write(buf, size);
		write(buf, price);
		write(buf, myTimestamp);
		write(buf, splitTimestamp);
		write(buf, isLastChild);
    }
	
	@Override
	public final void writeAscii(ByteBuffer buf) {
		
		writeAsciiTypeSubtypeName(buf);
		
		writeAsciiSeparator(buf);
		writeAscii(buf, symbol);
		
		writeAsciiSeparator(buf);
		writeAscii(buf, side);
		
		writeAsciiSeparator(buf);
		writeAscii(buf, size);
		
		writeAsciiSeparator(buf);
		writeAscii(buf, DoubleUtils.toDouble(price));
		
		writeAsciiSeparator(buf);
		writeAsciiTimestamp(buf, myTimestamp);
		
		writeAsciiSeparator(buf);
		writeAsciiTimestamp(buf, splitTimestamp);
		
		writeAsciiSeparator(buf);
		writeAscii(buf, isLastChild);
	}
}


To send out the OrderNew message above, you can simply re-use the same OrderNew instance, over and over again, creating zero garbage. Below an example of how you would send an OrderNew message from a CoralSequencer node:

			if (topBook.isSignal) {
				
				long splitTimestamp = useEpoch ? timestamper.nanoEpoch() : timestamper.nanoTime();

				for(int i = 0; i < ordersToSend; i++) {

					boolean isBid = (i % 2 == 0);
					
					orderNew.symbol.clear().append(topBook.symbol);
					
					if (isBid) {
						orderNew.side = 'B';
						orderNew.size = topBook.bidSize;
						orderNew.price = topBook.bidPrice;
					} else {
						orderNew.side = 'S';
						orderNew.size = topBook.askSize;
						orderNew.price = topBook.askPrice;
					}
					
					orderNew.myTimestamp = useEpoch ? timestamper.nanoEpoch() : timestamper.nanoTime();
					orderNew.splitTimestamp = splitTimestamp;
					
					orderNew.isLastChild = (i == ordersToSend - 1);
					
					if (batching) {
						writeCommand(orderNew);
					} else {
						sendCommand(orderNew);
					}
				}

As you can see, you simply populate the fields with data and call the sendCommand(Proto) method of a CoralSequencer node.

Now to receive a CoralSequencer Proto message, you first need to define a parser for your Proto messages. Luckily that’s super easy as you can see below:

import com.coralblocks.coralsequencer.protocol.AbstractMessageProtoParser;
import com.coralblocks.coralsequencer.protocol.Proto;

public class ProtoParser extends AbstractMessageProtoParser {

	@Override
    protected Proto[] createProtoMessages() {
	    return new Proto[] { new OrderNew(), new TopBook(), new OrderCancel() };
    }
}

Then you can use the proto parser above inside your Node’s handleMessage method to parse a Proto message out of a CoralSequencer message:

    private final ProtoParser protoParser = new ProtoParser();

	@Override
    protected void handleMessage(Message msg) {
	    
		if (isRewinding()) return; // do nothing during rewind...
		
		char type = protoParser.getType(msg);
		char subtype = protoParser.getSubtype(msg);
		
		if (type == OrderNew.TYPE && subtype == OrderNew.SUBTYPE) {
			
			OrderNew orderNew = (OrderNew) protoParser.parse(msg);
			
			if (orderNew == null) {
				Error.log(name, "Can't parse OrderNew:", msg.toCharSequence());
				return;
			}
			
			long now = useEpoch ? timestamper.nanoEpoch() : timestamper.nanoTime();
			
			long latency = now - orderNew.myTimestamp;
			
			ordersBench.measure(latency);

			if (orderNew.isLastChild) {
				latency = now - orderNew.splitTimestamp;
				splitBench.measure(latency);
			}
		}
    }	
}

Cool! That’s great! So what is the downside of using CoralSequencer’s serialization framework? To keep things simple, super fast and garbage-free, it does not give you any help for schema evolution, versioning and backwards compatibility. You may be able to add new fields to an existing message without having to update all nodes, but if you attempt to remove a field or change the order of the fields appearing in a message, then all nodes of your distributed system will have to be updated with the new schema class code in order to use/support the new message format. There is also a version 2 with support of IDL, optional fields and repeating groups.