State-of-the-Art Distributed Systems with CoralSequencer

In this article we introduce the big picture of CoralSequencer, a full-fledged, ultra-low-latency, high-reliability, software-based middleware for the development of distributed systems based on asynchronous messages. We discuss CoralSequencer’s main parts and how it uses a sophisticated and low-latency protocol to distribute messages across nodes through reliable UDP multicast.
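Reliable multicast is usually built on one idea: every message carries a sequence number, so a receiver can notice a gap and ask for the missing messages again. The sketch below shows that idea under a few assumptions. Lost packets are re-requested from a replayer that has seen every message, and the retransmission is modeled as a direct method call instead of a network round trip. All names (`GapRecoverySketch`, `Replayer`, `Receiver`, `onPacket`, `retransmit`) are hypothetical. This is not CoralSequencer's API or its actual wire protocol.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GapRecoverySketch {

    // Hypothetical replayer: keeps every sequenced message so gaps can be refilled.
    static class Replayer {
        private final Map<Long, String> archive = new HashMap<>();
        void record(long seq, String payload) { archive.put(seq, payload); }
        String retransmit(long seq) { return archive.get(seq); }
    }

    // Hypothetical receiving node: delivers messages strictly in sequence order.
    static class Receiver {
        private final Replayer replayer;
        private final TreeMap<Long, String> pending = new TreeMap<>(); // arrived but not yet deliverable
        private long expected = 1;
        final List<String> delivered = new ArrayList<>();
        final List<Long> requested = new ArrayList<>();

        Receiver(Replayer replayer) { this.replayer = replayer; }

        void onPacket(long seq, String payload) {
            if (seq < expected) return; // duplicate of something already delivered
            pending.put(seq, payload);
            // Gap detected: ask the replayer for every missing sequence number
            for (long missing = expected; missing < seq; missing++) {
                if (!pending.containsKey(missing)) {
                    requested.add(missing);
                    pending.put(missing, replayer.retransmit(missing));
                }
            }
            // Deliver everything that is now contiguous
            while (pending.containsKey(expected)) {
                delivered.add(pending.remove(expected));
                expected++;
            }
        }
    }

    public static void main(String[] args) {
        Replayer replayer = new Replayer();
        Receiver node = new Receiver(replayer);
        for (long seq = 1; seq <= 5; seq++) {
            String payload = "msg-" + seq;
            replayer.record(seq, payload); // the replayer sees every message
            if (seq == 3) continue;        // simulate packet 3 being lost on its way to the node
            node.onPacket(seq, payload);
        }
        System.out.println(node.delivered); // [msg-1, msg-2, msg-3, msg-4, msg-5]
        System.out.println(node.requested); // [3]
    }
}
```

The node never exposes the gap to application code. It only hands over messages once the sequence is contiguous again, which is what makes "no message is ever lost" possible over an unreliable transport.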

You should also check out the YouTube talk by Brian Nigito from Jane Street describing the philosophy behind the CoralSequencer architecture.

Quick Facts

  • All nodes read all messages in the exact same order, dropping messages they are not interested in
  • All messages are persisted so late-joining nodes can rewind and catch up to build the exact same state as other nodes
  • Message broadcasting is done through a reliable UDP protocol: no message is ever lost
  • No single point of failure, from the software down to the hardware infrastructure
  • Each session is automatically archived with all its messages so it can be replayed later for testing, simulation, analysis and auditing
  • High-level, straightforward API makes it easy to write nodes that publish/consume messages
  • Lowest possible latency through UDP multicast
  • Zero garbage created per message: no GC overhead
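The first two facts work together. A single sequencer defines one total order, every node processes that order, and the persisted log lets a late joiner replay from the first message and reach the same state. The toy model below illustrates this in memory. All names (`RewindSketch`, `Sequencer`, `Node`, `rewind`) and the order-sensitive `state * 31 + value` fold are hypothetical stand-ins for real application logic, not CoralSequencer's API.

```java
import java.util.ArrayList;
import java.util.List;

public class RewindSketch {

    // Hypothetical sequenced message with a global sequence number assigned by the sequencer
    static final class Msg {
        final long seq; final String topic; final int value;
        Msg(long seq, String topic, int value) { this.seq = seq; this.topic = topic; this.value = value; }
    }

    // Hypothetical sequencer: assigns a single total order and persists every message
    static final class Sequencer {
        final List<Msg> log = new ArrayList<>();
        Msg sequence(String topic, int value) {
            Msg m = new Msg(log.size() + 1, topic, value);
            log.add(m);
            return m;
        }
    }

    // Hypothetical node: reads every message, drops topics it does not care about
    static final class Node {
        final String interest;
        long lastSeq = 0;
        long state = 0; // order-sensitive fold over the messages this node is interested in
        Node(String interest) { this.interest = interest; }

        void onMessage(Msg m) {
            if (m.seq != lastSeq + 1) throw new IllegalStateException("out of order: " + m.seq);
            lastSeq = m.seq;                       // advance even for dropped messages
            if (!m.topic.equals(interest)) return; // not interested: drop it
            state = state * 31 + m.value;
        }

        // Late join: replay the persisted log from the very first message
        void rewind(List<Msg> log) { for (Msg m : log) onMessage(m); }
    }

    public static void main(String[] args) {
        Sequencer sequencer = new Sequencer();
        Node early = new Node("prices");
        for (int i = 1; i <= 4; i++) {
            early.onMessage(sequencer.sequence(i % 2 == 0 ? "prices" : "orders", i));
        }
        Node late = new Node("prices");
        late.rewind(sequencer.log);
        System.out.println(early.state == late.state); // true
    }
}
```

Because the fold depends on message order, the two nodes only agree if both see the exact same sequence. The single sequencer is what guarantees that.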

Quick Features

  • Message agnostic – send and receive anything you want
  • Automatic replayer discovery through multicast, making it easy to move your replayers across machines
  • Message fragmentation at the protocol level, so you can transparently send messages of any size
  • Comprehensive test framework for deterministic single-threaded memory-transport automated tests
  • Choose your transport protocol through configuration without changing a single line of your application code: TCP (for the cloud), UDP (multicast), Shared-Memory (same machine) and Memory (for tests)
  • TCP Rewind
  • Non-rewinding nodes
  • Transparent batching and in-flight messages
  • Shared-memory Dispatcher node to avoid multicast fan-out on machines running several nodes pinned to the same CPU core
  • Full cloud support through TCP transport; to switch later from TCP to UDP multicast, simply flip a configuration flag
  • Easily replay a full past session archive file through an offline node for testing, validation, auditing, reports, etc. (from a local file or from a centralized remote server)
  • Tiered replayer architecture for scalability (optional)
  • Comprehensive, zero-garbage, binary and high-performance native serialization protocol with repeating groups, optional fields, IDL, etc. (optional)
  • Full duplex bridges (UDP and TCP)
  • Long-distance bridges with redundant TCP and UDP channels for performance
  • A variety of internal messages providing features like node active/passive, node heartbeats, force passive, etc.
  • Fully deterministic clock for the centralized distributed system time
  • Local and centralized timers with nanosecond precision
  • Sequencer-generated messages
  • Hot-Hot nodes in a perfect cluster, using the same node account with different instance IDs
  • Multiple sequencers in parallel with cross-connect nodes
  • Remote administration (Telnet, REST and HTTP)
  • Logger node
  • Archiver node
  • Admin node
  • And many others
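Protocol-level fragmentation is what lets a message larger than a single UDP packet travel transparently. The sketch below shows the general technique. The sender splits the payload into numbered fragments, and the receiver concatenates them and emits the full message when the last one arrives. It assumes the fragments of one message arrive contiguously and in order, and it uses a tiny 8-byte chunk size only so the demo stays readable; real chunk sizes would be bounded by the network MTU. `FragmentationSketch`, `Fragment`, `fragment` and `Reassembler` are hypothetical names, not CoralSequencer's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FragmentationSketch {

    // Hypothetical fragment: owning message id, position, total count and the bytes it carries
    static final class Fragment {
        final long msgId; final int index; final int count; final byte[] chunk;
        Fragment(long msgId, int index, int count, byte[] chunk) {
            this.msgId = msgId; this.index = index; this.count = count; this.chunk = chunk;
        }
    }

    // Split a payload into fragments of at most maxChunk bytes each
    static List<Fragment> fragment(long msgId, byte[] payload, int maxChunk) {
        int count = Math.max(1, (payload.length + maxChunk - 1) / maxChunk); // ceiling division
        List<Fragment> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int from = i * maxChunk;
            int to = Math.min(payload.length, from + maxChunk);
            out.add(new Fragment(msgId, i, count, Arrays.copyOfRange(payload, from, to)));
        }
        return out;
    }

    // Reassemble fragments that arrive contiguously and in order
    static final class Reassembler {
        private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
        private int expectedIndex = 0;

        // Returns the full message when its last fragment arrives, otherwise null
        byte[] onFragment(Fragment f) {
            if (f.index != expectedIndex) throw new IllegalStateException("unexpected fragment " + f.index);
            buf.write(f.chunk, 0, f.chunk.length);
            expectedIndex++;
            if (expectedIndex < f.count) return null;
            byte[] whole = buf.toByteArray();
            buf.reset();       // ready for the next message
            expectedIndex = 0;
            return whole;
        }
    }

    public static void main(String[] args) {
        byte[] payload = "a message larger than one packet".getBytes(StandardCharsets.US_ASCII);
        List<Fragment> frags = fragment(42, payload, 8);
        Reassembler reassembler = new Reassembler();
        byte[] result = null;
        for (Fragment f : frags) result = reassembler.onFragment(f);
        System.out.println(frags.size() + " fragments -> " + new String(result, StandardCharsets.US_ASCII));
    }
}
```

The application only ever sees the reassembled payload, which is why message size can stay an invisible detail of the protocol.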

Node Example
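
The sample node below registers a sendTime admin command that sends a TIME- message, and prints its own messages when they are read back from the sequencer, timestamped with the sequencer clock.
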
package com.coralblocks.coralsequencer.node;

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralbits.util.DateTimeUtils;
import com.coralblocks.coralreactor.admin.AdminCommand;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;

public class SampleNode extends Node {
	
	public SampleNode(NioReactor nio, String name, Configuration config) {
		
		super(nio, name, config);
		
		// Register a "sendTime" admin command that can be triggered through remote administration
		addAdminCommand(new AdminCommand("sendTime") {
			@Override
			public boolean execute(CharSequence args, StringBuilder results) {
				sendTime();
				results.append("Time successfully sent!");
				return true;
			}
		});
	}
	
	private void sendTime() {
		// The message is sequenced and read back by all nodes, including this one
		sendCommand("TIME-" + System.currentTimeMillis());
	}
	
	@Override
	protected void handleMessage(boolean isMine, Message msg) {
		
		if (!isMine) return; // not sent by this node, so not interested: quickly drop it...
		
		ByteBuffer data = msg.getData(); // the raw bytes of the message...
		
		long epochInNanos = currentSequencerTime(); // deterministic centralized sequencer clock...
		
		CharSequence now = DateTimeUtils.formatDateTimeInNanos(epochInNanos);
		
		System.out.println("Saw my message at " + now + ": " + ByteBufferUtils.parseString(data));
	}
}
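The example reads the time with currentSequencerTime() rather than the local wall clock. The toy model below shows why that matters, assuming each sequenced message carries the sequencer's timestamp and a node's notion of "now" advances only when it processes a message. Under that assumption, replaying the same archive later produces exactly the same output as the live session. `SequencerClockSketch`, `Msg` and `Node` are hypothetical, and the `currentSequencerTime()` method here only mirrors the name used above; it is not CoralSequencer's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SequencerClockSketch {

    // Hypothetical sequenced message carrying the sequencer's timestamp in epoch nanos
    static final class Msg {
        final long seq; final long sequencerTimeNanos; final String text;
        Msg(long seq, long sequencerTimeNanos, String text) {
            this.seq = seq; this.sequencerTimeNanos = sequencerTimeNanos; this.text = text;
        }
    }

    // Hypothetical node whose clock moves only with the message stream
    static final class Node {
        private long currentSequencerTime;
        final List<String> output = new ArrayList<>();

        long currentSequencerTime() { return currentSequencerTime; }

        void onMessage(Msg m) {
            currentSequencerTime = m.sequencerTimeNanos; // never the local wall clock
            output.add(m.seq + "@" + currentSequencerTime() + ":" + m.text);
        }
    }

    public static void main(String[] args) {
        List<Msg> archive = List.of(
            new Msg(1, 1_000L, "TIME-a"),
            new Msg(2, 2_500L, "TIME-b"));

        Node live = new Node();
        archive.forEach(live::onMessage);

        Node replay = new Node(); // e.g. an offline node replaying the session archive later
        archive.forEach(replay::onMessage);

        System.out.println(live.output.equals(replay.output)); // true
    }
}
```

Had the node called System.nanoTime() instead, the live run and the replay would print different timestamps, and the archive would no longer be a faithful basis for testing and auditing.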