package com.coralblocks.coralqueue.bench;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
import com.coralblocks.coralqueue.util.PauseSupport;
import com.coralblocks.coralqueue.waitstrategy.SpinWaitStrategy;
import com.coralblocks.coralqueue.waitstrategy.WaitStrategy;
import com.coralblocks.coralthreads.Affinity;
import com.coralblocks.coralutils.SystemUtils;
import com.coralblocks.coralutils.bench.Benchmarker;
import com.coralblocks.coralutils.tsutils.Timestamper;
/**
* Different cores: (without hyper-threading)
*
* java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=1000000 -DmessagesToTest=10000000 -Dpause=100 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestProducerLatency
*
* Iterations: 10000000 | Avg Time: 24.65 nanos | Min Time: 4.0 nanos | Max Time: 6.03 micros | 75% = [avg: 24.0 nanos, max: 25.0 nanos] | 90% = [avg: 24.0 nanos, max: 25.0 nanos] | 99% = [avg: 24.0 nanos, max: 26.0 nanos] | 99.9% = [avg: 24.0 nanos, max: 148.0 nanos] | 99.99% = [avg: 24.0 nanos, max: 264.0 nanos] | 99.999% = [avg: 24.0 nanos, max: 415.0 nanos]
*
* Same core: (with hyper-threading)
*
* java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=1000000 -DmessagesToTest=10000000 -Dpause=100 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestProducerLatency
*
* Iterations: 10000000 | Avg Time: 13.91 nanos | Min Time: 7.0 nanos | Max Time: 10.248 micros | 75% = [avg: 13.0 nanos, max: 15.0 nanos] | 90% = [avg: 13.0 nanos, max: 16.0 nanos] | 99% = [avg: 13.0 nanos, max: 27.0 nanos] | 99.9% = [avg: 13.0 nanos, max: 32.0 nanos] | 99.99% = [avg: 13.0 nanos, max: 281.0 nanos] | 99.999% = [avg: 13.0 nanos, max: 1.031 micros]
*/
public class TestProducerLatency {
public static void main(String[] args) throws InterruptedException {
final int messagesToWarmup = SystemUtils.getInt("messagesToWarmup", 1000000);
final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000);
final long pause = SystemUtils.getLong("pause", 100);
int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2);
int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3);
/*
* messagesToWarmup is VERY important to warmup benchmark class. Failure to pass it will producer outliers !!!
*/
final Benchmarker bench = Benchmarker.create(messagesToWarmup);
final Timestamper timestamper = bench.getTimestamper();
final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(512, MutableLong.BUILDER);
// we use a strategy to check block count
final WaitStrategy producerWaitStrategy = new SpinWaitStrategy();
System.out.println("messagesToWarnup: " + messagesToWarmup);
System.out.println("messagesToTest: " + messagesToTest);
System.out.println("pause: " + pause);
System.out.println("timestamper: " + timestamper);
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
Affinity.bind();
long count = 0;
long total = messagesToWarmup + messagesToTest;
while(count < total) {
MutableLong ml = null;
bench.mark();
while((ml = queue.nextToDispatch()) == null) {
producerWaitStrategy.block();
}
ml.set(count);
queue.flush(true); // lazySet true
bench.measure();
producerWaitStrategy.reset();
PauseSupport.assignment(pause);
count++;
}
Affinity.unbind();
System.out.println("producer exiting...");
}
}, "Producer");
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
Affinity.bind();
long total = messagesToWarmup + messagesToTest;
boolean running = true;
while (running) {
long avail = queue.availableToPoll();
if (avail > 0) {
for(int i = 0; i < avail; i++) {
MutableLong ml = queue.poll();
long x = ml.get();
if (x == total - 1) running = false;
}
queue.donePolling(false); // no lazy, tell producer immediately in case queue was full...
}
}
Affinity.unbind();
System.out.println("consumer exiting...");
}
}, "Consumer");
if (Affinity.isAvailable()) {
Affinity.assignToProcessor(prodProcToBind, producer);
Affinity.assignToProcessor(consProcToBind, consumer);
} else {
System.err.println("Thread affinity not available!");
}
consumer.start();
producer.start();
consumer.join();
producer.join();
System.out.println("Producer block count: " + producerWaitStrategy.getTotalBlockCount());
System.out.println(bench.results());
}
}