State-of-the-Art Distributed Systems with CoralSequencer

In this article we present the big picture of CoralSequencer, a full-fledged, ultra-low-latency, high-reliability, software-based middleware for building distributed systems based on asynchronous messages. We discuss CoralSequencer’s main parts and how it distributes messages across nodes with a sophisticated, low-latency protocol based on reliable UDP multicast.

You should also check out the YouTube video presented by Brian Nigito from Jane Street, which describes the philosophy behind the CoralSequencer architecture. There is also a podcast in which Nigito discusses the benefits of single-threaded design and multicast systems, the core principles behind CoralSequencer.



Fundamentals

  • What is a distributed system?
    It is an integrated system that spans multiple machines, clients or nodes, which execute their tasks in parallel without disrupting one another. A distributed system should be robust enough to keep operating when one or more nodes fail, stop working, lag or are taken down for maintenance. Each machine can run multiple nodes, not just one. Nodes communicate with each other by sending messages to the sequencer (i.e. the broker), and each node can both produce and consume messages.
  • What is a Message Queue (MQ)?
    It is a message-oriented middleware that enables the development of a distributed system by using asynchronous messages for inter-node communication. A traditional approach for message distribution is the publish-subscribe pattern. CoralSequencer uses a different all-messages-to-all-nodes approach, through multicast, by implementing a reliable UDP protocol.
  • What are distributed systems built on top of an MQ good at?
    They are great at implementing applications with many moving parts (i.e. nodes) that need real-time monitoring and real-time reaction capabilities. To summarize, they provide: parallelism (nodes can truly run in parallel), tight integration (all nodes see the same messages in the same order), decoupling (nodes can evolve independently), failover/redundancy (a backup node can run alongside the primary, building the same state, so it can take over immediately when the primary fails), scalability/load balancing (just add more nodes), elasticity (nodes can lag during activity peaks without affecting the system as a whole) and resiliency (nodes can fail or stop working without taking the whole system down).
  • What is the advantage of CoralSequencer?
    CoralSequencer is a state-of-the-art implementation of a messaging queue for ultra-low-latency distributed systems that require a high degree of reliability. It shares the advantages of all Coral Blocks products: high performance, low latency, low variance, zero garbage and simplicity. As you will see in subsequent articles, CoralSequencer is extremely easy to develop with, configure, deploy, monitor and maintain.
  • What are some examples of systems that can be built on top of CoralSequencer?
    Electronic exchanges and ECNs, trading platforms, banking systems, automated online advertising systems, defense/military systems, credit card systems or any distributed system with a large number of transactions that requires all the advantages listed above plus ultra-low latency and high reliability.
  • Is CoralSequencer fully deterministic?
    Yes. All nodes read all messages in the exact same order, which allows each node to be modeled as a deterministic finite state machine; this, in turn, makes perfect clusters and high availability possible. CoralSequencer even provides a centralized clock (and timers) in the event stream, so nodes can rebuild the exact same state every time, with the same timestamps they observed in the past.
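The ordering and determinism points above can be illustrated with a minimal, in-memory sketch in plain Java. It is not CoralSequencer's API: ToySequencer, SequencedMessage and CounterNode are hypothetical names, and the clock simply advances a fixed amount per message. A single sequencer stamps every incoming command with a sequence number and a sequencer timestamp and appends it to a log; every node applies every message in that order, so a live node and a late joiner that rewinds from the log end up in the same state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical types for illustration only; not the CoralSequencer API.
final class SequencedMessage {
    final long seq;           // global sequence number assigned by the sequencer
    final long sequencerTime; // timestamp assigned by the sequencer, not by the receiving node
    final String sender;
    final String payload;

    SequencedMessage(long seq, long sequencerTime, String sender, String payload) {
        this.seq = seq;
        this.sequencerTime = sequencerTime;
        this.sender = sender;
        this.payload = payload;
    }
}

final class ToySequencer {
    private final List<SequencedMessage> log = new ArrayList<>(); // persisted event stream (in memory here)
    private long nextSeq = 1;
    private long clock = 0; // stand-in for the centralized clock

    // Every command from every node passes through this single point, which fixes the global order.
    SequencedMessage sequence(String sender, String payload) {
        clock += 1_000; // hypothetical: advance one microsecond (in nanos) per message
        SequencedMessage msg = new SequencedMessage(nextSeq++, clock, sender, payload);
        log.add(msg);
        return msg;
    }

    // A late-joining node rewinds by reading the log from the beginning.
    List<SequencedMessage> rewind() {
        return Collections.unmodifiableList(log);
    }
}

// A node as a deterministic state machine: its state depends only on the messages applied so far.
final class CounterNode {
    long total;
    long lastSeq;
    long lastTime;

    void apply(SequencedMessage msg) {
        if (msg.seq != lastSeq + 1) throw new IllegalStateException("out of order: " + msg.seq);
        if (msg.payload.startsWith("ADD-")) { // other messages are ignored, but still marked as seen
            total += Long.parseLong(msg.payload.substring(4));
        }
        lastSeq = msg.seq;
        lastTime = msg.sequencerTime; // time comes from the stream, never from the local machine clock
    }
}
```

Because CounterNode reads time only from sequencerTime, replaying the same log reproduces the same lastTime as well as the same total, which is the property that makes rewinding and hot standby nodes possible.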

Quick Facts

  • All nodes read all messages in the exact same order, dropping messages they are not interested in
  • All messages are persisted so late-joining nodes can rewind and catch up to build the exact same state as other nodes
  • Message broadcasting is done through a reliable UDP protocol: no message is ever lost
  • No single point of failure, from the software down to the hardware infrastructure
  • Each session is automatically archived with all its messages so it can be replayed later for testing, simulation, analysis and auditing
  • High-level, straightforward API makes it easy to write nodes that publish/consume messages
  • As low latency as possible, through UDP multicast
  • Zero garbage created per message, so no GC overhead
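The reliable-UDP bullet rests on a simple idea: because every message carries a sequence number, a receiver can detect a gap and ask for the missing messages again instead of silently losing them. The sketch below shows only that receiver-side bookkeeping, assuming a NAK-style (negative acknowledgment) retransmission request. GapDetector and its methods are hypothetical, it uses no sockets, and it does not claim to reproduce CoralSequencer's actual wire protocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical receiver-side gap detection for a sequenced UDP stream (no networking here).
final class GapDetector {
    private long expected = 1;                                     // next sequence number to deliver
    private final TreeMap<Long, String> pending = new TreeMap<>(); // arrived early, waiting for the gap to close
    private final List<String> delivered = new ArrayList<>();

    // Called for every received packet; returns the missing range {from, to} to request, or null if none.
    long[] onPacket(long seq, String payload) {
        if (seq < expected || pending.containsKey(seq)) return null; // duplicate: ignore it
        pending.put(seq, payload);
        // Deliver every message that is now contiguous, strictly in sequence order.
        while (!pending.isEmpty() && pending.firstKey() == expected) {
            delivered.add(pending.pollFirstEntry().getValue());
            expected++;
        }
        if (pending.isEmpty()) return null;
        // Anything still pending means at least one earlier message has not arrived yet.
        return new long[] {expected, pending.firstKey() - 1};
    }

    List<String> delivered() {
        return delivered;
    }
}
```

Returning the gap on every packet keeps the sketch short; a real implementation would presumably throttle retransmission requests and fall back to a replayer for messages that are too old, details this sketch does not attempt to model.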

Quick Features

  • Message agnostic – send and receive anything you want
  • Automatic replayer discovery through multicast, making it easy to move your replayers across machines
  • Message fragmentation at the protocol level, so you can transparently send messages of any size
  • Comprehensive test framework for deterministic single-threaded memory-transport automated tests
  • Choose your transport protocol through configuration without changing a single line of your application code: TCP (for the cloud), UDP (multicast), Shared-Memory (same machine) and Memory (for tests)
  • TCP Rewind
  • Non-rewinding nodes
  • Transparent batching and in-flight messages
  • Shared-memory Dispatcher node to avoid multicast fan-out on machines running several nodes pinned to the same CPU core
  • Full cloud support through TCP transport; to switch later from TCP to UDP multicast, simply flip a configuration flag
  • Easily replay a full past session archive file through an offline node for testing, validation, auditing, reports, etc. (from a local file or from a centralized remote server)
  • Tiered replayer architecture for scalability (optional)
  • Comprehensive, zero-garbage, binary and high-performance native serialization protocol with repeating groups, optional fields, IDL, etc. (optional)
  • Full duplex bridges (UDP and TCP)
  • Long distance bridges with TCP and UDP redundant channels for performance
  • A variety of internal messages providing features like node active/passive, node heartbeats, force passive, etc.
  • Fully deterministic clock for the centralized distributed system time
  • Local and centralized timers with nanosecond precision
  • Sequencer-generated messages
  • Hot-Hot nodes in a perfect cluster, using the same node account with different instance IDs
  • Multiple sequencers in parallel with cross-connect nodes
  • Remote administration (Telnet, REST and HTTP)
  • Logger node
  • Archiver node
  • Admin node
  • And many others
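Protocol-level fragmentation is what lets a message larger than a single datagram travel transparently. The sketch below splits a message into fragments and reassembles them, assuming a hypothetical 8-byte header holding the fragment index and the fragment count. Fragmenter and Reassembler are illustrative names, this is not CoralSequencer's wire format, and it assumes all fragments of one message arrive before any fragment of the next (an assumption that ordered, gap-free delivery makes reasonable).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical fragmentation scheme: each fragment = [int index][int count][payload bytes].
// Not CoralSequencer's wire format; it only illustrates the technique.
final class Fragmenter {
    static final int HEADER = 8; // two ints: fragment index and total fragment count

    static List<ByteBuffer> split(byte[] message, int maxFragmentSize) {
        int chunk = maxFragmentSize - HEADER;
        if (chunk <= 0) throw new IllegalArgumentException("fragment size too small");
        int count = Math.max(1, (message.length + chunk - 1) / chunk); // ceiling division, at least one
        List<ByteBuffer> fragments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int offset = i * chunk;
            int len = Math.min(chunk, message.length - offset);
            ByteBuffer buf = ByteBuffer.allocate(HEADER + len);
            buf.putInt(i).putInt(count).put(message, offset, len);
            buf.flip(); // ready to be read (or sent)
            fragments.add(buf);
        }
        return fragments;
    }
}

final class Reassembler {
    private byte[][] parts;
    private int received;

    // Returns the full message once its last missing fragment arrives, otherwise null.
    byte[] onFragment(ByteBuffer fragment) {
        int index = fragment.getInt();
        int count = fragment.getInt();
        if (parts == null) parts = new byte[count][];
        if (parts[index] == null) { // ignore duplicate fragments
            parts[index] = new byte[fragment.remaining()];
            fragment.get(parts[index]);
            received++;
        }
        if (received < count) return null;
        int total = 0;
        for (byte[] p : parts) total += p.length;
        byte[] message = new byte[total];
        int pos = 0;
        for (byte[] p : parts) {
            System.arraycopy(p, 0, message, pos, p.length);
            pos += p.length;
        }
        parts = null; // reset for the next message
        received = 0;
        return message;
    }
}
```

For clarity the sketch allocates a new buffer per fragment; a zero-garbage implementation, in the spirit of the facts listed earlier, would instead reuse preallocated buffers.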

Node Example


package com.coralblocks.coralsequencer.node;

import java.nio.ByteBuffer;

import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralbits.util.DateTimeUtils;
import com.coralblocks.coralreactor.admin.AdminCommand;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralsequencer.message.Message;
import com.coralblocks.coralsequencer.mq.Node;

public class SampleNode extends Node {

    public SampleNode(NioReactor nio, String name, Configuration config) {

        super(nio, name, config);

        // Register an admin command so "sendTime" can be triggered remotely (e.g. through telnet)
        addAdminCommand(new AdminCommand("sendTime") {
            @Override
            public boolean execute(CharSequence args, StringBuilder results) {
                sendTime();
                results.append("Time successfully sent!");
                return true;
            }
        });
    }

    private void sendTime() {
        // The command goes to the sequencer, which broadcasts it to all nodes, including this one
        sendCommand("TIME-" + System.currentTimeMillis());
    }

    @Override
    protected void handleMessage(boolean isMine, Message msg) {

        if (!isMine) return; // not interested, quickly drop it...

        ByteBuffer data = msg.getData(); // the raw bytes of the message...

        long epochInNanos = currentSequencerTime(); // deterministic centralized sequencer clock...

        CharSequence now = DateTimeUtils.formatDateTimeInNanos(epochInNanos);

        System.out.println("Saw my message at " + now + ": " + ByteBufferUtils.parseString(data));
    }
}
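The isMine flag in handleMessage makes sense once you see that a node's own commands come back to it through the sequencer, just like everyone else's. The library-free sketch below models that round trip. ToyBus, ToyNode and EchoNode are hypothetical names, and deriving isMine by comparing the sender's name with the node's own name is an assumption made for illustration, not necessarily how CoralSequencer computes the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, library-free model of the send/receive round trip used by SampleNode.
// Not the CoralSequencer API: names and the isMine rule are assumptions for illustration.
abstract class ToyNode {
    final String name;
    ToyBus bus;

    ToyNode(String name) {
        this.name = name;
    }

    // Similar in spirit to sendCommand(...): the command goes to the sequencer, not directly to other nodes.
    void sendCommand(String data) {
        bus.sequence(name, data);
    }

    abstract void handleMessage(boolean isMine, String data);
}

final class ToyBus {
    private final List<ToyNode> nodes = new ArrayList<>();

    void join(ToyNode node) {
        node.bus = this;
        nodes.add(node);
    }

    // The "sequencer" delivers every message to every node, including the sender, in the same order.
    void sequence(String sender, String data) {
        for (ToyNode n : nodes) n.handleMessage(n.name.equals(sender), data);
    }
}

final class EchoNode extends ToyNode {
    final List<String> mine = new ArrayList<>();
    int seen;

    EchoNode(String name) {
        super(name);
    }

    @Override
    void handleMessage(boolean isMine, String data) {
        seen++;
        if (!isMine) return; // same early drop as in SampleNode
        mine.add(data);
    }
}
```

Delivery here is synchronous and in-process purely for brevity; in the real system, messages travel asynchronously through the sequencer over whichever transport is configured.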