package com.coralblocks.coralqueue.bench; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; import com.coralblocks.coralqueue.waitstrategy.SpinWaitStrategy; import com.coralblocks.coralqueue.waitstrategy.WaitStrategy; import com.coralblocks.coralthreads.Affinity; import com.coralblocks.coralutils.SystemUtils; import com.coralblocks.coralutils.bench.Benchmarker; import com.coralblocks.coralutils.tsutils.Timestamper; /** * Two different cores: (no hyper-threading) * * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=10000000 -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestMessageLatency * * Iterations: 10000000 | Avg Time: 88.18 nanos | Min Time: 64.0 nanos | Max Time: 5.961 micros | 75% = [avg: 84.0 nanos, max: 94.0 nanos] | 90% = [avg: 86.0 nanos, max: 98.0 nanos] | 99% = [avg: 87.0 nanos, max: 109.0 nanos] | 99.9% = [avg: 88.0 nanos, max: 134.0 nanos] | 99.99% = [avg: 88.0 nanos, max: 236.0 nanos] | 99.999% = [avg: 88.0 nanos, max: 1.198 micros] * * Same core: (with hyper-threading) * * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=10000000 -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestMessageLatency * * Iterations: 10000000 | Avg Time: 52.97 nanos | Min Time: 32.0 nanos | Max Time: 9.052 micros | 75% = [avg: 51.0 nanos, max: 56.0 nanos] | 90% = [avg: 52.0 nanos, max: 58.0 nanos] | 99% = [avg: 52.0 nanos, max: 61.0 nanos] | 99.9% = [avg: 52.0 nanos, max: 66.0 nanos] | 99.99% = [avg: 52.0 nanos, max: 287.0 nanos] | 99.999% = [avg: 52.0 nanos, max: 1.27 micros] */ public class TestMessageLatency { public static void main(String[] args) throws InterruptedException { final int messagesToWarmup = SystemUtils.getInt("messagesToWarmup", 10000000); final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000); final int queueCapacity = SystemUtils.getInt("queueCapacity", 512); int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2); int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3); /* * messagesToWarmup is VERY important to warmup benchmark class. Failure to pass it will producer outliers !!! */ final Benchmarker bench = Benchmarker.create(messagesToWarmup); final Timestamper timestamper = bench.getTimestamper(); final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(queueCapacity, MutableLong.BUILDER); final Queue<MutableLong> echoQueue = new AtomicQueue<MutableLong>(queueCapacity, MutableLong.BUILDER); // we use a strategy to check block count final WaitStrategy producerWaitStrategy = new SpinWaitStrategy(); System.out.println("messagesToWarnup: " + messagesToWarmup); System.out.println("messagesToTest: " + messagesToTest); System.out.println("queueCapacity: " + queueCapacity); System.out.println("timestamper: " + timestamper); Thread producer = new Thread(new Runnable() { @Override public void run() { Affinity.bind(); long count = 0; long total = messagesToWarmup + messagesToTest; while(count <= total) { MutableLong ml = null; boolean done = count == total; bench.mark(); while((ml = queue.nextToDispatch()) == null) { producerWaitStrategy.block(); } ml.set(done ? -1 : count); queue.flush(false); // no lazySet, send immediately producerWaitStrategy.reset(); long avail; while((avail = echoQueue.availableToPoll()) == 0); for(int i = 0; i < avail; i++) { echoQueue.poll().get(); } echoQueue.donePolling(true); // can be lazy because queue will not be full... count++; } Affinity.unbind(); System.out.println("producer exiting..."); } }, "Producer"); Thread consumer = new Thread(new Runnable() { @Override public void run() { Affinity.bind(); boolean running = true; while (running) { long avail = queue.availableToPoll(); if (avail > 0) { for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); long x = ml.get(); if (x != -1) { bench.measure(); } else { running = false; } // echo back! while((ml = echoQueue.nextToDispatch()) == null); ml.set(x); echoQueue.flush(false); } queue.donePolling(true); // can be lazy because queue will never be full... } } Affinity.unbind(); System.out.println("consumer exiting..."); } }, "Consumer"); if (Affinity.isAvailable()) { Affinity.assignToProcessor(prodProcToBind, producer); Affinity.assignToProcessor(consProcToBind, consumer); } else { System.err.println("Thread affinity not available!"); } consumer.start(); producer.start(); consumer.join(); producer.join(); System.out.println("Producer block count: " + producerWaitStrategy.getTotalBlockCount()); System.out.println(bench.results()); } }