Inter-Process Communication with CoralQueue

CoralQueue is great for inter-thread communication, when both threads are running in the same JVM. However it also supports inter-process communication (IPC) through a shared memory mapped file so that two threads running on the same machine but on different JVMs can exchange messages. This is much faster than the other option which would be network access through loopback. In this article we will examine how this can be easily done and present the benchmark numbers for IPC.

NOTE: The memory-mapped file is just used here as an in-memory bridge between the two JVMs. It is not used for any kind of file persistency. If you want to log huge amounts of data without producing any garbage and with ultra-low-latency you can refer to CoralLog.

Serializing the Transfer Object to Memory

In order to transfer your Java object with all its data from one JVM to another, you must make it implement the MemorySerializable interface. The purpose of this interface is to define how the object contents will be written to and read from memory as a sequence of primitives. The interface is listed below:

package com.coralblocks.coralbits.mem;

public interface MemorySerializable {
	
	public void writeTo(long pointer, Memory memory);
	
	public void readFrom(long pointer, Memory memory);
	
}

For example, to serialize the MutableLong class is easy:

// com.coralblocks.coralqueue.util.MutableLong implements MemorySerializable

	@Override
    public void writeTo(long pointer, Memory memory) {
		// write the value of this MutableLong object to memory
		memory.putLong(pointer, get()); // get() is the method from MutableLong that returns its value
	}

	@Override
    public void readFrom(long pointer, Memory memory) {
		// read the value of this MutableLong object from memory
		set(memory.getLong(pointer)); // set() is the method from MutableLong to change its value
    }

The interface Memory has putXXX and getXXX methods for each primitive as you can see below:

package com.coralblocks.coralbits.mem;

public interface Memory {
	
	public long getLong(long address);

	public void putLong(long address, long value);
	
	public int getInt(long address);
	
	public void putInt(long address, int value);
	
	public byte getByte(long address);
	
	public void putByte(long address, byte value);
	
	public short getShort(long address);
	
	public void putShort(long address, short value);

	public char getChar(long address);
	
	public void putChar(long address, char value);
}

Below a couple of more examples of how to serialize an object to memory:

	public class Message1 implements MemorySerializable {
		
		private static final int LEN = 32;
		public static final int MAX_SIZE_IN_BYTES = LEN;
		
		private final StringBuilder sb = new StringBuilder(LEN);
		
		@Override
        public void writeTo(long pointer, Memory memory) {
			int sbLength = sb.length();
			for(int i = 0; i < LEN; i++) {
				char c = ' '; // pad with blank
				if (i < sbLength) c = sb.charAt(i);
				// putChar writes 1 byte
				memory.putChar(pointer + i, c);
			}
        }

		@Override
        public void readFrom(long pointer, Memory memory) {
			sb.setLength(0);
			for(int i = 0; i < LEN; i++) {
				// getChar reads 1 byte...
				sb.append(memory.getChar(pointer + i));
			}
        }
		
		// (...)
	}
	
	public class Message2 implements MemorySerializable {
		
		private static final int SIZE_OF_LONG_IN_BYTES = 8;
		private static final int LEN = 32;
		public static final int MAX_SIZE_IN_BYTES = LEN * SIZE_OF_LONG_IN_BYTES;
		
		private final long[] data = new long[LEN];
		
		@Override
        public void writeTo(long pointer, Memory memory) {
			for(int i = 0; i < LEN; i++) {
				// putLong writes 8 bytes...
				memory.putLong(pointer + (i * SIZE_OF_LONG_IN_BYTES), data[i]);
			}
        }

		@Override
        public void readFrom(long pointer, Memory memory) {
			for(int i = 0; i < LEN; i++) {
				// getLong reads 8 bytes...
				data[i] = memory.getLong(pointer + (i * SIZE_OF_LONG_IN_BYTES));
			}
        }
		
		// (...)
	}


Writing the Producer

It follows the same easy API and design patterns of CoralQueue. Below a producer example:

package com.coralblocks.coralqueue.test;

import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralqueue.offheap.OffHeapProducer;
import com.coralblocks.coralqueue.util.MutableLong;

public class TestOffHeapProducer {
	
	public static void main(String[] args) throws Exception {
		
		final int messages = args.length > 0 ? Integer.parseInt(args[0]) : 10000000;
		
		final int queueCapacity = SystemUtils.getInt("capacity", 1024);
		
		final int maxObjectSize = 8; // a mutable long of course will have a max size of 8 bytes
		
		final OffHeapProducer<MutableLong> producer = 
			new OffHeapProducer<MutableLong>(queueCapacity, maxObjectSize, MutableLong.class, "testIPC.mmap");

		long time = System.currentTimeMillis();
		
		for(int i = 1; i <= messages; i++) {
			MutableLong ml;
			while((ml = producer.nextToDispatch()) == null); // busy spin... (no wait strategy)
			ml.set(i);
			producer.flush();
		}
		
		time = System.currentTimeMillis() - time;
		
		System.out.println("Number of messages: " + messages);
		System.out.println("Total time: " + time);
	}
}

NOTE: You must correctly specify the max object size that will be transferred so that a proper queue size can be calculated. Since a MutableLong only has one long as its data, the max size is of course 8 bytes (i.e. the size of a Java long)


Writing the Consumer

Again it follows the same easy API and design patterns of CoralQueue. Below a consumer example:

package com.coralblocks.coralqueue.test;

import com.coralblocks.coralbits.util.SystemUtils;
import com.coralblocks.coralqueue.offheap.OffHeapConsumer;
import com.coralblocks.coralqueue.util.MutableLong;

public class TestOffHeapConsumer {

	public static void main(String[] args) throws Exception {
		
		final int messages = args.length > 0 ? Integer.parseInt(args[0]) : 10000000;
		
		final int queueCapacity = SystemUtils.getInt("capacity", 1024);
		
		final int maxObjectSize = 8; // a mutable long of course will have a max size of 8 bytes
		
		final OffHeapConsumer<MutableLong> consumer = 
			new OffHeapConsumer<MutableLong>(queueCapacity, maxObjectSize, MutableLong.class, "testIPC.mmap");
		
		long expectedSeq = 1;
		
		int count = 0;
		
		boolean running = true;
		
		long time = System.currentTimeMillis();

		while(running) {
			
			long x = consumer.availableToPoll();
			
			if (x > 0) {
				
				for(int i = 0; i < x; i++) {
				
					 MutableLong ml = consumer.poll();
					 
					 long seq = ml.get();
					 
					 if (seq == expectedSeq) {
						expectedSeq++;
					 } else {
						throw new IllegalStateException("Got bad sequence! expected="+ expectedSeq + " received=" + seq);
					 }
					 
					 if (++count == messages) {
						 running = false;
						 break;
					 }
				}
				
				consumer.donePolling();
				
			} else {
				// busy spin... (no wait strategy)
			}
		}
		
		time = System.currentTimeMillis() - time;
		
		System.out.println("Number of messages: " + count);
		System.out.println("Total time: " + time);
		
	}
}


Latency Numbers

IPC is very fast, not faster than inter-thread communication inside the same JVM (around 53 nanos) but much faster than network access (around 2.15 micros). Below we present the message latencies for CoralQueue’s IPC. The machine used for the benchmarks was an Intel i7 quad-core (4 x 3.50GHz) Ubuntu box overclocked to 4.50Ghz.

Messages: 1,350,000 (8-byte size)
Avg Time: 97.64 nanos
Min Time: 61.0 nanos
Max Time: 3.922 micros
75% = [avg: 95.0 nanos, max: 103.0 nanos]
90% = [avg: 96.0 nanos, max: 105.0 nanos]
99% = [avg: 97.0 nanos, max: 110.0 nanos]
99.9% = [avg: 97.0 nanos, max: 163.0 nanos]
99.99% = [avg: 97.0 nanos, max: 239.0 nanos]
99.999% = [avg: 97.0 nanos, max: 1.597 micros]


Conclusion

Inter-process communication offers an in-between solution for messaging. At one extreme you have inter-thread communication inside the same JVM and at the other you have network access for processes which are running in different machines. IPC is a viable solution when for some reason you can’t run both threads inside the same JVM and you are not willing to afford the extra cost of network access. CoralQueue offers IPC through a very easy and straightforward API, following the same design principles of CoralQueue’s AtomicQueue.