Below the code for the http client performing the stress test on the CoralReactor and Vert.x http servers. You can refer to this article for more details.
package com.coralblocks.coralreactor.client.bench.client;
import java.nio.ByteBuffer;
import com.coralblocks.coralbits.bench.Benchmarker;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.client.http.HttpClient;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;
public class StressHttpClient extends HttpClient {
public static interface StressHttpClientListener {
public void onConnectionOpened(Client client);
public void onMessageReceived(Client client, long latency);
}
private int totalReceived;
private StressHttpClientListener listener = null;
private long start;
private final int totalToReceive;
public StressHttpClient(NioReactor nio, String host, int port, Configuration config) {
super(nio, host, port, config);
this.totalToReceive = config.getInt("totalToReceive");
}
public void setListener(StressHttpClientListener listener) {
this.listener = listener;
}
@Override
protected void handleConnectionOpened() {
totalReceived = 0;
if (listener != null) listener.onConnectionOpened(this);
}
public void sendRequest() {
start = System.nanoTime();
sendRequest("/plaintext");
}
@Override
protected void handleMessage(ByteBuffer msg) {
long latency = System.nanoTime() - start;
totalReceived++;
if (listener != null) listener.onMessageReceived(this, latency);
if (totalReceived < totalToReceive) {
sendRequest();
}
}
public static void main(String[] args) throws Exception {
NioReactor nio = NioReactor.create();
String host = args[0];
int port = Integer.parseInt(args[1]);
final int totalToReceive = Integer.parseInt(args[2]);
final int warmup = Integer.parseInt(args[3]);
final int connections = Integer.parseInt(args[4]);
final StressHttpClient[] shc = new StressHttpClient[connections];
StressHttpClientListener listener = new StressHttpClientListener() {
private final Benchmarker bench = Benchmarker.create(warmup, true);
private int totalRequests = 0;
private int totalConnections = 0;
private long start;
@Override
public void onConnectionOpened(Client client) {
if (++totalConnections % 1000 == 0) System.out.println("Total connections: " + totalConnections);
if (totalConnections == connections) {
for(StressHttpClient s : shc) {
s.sendRequest();
}
}
}
@Override
public void onMessageReceived(Client client, long latency) {
bench.measure(latency);
totalRequests++;
if (totalRequests % 10000 == 0) System.out.println("Total requests: " + totalRequests);
if (totalRequests == warmup) start = System.nanoTime(); // don't take warmup into account for throughput
if (totalRequests == totalToReceive * connections) {
long totalTime = System.nanoTime() - start;
long totalResponses = totalToReceive * connections - warmup; // don't take warmup into account for throughput
System.out.println("Total time to receive " + totalResponses + " responses with " + connections + " clients: " + Benchmarker.convertNanoTime(totalTime));
double mps = totalResponses / ((double) totalTime / 1000000000L);
System.out.println("Messages per second: " + mps);
bench.printResults();
}
}
};
MapConfiguration config = new MapConfiguration();
config.add("totalToReceive", totalToReceive);
config.add("connectTimeout", -1); // disable
for(int i = 0; i < shc.length; i++) {
shc[i] = new StressHttpClient(nio, host, port, config);
shc[i].setListener(listener);
shc[i].open();
}
nio.start();
}
}