Exploring the Sequencer Architecture through our SimSequencer

SimSequencer is a framework that lets you simulate, in code, most aspects of the sequencer architecture without any networking involved. It can be very useful for prototyping, testing and learning purposes. With SimSequencer you can code your nodes to test all the moving parts of your application as if they were running and interacting in a real distributed system. And it has the same API as CoralSequencer.

Some of the main premises of the sequencer architecture are:

  • All nodes receive all messages in the exact same order, always
  • Nodes become deterministic finite-state machines
  • Clusters for high-availability and failover become trivial

Below we demonstrate these premises through working SimSequencer code:


All nodes receive all messages in the exact same order, always


  Sequencer sequencer = new PassThroughSequencer("SEQ");

  // One list per node, recording every message in the order the node saw it.
  final List<Message> seenByNode1 = new LinkedList<Message>();
  final List<Message> seenByNode2 = new LinkedList<Message>();

  Node node1 = new Node("NODE1") {
    @Override
    protected void handleMessage(boolean isMine, Message msg) {
      seenByNode1.add(msg);
    }
  };

  Node node2 = new Node("NODE2") {
    @Override
    protected void handleMessage(boolean isMine, Message msg) {
      seenByNode2.add(msg);
    }
  };

  sequencer.addNode(node1, node2);

  sequencer.open().activate();
  node1.open().activate();
  node2.open().activate();

  Random random = new Random();

  final int messagesToSend = 200;

  // Each command is sent by a randomly chosen node with a random payload.
  int sent = 0;
  while (sent < messagesToSend) {
    Node sender;
    if (random.nextBoolean()) {
      sender = node1;
    } else {
      sender = node2;
    }
    sender.sendCommand("Hi" + random.nextInt(1000));
    sent++;
  }

  // Both nodes must have seen every message, regardless of who sent it...
  Assert.assertEquals(messagesToSend, seenByNode1.size());
  Assert.assertEquals(messagesToSend, seenByNode2.size());

  // ...and, walking both lists in lockstep, the messages must match pairwise.
  for (Iterator<Message> it1 = seenByNode1.iterator(), it2 = seenByNode2.iterator();
       it1.hasNext() && it2.hasNext(); ) {
    Message fromNode1 = it1.next();
    Message fromNode2 = it2.next();
    Assert.assertTrue(fromNode1.isEqualTo(fromNode2));
  }

  node1.close();
  node2.close();
  sequencer.close();


Nodes become deterministic finite-state machines


import static com.coralblocks.simsequencer.util.Log.*;

import com.coralblocks.simsequencer.Message;
import com.coralblocks.simsequencer.Node;

/**
 * A node whose entire state is a single running counter.
 * <p>
 * The counter is set to 1 whenever {@link #handleOpened()} runs and is then
 * modified only by the messages passed to {@link #handleMessage(boolean, Message)}.
 * Message data is read as a pipe-separated string whose first token selects
 * the operation:
 * <ul>
 *   <li>{@code ADD_DETERMINISTIC_TIMESTAMP} - adds {@code currentSequencerTime()}</li>
 *   <li>{@code ADD_VALUE|<int>} - adds the integer given as the second token</li>
 * </ul>
 * Any other first token leaves the counter untouched.
 */
public class CounterNode extends Node {

	private long counter;

	public CounterNode(String name) {
		super(name);
	}

	/**
	 * @return the current value of the counter
	 */
	public long getCounter() {
		return this.counter;
	}

	/**
	 * Puts the counter back to its initial value of 1 and logs that fact.
	 */
	@Override
	protected void handleOpened() {
		counter = 1;
		Info.log(name, "Counter was reset to 1");
	}

	/**
	 * Applies one message to the counter.
	 *
	 * @param isMine not used by this node
	 * @param msg the message whose string data carries the operation
	 * @throws NumberFormatException if the {@code ADD_VALUE} operand is not an int
	 */
	@Override
	protected void handleMessage(boolean isMine, Message msg) {

		final String[] fields = msg.getDataAsString().split("\\|");
		final String operation = fields[0];

		if ("ADD_DETERMINISTIC_TIMESTAMP".equals(operation)) {
			// NOTE(review): currentSequencerTime() is not defined in this class,
			// presumably inherited from Node - confirm its return type and semantics
			counter += currentSequencerTime();
			return;
		}

		if ("ADD_VALUE".equals(operation)) {
			counter += Integer.parseInt(fields[1]);
		}
	}
}
  Sequencer sequencer = new PassThroughSequencer("SEQ");

  CounterNode node = new CounterNode("NODE1");

  sequencer.addNode(node);

  sequencer.open().activate();
  node.open().activate();

  Random random = new Random();

  final int messagesToSend = 200;

  // Send a random mix of the two command types understood by CounterNode.
  int sent = 0;
  while (sent < messagesToSend) {
    // nextInt(2) yields only 0 or 1, so the two commands are the only outcomes
    String command = (random.nextInt(2) == 0)
        ? "ADD_DETERMINISTIC_TIMESTAMP"
        : "ADD_VALUE|" + random.nextInt(300000);
    node.sendCommand(command);
    sent++;
  }

  // Remember the state reached after all messages were processed.
  final long expectedCounter = node.getCounter();

  final int reopenRounds = 5;
  for (int round = 1; round <= reopenRounds; round++) {
    node.close();
    node.open(); // resets the counter to 1...
    // node then rewinds, receiving all messages again...
    Assert.assertEquals(expectedCounter, node.getCounter());
  }

  node.close();
  sequencer.close();


Clusters for high-availability and failover become trivial


  // Hot-Warm (Active-Passive) Cluster

  Sequencer sequencer = new PassThroughSequencer("SEQ");

  // Two nodes with the same account "NODE1" for a cluster
  CounterNode nodeA = new CounterNode("NODE1");
  CounterNode nodeB = new CounterNode("NODE1");

  sequencer.addNode(nodeA, nodeB);

  sequencer.open().activate();

  nodeA.open().activate(); // hot (active)
  nodeB.open(); // warm (passive)

  Random random = new Random();

  final int messagesToSend = 200;

  // Phase 1: only the hot node (A) sends commands.
  int sentByA = 0;
  while (sentByA < messagesToSend) {
    String command = (random.nextInt(2) == 0)
        ? "ADD_DETERMINISTIC_TIMESTAMP"
        : "ADD_VALUE|" + random.nextInt(300000);
    nodeA.sendCommand(command);
    sentByA++;
  }

  // The warm node must have reached the same state as the hot one.
  Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter());

  // fail over to the warm node
  nodeA.deactivate(); // now it is warm
  nodeB.activate(); // now it is hot

  // Phase 2: the roles are swapped, so B is now the sender.
  int sentByB = 0;
  while (sentByB < messagesToSend) {
    String command = (random.nextInt(2) == 0)
        ? "ADD_DETERMINISTIC_TIMESTAMP"
        : "ADD_VALUE|" + random.nextInt(300000);
    nodeB.sendCommand(command);
    sentByB++;
  }

  Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter());

  nodeA.close();
  nodeB.close();
  sequencer.close();
  // Hot-Hot (Active-Active) Cluster

  Sequencer sequencer = new PassThroughSequencer("SEQ");

  // Two nodes with the same account "NODE1" for a cluster
  CounterNode nodeA = new CounterNode("NODE1");
  CounterNode nodeB = new CounterNode("NODE1");

  sequencer.addNode(nodeA, nodeB);

  sequencer.open().activate();

  nodeA.open().activate(); // hot (active)
  nodeB.open().activate(); // hot (active)

  Random random = new Random();

  final int messagesToSend = 200;

  // Phase 1: both hot nodes send the very same command each round.
  int rounds = 0;
  while (rounds < messagesToSend) {
    String command = (random.nextInt(2) == 0)
        ? "ADD_DETERMINISTIC_TIMESTAMP"
        : "ADD_VALUE|" + random.nextInt(300000);

    // order does not matter, so pick at random which node goes first
    boolean aGoesFirst = random.nextBoolean();
    CounterNode firstSender = aGoesFirst ? nodeA : nodeB;
    CounterNode secondSender = aGoesFirst ? nodeB : nodeA;

    firstSender.sendCommand(command);
    secondSender.sendCommand(command);
    rounds++;
  }

  Assert.assertEquals(nodeA.getCounter(), nodeB.getCounter());

  // pull the plug from one of the nodes
  nodeA.close(); // now it is dead
  nodeA = null;

  // Phase 2: the surviving node keeps sending on its own.
  int sentByB = 0;
  while (sentByB < messagesToSend) {
    String command = (random.nextInt(2) == 0)
        ? "ADD_DETERMINISTIC_TIMESTAMP"
        : "ADD_VALUE|" + random.nextInt(300000);
    nodeB.sendCommand(command);
    sentByB++;
  }

  // bring another node to the cluster...
  CounterNode nodeC = new CounterNode("NODE1");
  sequencer.addNode(nodeC);
  nodeC.open().activate(); // hot (active)

  // The newcomer must show the same counter as the node that never went down.
  Assert.assertEquals(nodeB.getCounter(), nodeC.getCounter());

  nodeB.close();
  nodeC.close();
  sequencer.close();