Writing a C++ CoralSequencer Node

Writing C++ code that gets called by your Java code is trivial through shared libraries, but a more interesting project is to do the inverse: calling Java code from a C++ system. In this article we write a C++ CoralSequencer node to perform a latency benchmark test.


Overview

To effectively call Java from C++, your C++ code must instantiate and start a Java Virtual Machine to execute the Java code. The same JVM will then call your C++ system back through a shared library. The goal is to allow any C++ subsystem to directly interact with the CoralSequencer distributed system.


The Code

Below is a simple CoralSequencer node, CNode, that your C++ system can use to receive a callback for each CoralSequencer event-stream message.

/**
 * A node whose message handling is delegated to native (C++) code.
 * Every call to {@link #handleMessage(boolean, Message)} is forwarded unchanged
 * to the native method {@link #handleMessageC(boolean, Message)}.
 */
public class CNode extends Node {
	
	// Runs once when the class is initialized: the native implementation of
	// handleMessageC must be resolvable from this library.
	static {
        	System.loadLibrary("CNode"); // the shared library used to send the callback to C++
	}

	/**
	 * Passes all three arguments straight to the superclass constructor.
	 */
	public CNode(NioReactor nio, String name, Configuration config) {
		super(nio, name, config);
	}
	
	/**
	 * Native callback implemented in the "CNode" shared library.
	 *
	 * @param isMine forwarded as received by handleMessage
	 * @param msg    forwarded as received by handleMessage
	 */
	public native void handleMessageC(boolean isMine, Message msg);
	
	/**
	 * Forwards the message to the native side without any processing in Java.
	 */
	@Override
	protected void handleMessage(boolean isMine, Message msg) {
		handleMessageC(isMine, msg); // call C++
	}

}

To run this node we use the simple mq file below (the file path is ./mqs/cnode.mq):

VM addAdmin telnet 57
VM newNode NODE7 com.coralblocks.coralsequencer.mq.CNode
NODE7 open
NODE7 activate

Now we write the C++ code to instantiate a JVM and run this mq file to start our CNode.

    #include <jni.h>
    // Many other includes here (omitted for clarity)
    #include "com_coralblocks_coralsequencer_mq_CNode.h"
    using namespace std;

    // Total number of callbacks counted in a run (warmup included): once the callback's
    // counter equals this value it prints the results instead of sending another message.
    static const int ITERATIONS = 2000000;
    // Warmup threshold: the callback only records a result once its counter has reached this value.
    static const int WARMUP = 1000000;
    // Size argument passed to createRandomCharArray() when building the message payload.
    static const int MSG_SIZE = 256;

    // Samples CLOCK_MONOTONIC into *ts and returns that same reading
    // flattened into a single nanosecond count.
    long get_nano_ts(timespec* ts) {
        clock_gettime(CLOCK_MONOTONIC, ts);
        const long whole_seconds_in_nanos = ts->tv_sec * 1000000000;
        return whole_seconds_in_nanos + ts->tv_nsec;
    }

    // Boxed long used as the mapped type of the map<int, mi*> result containers.
    // NOTE: value has no initializer, so code creating an mi must set it explicitly.
    struct mi {
        long value;
    };

    // Report helper whose body is not shown in this listing.
    // NOTE(review): judging by the name and parameters it presumably appends the
    // statistics for percentile `perc` over `size` samples in `map` to `ss` —
    // confirm against the full source.
    void add_perc(stringstream& ss, int size, double perc, map<int, mi*>* map) {
    
        // omitted for clarity
    }

    // Builds the message payload; the body is not shown in this listing.
    // Callers pass the returned pointer straight to JNIEnv::NewStringUTF.
    // NOTE(review): presumably returns `size` random characters followed by a NUL
    // terminator; how the buffer is allocated and who frees it is not visible here — confirm.
    char* createRandomCharArray(int size) {
        // omitted for clarity
    }

    int main(int argc, char **argv) {

        JavaVM *jvm;                       // Pointer to the JVM (Java Virtual Machine)
        JNIEnv *env;                       // Pointer to native interface
        JavaVMInitArgs vm_args;            // JVM initialization arguments
        JavaVMOption options[24];          // JVM options

        // add the 25 JVM options here (omitted for clarity)
        
        vm_args.version = JNI_VERSION_1_6;                      // Set the JNI version
        vm_args.nOptions = 24;                                  // Set the number of options
        vm_args.options = options;                              // Set the options to the JVM
        
        // Load and initialize the JVM
        JNI_CreateJavaVM(&jvm, (void**)&env, &vm_args);

        cout << "JVM created!!!" << endl;

        jvm->AttachCurrentThread((void**)&env, NULL);

        jclass startJavaClass = env->FindClass("com/coralblocks/coralsequencer/Start");
        jmethodID findAppMethod = env->GetStaticMethodID(startJavaClass, "findApplication", "(Ljava/lang/String;)Lcom/coralblocks/coralsequencer/app/Application;");
        jclass nodeClass = env->FindClass("com/coralblocks/coralsequencer/mq/CNode");
        jmethodID sendCommandMethod = env->GetMethodID(nodeClass, "sendCommand", "(Ljava/lang/CharSequence;)Z");

        jstring str1 = env->NewStringUTF("mqs/cnode.mq");
        jclass stringClass = env->FindClass("java/lang/String");
        jobject args = env->NewObjectArray(1, stringClass, str1);
        jmethodID mainMethod = env->GetStaticMethodID(startJavaClass, "main", "([Ljava/lang/String;)V");
        jmethodID isActiveMethod = env->GetMethodID(nodeClass, "isActive", "()Z");

        cout << "About to call Java main method..." << endl;

        env->CallStaticVoidMethod(startJavaClass, mainMethod, args);

        cout << "Returned from Java main method!" << endl;

        // Get the node (NODE7)
        jobject node = env->CallStaticObjectMethod(startJavaClass, findAppMethod, env->NewStringUTF("NODE7"));

        cout << "Waiting for node to become active..." << endl;

        // Sleep until node becomes active
        while(env->CallBooleanMethod(node, isActiveMethod) == JNI_FALSE) sleep(1);

        cout << "isActive() returned true!" << endl;

        jstring msgToSend = env->NewStringUTF(createRandomCharArray(MSG_SIZE));

        cout << "About to send first message!" << endl;

        env->CallObjectMethod(node, sendCommandMethod, msgToSend);

        cout << "First message sent!" << endl;

        jvm->DetachCurrentThread();

        // Release the JVM
        jvm->DestroyJavaVM(); // this will wait for Java threads to die...

        cout << "JVM Destroyed!!!" << endl;

        return 0;
    }

    // ---- State shared across invocations of the native handleMessageC callback ----

    // Scratch timespec handed to get_nano_ts() for every timestamp taken by the callback.
    struct timespec ts;
    // Nanosecond timestamp taken just before the callback sends a message (0 until the first send).
    long startTime = 0;
    // Nanosecond timestamp taken on entry to the callback.
    long endTime = 0;
    // Result container, allocated by the callback on its first invocation.
    map<int, mi*>* results;
    // Number of callbacks counted so far (callbacks that return early while rewinding are not counted).
    int iterations = 0;

    // The objects below are looked up lazily on the first callback. As namespace-scope
    // variables they start out zero-initialized, which is what the callback's
    // `node == NULL` first-call test relies on.
    jobject node;
    jmethodID sendCommandMethod;
    jmethodID isRewindingMethod;
    jstring msgToSend;

    // Native implementation of CNode.handleMessageC(boolean, Message).
    //
    // Each invocation closes one round trip: it timestamps the arrival, derives the
    // elapsed time since the previous send, and (unless ITERATIONS has been reached)
    // timestamps and sends the next message.
    //
    // On the first invocation it resolves NODE7, the sendCommand/isRewinding method IDs
    // and the payload string. The node and the payload are promoted to JNI global
    // references because they are reused on later invocations, and local references
    // become invalid as soon as the native method that created them returns.
    // If any first-time lookup fails the function returns immediately, leaving the
    // pending Java exception (if any) to surface in the Java caller.
    //
    // env    - JNI environment of the calling Java thread
    // obj    - the CNode instance the native method was invoked on (unused)
    // isMine - forwarded from CNode.handleMessage (unused)
    // msg    - forwarded from CNode.handleMessage (unused)
    JNIEXPORT void JNICALL Java_com_coralblocks_coralsequencer_mq_CNode_handleMessageC
    (JNIEnv *env, jobject obj, jboolean isMine, jobject msg) {

        endTime = get_nano_ts(&ts); // taken first so the setup work below is not measured

        if (node == NULL) {
            jclass startJavaClass = env->FindClass("com/coralblocks/coralsequencer/Start");
            if (startJavaClass == NULL) return;

            jmethodID findAppMethod = env->GetStaticMethodID(startJavaClass, "findApplication", "(Ljava/lang/String;)Lcom/coralblocks/coralsequencer/app/Application;");
            if (findAppMethod == NULL) return;

            jclass nodeClass = env->FindClass("com/coralblocks/coralsequencer/mq/CNode");
            if (nodeClass == NULL) return;

            sendCommandMethod = env->GetMethodID(nodeClass, "sendCommand", "(Ljava/lang/CharSequence;)Z");
            if (sendCommandMethod == NULL) return;

            isRewindingMethod = env->GetMethodID(nodeClass, "isRewinding", "()Z");
            if (isRewindingMethod == NULL) return;

            jstring nodeName = env->NewStringUTF("NODE7");
            if (nodeName == NULL) return;

            jobject localNode = env->CallStaticObjectMethod(startJavaClass, findAppMethod, nodeName);
            if (localNode == NULL) return;

            jstring localMsg = env->NewStringUTF(createRandomCharArray(MSG_SIZE));
            if (localMsg == NULL) return;

            // Promote the payload to a global reference: it is sent again on every later callback.
            msgToSend = static_cast<jstring>(env->NewGlobalRef(localMsg));
            if (msgToSend == NULL) return;

            if (results == NULL) results = new map<int, mi*>();

            // Assigned last: a non-NULL node marks the one-time setup as complete.
            node = env->NewGlobalRef(localNode);
            if (node == NULL) return;
        }

        if (env->CallBooleanMethod(node, isRewindingMethod) == JNI_TRUE) return;

        int res = startTime > 0 ? (endTime - startTime) : 1; // 1 only for first message/pass

        if (res <= 0) res = 1;

        if (iterations++ >= WARMUP) {
            
            // add the result (omitted for clarity)
        }

        if (iterations == ITERATIONS) {

            // print the results (omitted for clarity) 

        } else {

            startTime = get_nano_ts(&ts);

            // sendCommand returns boolean ("...)Z"), so it must be called through CallBooleanMethod.
            (void) env->CallBooleanMethod(node, sendCommandMethod, msgToSend);

        }

    }

NOTE: The full source code can be seen here.

The trick is to compile this C++ code twice: as the main C++ program to be executed (the one that will start the JVM) and as the shared library that will be used by CNode.java (the one that will receive the callback):

# Using Java 21 and clang 14.0.6

# Generate the com_coralblocks_coralsequencer_mq_CNode.h header file
# (must run first: Bench.cpp includes this header, so both C++ builds below depend on it)
javac -h src/main/c/linux -d target/classes -sourcepath src/main/java -cp target/coralsequencer-all.jar src/main/java/com/coralblocks/coralsequencer/mq/CNode.java

# Compile the main C++ program to start the JVM
clang++ -I"$JAVA_HOME/include" -I"$JAVA_HOME/include/linux" -o bin/linux/Bench src/main/c/linux/Bench.cpp -L"$JAVA_HOME/lib/server" -ljvm -Wno-write-strings

# Compile the shared library to be used by CNode.java
clang++ -shared -fPIC -I"$JAVA_HOME/include" -I"$JAVA_HOME/include/linux/" src/main/c/linux/Bench.cpp -o lib/libCNode.so -L"$JAVA_HOME/lib/server" -ljvm -Wno-write-strings

Now when we execute our C++ application with the command-line below:

$ LD_LIBRARY_PATH=$JAVA_HOME/lib/server ./bin/linux/Bench

We get the following latency benchmark results:

Message Size: 256 bytes
Messages: 1,000,000
Avg Time: 4.808 micros
Min Time: 3.754 micros
Max Time: 717.729 micros
75% = [avg: 4.522 micros, max: 5.055 micros]
90% = [avg: 4.618 micros, max: 5.159 micros]
99% = [avg: 4.681 micros, max: 6.339 micros]
99.9% = [avg: 4.717 micros, max: 20.547 micros]
99.99% = [avg: 4.773 micros, max: 268.823 micros]
99.999% = [avg: 4.803 micros, max: 427.988 micros]

As expected this is very close to our official CoralSequencer latency numbers, as described here.

Conclusion

It is pretty straightforward to write C++ applications that use the CoralSequencer infrastructure to interact with your distributed system. The performance cost of crossing the native-to-JVM boundary and back is very small, as the benchmark results in this article demonstrate.