package com.coralblocks.coralqueue.bench;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
import com.coralblocks.coralthreads.Affinity;
import com.coralblocks.coralutils.SystemUtils;
import com.coralblocks.coralutils.bench.Benchmarker;
/**
* Different cores: (no hyper-threading)
*
* java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 com.coralblocks.coralqueue.bench.TestProducerThroughput
*
* Results: Iterations: 20 | Avg Time: 139.555 millis | Min Time: 137.649 millis | Max Time: 145.542 millis
* Average time to send 10,000,000 messages: 139,555,424 nanos
* Messages per second: 71,656,118
*
* Same core: (with hyper-threading)
*
* java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DexcludeNanoTimeCost=true -DqueueCapacity=1024 com.coralblocks.coralqueue.bench.TestProducerThroughput
*
* Results: Iterations: 20 | Avg Time: 102.703 millis | Min Time: 102.32 millis | Max Time: 109.105 millis
* Average time to send 10,000,000 messages: 102,702,560 nanos
* Messages per second: 97,368,556
*/
public class TestProducerThroughput {

    /** Number of nanoseconds in one second, used to convert an average pass time into a rate. */
    private static final long NANOS_PER_SECOND = 1000000000L;

    /**
     * Measures how long a producer thread takes to push {@code messagesToTest} messages
     * through a queue while a consumer thread drains it, then prints the benchmark
     * results and the derived messages-per-second rate.
     *
     * <p>Configuration is read through {@code SystemUtils.getInt} with these keys and defaults:
     * {@code messagesToTest} (10000000), {@code queueCapacity} (1024), {@code passes} (20),
     * {@code producerProcToBind} (2) and {@code consumerProcToBind} (3).
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while joining the
     *         producer or consumer thread
     */
    public static void main(String[] args) throws InterruptedException {

        final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000);
        final int capacity = SystemUtils.getInt("queueCapacity", 1024);
        final int passes = SystemUtils.getInt("passes", 20);
        int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2);
        int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3);

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(capacity, MutableLong.BUILDER);

        // NOTE(review): the argument 1 presumably tells the Benchmarker to discard the first
        // (warmup) measurement -- confirm against the Benchmarker documentation.
        final Benchmarker bench = Benchmarker.create(1);

        System.out.println("messagesToTest: " + messagesToTest);
        System.out.println("queueCapacity: " + capacity);
        System.out.println("timestamper: " + bench.getTimestamper());

        Thread producer = new Thread(new Runnable() {

            @Override
            public void run() {

                Affinity.bind();

                MutableLong ml = null;

                // passes + 1 iterations: pass 0 is reported as the warmup pass
                for (int i = 0; i < passes + 1; i++) {

                    long count = 0;

                    bench.mark();

                    while (count < messagesToTest) {
                        // busy-spin until the queue hands out a slot to write into
                        while ((ml = queue.nextToDispatch()) == null);
                        ml.set(count);
                        queue.flush(true);
                        count++;
                    }

                    long t = bench.measure();

                    System.out.println("Pass " + i + "... " + (i == 0 ? "(warmup)" : "(" + Benchmarker.convertNanoTime(t) + ")"));
                }

                // send -1 as the end-of-test marker that makes the consumer stop
                while ((ml = queue.nextToDispatch()) == null);
                ml.set(-1);
                queue.flush(true);

                Affinity.unbind();

                System.out.println("producer exiting...");
            }
        }, "Producer");

        Thread consumer = new Thread(new Runnable() {

            @Override
            public void run() {

                Affinity.bind();

                boolean running = true;

                while (running) {

                    long avail = queue.availableToPoll();

                    if (avail > 0) {

                        for (int i = 0; i < avail; i++) {
                            MutableLong ml = queue.poll();
                            long x = ml.get();
                            // -1 is the producer's end-of-test marker; finish this batch, then exit
                            if (x == -1) running = false;
                        }

                        queue.donePolling(false); // no lazy, tell producer immediately in case queue was full...
                    }
                }

                Affinity.unbind();

                System.out.println("consumer exiting...");
            }
        }, "Consumer");

        if (Affinity.isAvailable()) {
            Affinity.assignToProcessor(prodProcToBind, producer);
            Affinity.assignToProcessor(consProcToBind, consumer);
        } else {
            System.err.println("Thread affinity not available!");
        }

        consumer.start();
        producer.start();

        consumer.join();
        producer.join();

        long time = Math.round(bench.getAverage());
        long mps = messagesPerSecond(messagesToTest, time);

        System.out.println("Results: " + bench.results());
        System.out.println("Average time to send " + messagesToTest + " messages: " + time + " nanos");
        System.out.println("Messages per second: " + mps);
    }

    /**
     * Converts an average pass duration into a messages-per-second rate.
     *
     * <p>Returns 0 when {@code avgNanos} is not positive. {@code Math.round} yields 0 for a
     * NaN or sub-half-nanosecond average (for example when no measured passes were recorded),
     * and dividing by that value would throw {@code ArithmeticException} before any results
     * were printed.
     *
     * @param messages number of messages sent per pass
     * @param avgNanos average duration of one pass, in nanoseconds
     * @return messages per second (integer division), or 0 if {@code avgNanos <= 0}
     */
    private static long messagesPerSecond(long messages, long avgNanos) {
        if (avgNanos <= 0) {
            return 0;
        }
        return messages * NANOS_PER_SECOND / avgNanos;
    }
}