package com.coralblocks.coralqueue.bench;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;
import com.coralblocks.coralqueue.waitstrategy.SpinWaitStrategy;
import com.coralblocks.coralqueue.waitstrategy.WaitStrategy;
import com.coralblocks.coralthreads.Affinity;
import com.coralblocks.coralutils.SystemUtils;
import com.coralblocks.coralutils.bench.Benchmarker;
import com.coralblocks.coralutils.tsutils.Timestamper;

/**
 * Two different cores: (no hyper-threading)
 * 
 * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=10000000 -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=3 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestMessageLatency
 * 
 * Iterations: 10000000 | Avg Time: 88.18 nanos | Min Time: 64.0 nanos | Max Time: 5.961 micros | 75% = [avg: 84.0 nanos, max: 94.0 nanos] | 90% = [avg: 86.0 nanos, max: 98.0 nanos] | 99% = [avg: 87.0 nanos, max: 109.0 nanos] | 99.9% = [avg: 88.0 nanos, max: 134.0 nanos] | 99.99% = [avg: 88.0 nanos, max: 236.0 nanos] | 99.999% = [avg: 88.0 nanos, max: 1.198 micros]
 *
 * Same core: (with hyper-threading)
 * 
 * java -server -verbose:gc -cp target/coralqueue-all.jar -Xms2g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=1024m -DmessagesToWarmup=10000000 -DmessagesToTest=10000000 -DproducerProcToBind=2 -DconsumerProcToBind=6 -DblockCount=true -DdetailedBenchmarker=true -DexcludeNanoTimeCost=true com.coralblocks.coralqueue.bench.TestMessageLatency
 *  
 * Iterations: 10000000 | Avg Time: 52.97 nanos | Min Time: 32.0 nanos | Max Time: 9.052 micros | 75% = [avg: 51.0 nanos, max: 56.0 nanos] | 90% = [avg: 52.0 nanos, max: 58.0 nanos] | 99% = [avg: 52.0 nanos, max: 61.0 nanos] | 99.9% = [avg: 52.0 nanos, max: 66.0 nanos] | 99.99% = [avg: 52.0 nanos, max: 287.0 nanos] | 99.999% = [avg: 52.0 nanos, max: 1.27 micros] 
 */
public class TestMessageLatency {
	
	public static void main(String[] args) throws InterruptedException {
		
		final int messagesToWarmup = SystemUtils.getInt("messagesToWarmup", 10000000);
		final int messagesToTest = SystemUtils.getInt("messagesToTest", 10000000);
		final int queueCapacity = SystemUtils.getInt("queueCapacity", 512);
		
		int prodProcToBind = SystemUtils.getInt("producerProcToBind", 2);
		int consProcToBind = SystemUtils.getInt("consumerProcToBind", 3);
		
		/*
		 * messagesToWarmup is VERY important to warm up the benchmark class. Failure to pass it will produce outliers!
		 */
		final Benchmarker bench = Benchmarker.create(messagesToWarmup);
		final Timestamper timestamper = bench.getTimestamper();
		
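		// Ping-pong setup: 'queue' carries messages from the producer to the consumer
		// and 'echoQueue' carries them back. Both queues pre-allocate their MutableLong
		// instances through the builder, so the hot path creates no garbage.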
		final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(queueCapacity, MutableLong.BUILDER);
		
		final Queue<MutableLong> echoQueue = new AtomicQueue<MutableLong>(queueCapacity, MutableLong.BUILDER);
		
		// we use an explicit wait strategy so we can count how many times the producer blocks
		final WaitStrategy producerWaitStrategy = new SpinWaitStrategy();
		
		System.out.println("messagesToWarnup: " + messagesToWarmup);
		System.out.println("messagesToTest: " + messagesToTest);
		System.out.println("queueCapacity: " + queueCapacity);
		System.out.println("timestamper: " + timestamper);
		
		Thread producer = new Thread(new Runnable() {
			
			@Override
			public void run() {
				
				Affinity.bind();
				
				long count = 0;
				
				long total = messagesToWarmup + messagesToTest;
				
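				// Send one message at a time: mark, dispatch, then spin until the
				// consumer echoes it back. With a single message in flight, each
				// mark()/measure() pair captures the one-way latency of one message.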
				while(count <= total) {
					
					MutableLong ml = null;
					
					boolean done = count == total;
					
					bench.mark(); // start the clock for this message
					
					// busy spin until a slot is free in the queue (each spin counts as a block)
					while((ml = queue.nextToDispatch()) == null) {
						producerWaitStrategy.block();
					}
					
					ml.set(done ? -1 : count); // -1 is the poison pill that tells the consumer to exit
					
					queue.flush(false); // no lazySet, send immediately
					
					producerWaitStrategy.reset();
					
					long avail;
					
					// busy spin until the consumer echoes the message back
					while((avail = echoQueue.availableToPoll()) == 0);
					
					for(int i = 0; i < avail; i++) {
						echoQueue.poll().get();
					}
					
					echoQueue.donePolling(true); // can be lazy because queue will not be full...
					
					count++;
				}
				
				Affinity.unbind();
				
				System.out.println("producer exiting...");				
			}
		}, "Producer");
		
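		// The consumer measures the latency on arrival and echoes every message
		// back (including the -1 poison pill) so the producer can proceed.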
		Thread consumer = new Thread(new Runnable() {

			@Override
			public void run() {
				
				Affinity.bind();
				
				boolean running = true;
				
				while (running) {
					
					long avail = queue.availableToPoll();
					if (avail > 0) {
						for(int i = 0; i < avail; i++) {
							MutableLong ml = queue.poll();
							long x = ml.get();
							if (x != -1) {
								bench.measure(); // latency: time elapsed since the producer's mark()
							} else {
								running = false; // poison pill received => exit after echoing it back
							}
							
							// echo back so the producer can send the next message
							while((ml = echoQueue.nextToDispatch()) == null); // busy spin
							ml.set(x);
							echoQueue.flush(false);
							
						}
						queue.donePolling(true); // can be lazy because queue will never be full...
					} 
				}
				
				Affinity.unbind();
				
				System.out.println("consumer exiting...");
			}
		}, "Consumer");
		
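		// Pin each thread to its own core (when affinity is available) so the
		// latency numbers are not distorted by OS scheduling migrations.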
		if (Affinity.isAvailable()) {
			Affinity.assignToProcessor(prodProcToBind, producer);
			Affinity.assignToProcessor(consProcToBind, consumer);
		} else {
			System.err.println("Thread affinity not available!");
		}
		
		consumer.start();
		producer.start();
		
		consumer.join();
		producer.join();
		
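		// With the ping-pong pacing above the queue should never fill up, so the
		// producer block count is expected to stay at (or very near) zero.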
		System.out.println("Producer block count: " + producerWaitStrategy.getTotalBlockCount());
		System.out.println(bench.results());
	}
}