Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add v1 cortisol code #514

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ dependencies {
implementation 'io.grpc:grpc-protobuf:1.28.0'
implementation 'io.grpc:grpc-stub:1.28.0'
implementation 'javax.annotation:javax.annotation-api:1.3.2'
implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.4.0'

// JDK9+ has to run powermock 2+. https://github.com/powermock/powermock/issues/888
testCompile group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* Class containing the metadata for the bulk stress test.
*/
public class BulkLoadParams {
private int durationInSeconds;
private int docsPerSecond;
private int docsPerRequest = 1;
private String mappingTemplateJson = "";
private String trackLocation = "";

public int getDurationInSeconds() {
return durationInSeconds;
}

public void setDurationInSeconds(int durationInSeconds) {
this.durationInSeconds = durationInSeconds;
}

public int getDocsPerSecond() {
return docsPerSecond;
}

public void setDocsPerSecond(int docsPerSecond) {
this.docsPerSecond = docsPerSecond;
}

public int getDocsPerRequest() {
return docsPerRequest;
}

public void setDocsPerRequest(int docsPerRequest) {
this.docsPerRequest = docsPerRequest;
}

public String getMappingTemplateJson() {
return mappingTemplateJson;
}

public void setMappingTemplateJson(String mappingTemplateJson) {
this.mappingTemplateJson = mappingTemplateJson;
}

public String getTrackLocation() {
return trackLocation;
}

public void setTrackLocation(String trackLocation) {
this.trackLocation = trackLocation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* The client interface for Cortisol.
*/
public interface CortisolClient {
/**
* Runs an ingest workload as specified by the bulkLoadParams object.
* @param bulkLoadParams The object specifying the parameters for the stress test.
*/
void stressBulk(final BulkLoadParams bulkLoadParams);

/**
* Runs a search workload as specified by the searchLoadParams object.
* @param searchLoadParams The object specifying the parameters for the stress test.
*/
void stressSearch(final SearchLoadParams searchLoadParams);

/**
* Cleans up the cluster by deleting the indices created for stress testing.
*/
void cleanup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import org.jooq.tools.json.JSONObject;

public class CortisolHelper {
public static String buildIndexSettingsJson(final int nPrimaries, final int nReplicas) {
JSONObject container = new JSONObject();
JSONObject settings = new JSONObject();
JSONObject index = new JSONObject();
index.put("number_of_shards", nPrimaries);
index.put("number_of_replicas", nReplicas);
settings.put("index", index);
container.put("settings", settings);
return container.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class IngestTask implements Runnable {
private static final String CORTISOL_PREFIX = "cort-";
private final String endpoint;
private final int port;
private final BulkLoadParams params;
private final long endTime;
private final RestHighLevelClient client;
private final Random random;
private final String indexName;
private final TokenBucket tokenBucket;
private final AtomicInteger requestCount;
private final AtomicInteger successCount;
private final AtomicInteger failureCount;

public IngestTask(final BulkLoadParams bulkLoadParams, final String endpoint, final int port, final int nIngestThreads) {
this.params = bulkLoadParams;
this.endpoint = endpoint;
this.port = port;
long startTime = System.currentTimeMillis();
this.endTime = startTime + bulkLoadParams.getDurationInSeconds() * 1000;
this.client = buildEsClient();
this.random = new Random();
this.requestCount = new AtomicInteger();
this.successCount = new AtomicInteger();
this.failureCount = new AtomicInteger();
this.indexName = CORTISOL_PREFIX + random.nextInt(100);
this.tokenBucket = new TokenBucket(params.getDocsPerSecond() / (params.getDocsPerRequest() * nIngestThreads));
}

@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(params.getTrackLocation()));
createIndex(params.getMappingTemplateJson(), 3, 1);

while (System.currentTimeMillis() < endTime) {
// take blocks till tokens are available in the token bucket.
// 0 indicates an interrupted exception, so break out and exit.
if (tokenBucket.take() != 0) {
List<String> docs = new ArrayList<>();
for (int i = 0; i < params.getDocsPerRequest(); ++i) {
String doc = br.readLine();
if (doc == null) {
// restart the indexing from the top.
br = new BufferedReader(new FileReader(params.getTrackLocation()));
doc = br.readLine();
}

docs.add(doc);
}
makeBulkRequest(docs);
} else {
break;
}
}

br.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private RestHighLevelClient buildEsClient() {
final RestClientBuilder builder = RestClient.builder(new HttpHost(endpoint, port, "http"));
return new RestHighLevelClient(builder);
}

private void createIndex(final String mappingTemplateJson, int nPrimaryShards, int nReplicaShards) throws IOException {
final CreateIndexRequest cir = new CreateIndexRequest(indexName);
cir.mapping(mappingTemplateJson, XContentType.JSON);
cir.settings(CortisolHelper.buildIndexSettingsJson(nPrimaryShards, nReplicaShards), XContentType.JSON);
final CreateIndexResponse response = client.indices().create(cir, RequestOptions.DEFAULT);
assert response.isAcknowledged();
}

private void makeBulkRequest(final List<String> bulkDocs) {
final BulkRequest bulkRequest = new BulkRequest();
for (String bulkDoc : bulkDocs) {
final IndexRequest indexRequest = Requests.indexRequest(indexName);
indexRequest.source(bulkDoc, XContentType.JSON);
bulkRequest.add(indexRequest);
}
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (!bulkItemResponses.hasFailures()) {
successCount.addAndGet(bulkDocs.size());
} else {
for (BulkItemResponse response : bulkItemResponses) {
if (response.isFailed()) {
failureCount.incrementAndGet();
} else {
successCount.incrementAndGet();
}
}
}
}

@Override
public void onFailure(Exception e) {
failureCount.addAndGet(bulkDocs.size());
}
});
requestCount.addAndGet(bulkDocs.size());
}

/**
* Custom token bucket class that lets at most {@code nTokens} be available every second.
*/
private class TokenBucket {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add a comment to this class since it functions slightly different than a standard token bucket. I believe this lets you make at most nTokens concurrent take() calls every second.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private final Lock lock = new ReentrantLock();
private final Condition hasTokens = lock.newCondition();
private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can avoid having a scheduler here. Have calls to take() block, and make sure every take() refills what it took from the bucket once it's complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take() already blocks with await() but I see what you mean here. The idea is to make sure that we make a fixed number of calls every second. If we made the call to makeBulkRequest() block and replenish the tokens once the requests are done, then we won't be able to guarantee a constant request rate. We don't want to replenish too soon(if bulk calls finish faster) or too late(if the bulk calls take longer to finish).


private final int maxTokens;
private int nTokens;

public TokenBucket(final int maxTokens) {
this.maxTokens = maxTokens;
this.ses.schedule(this::refill, 1, TimeUnit.SECONDS);
}

public int take() {
lock.lock();
try {
while (nTokens == 0) {
hasTokens.await();
}
Comment on lines +154 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be in a while loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to guard against spurious wake ups.

return nTokens--;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

return 0;
}

private void refill() {
lock.lock();
try {
nTokens = maxTokens;
hasTokens.signal();
} finally {
lock.unlock();
}
Comment on lines +167 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't the take() method just replenish the bucket by 1 once it's done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The work done after taking a token is dependent on the time each bulk() takes. We won't be able to guarantee a constant request rate. Some more explanation is in the next comment.

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* [TBD] Class containing the metadata for search stress test.
*/
public class SearchLoadParams {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import org.elasticsearch.client.RestHighLevelClient;

import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
* A basic implementation of {@link CortisolClient}.
*/
public class SimpleCortisolClient implements CortisolClient {
private final String endpoint;
private final int port;
private final int nThreads;
private final ExecutorService executorService;

public SimpleCortisolClient(final String endpoint, final int port) {
this.endpoint = endpoint;
this.port = port;
nThreads = Runtime.getRuntime().availableProcessors();
executorService = Executors.newFixedThreadPool(nThreads);
}

/**
* Tries to run an ingest load as specified by the docsPerSecond configured in the {@link BulkLoadParams} instance.
* The way it runs the ingest load is by creating multiple threads where each thread will create its own index on
* the cluster and ingests into it.
* @param bulkLoadParams The object specifying the parameters for the stress test.
*/
@Override
public void stressBulk(BulkLoadParams bulkLoadParams) {
setupShutDownHook(bulkLoadParams.getDurationInSeconds());
IntStream.of(nThreads).forEach(i -> executorService.submit(new IngestTask(bulkLoadParams, endpoint, port, nThreads)));
}

@Override
public void stressSearch(SearchLoadParams searchLoadParams) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public void cleanup() {
try {
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
executorService.shutdownNow();
}
}

private void setupShutDownHook(final int duration) {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(this::cleanup, duration, TimeUnit.SECONDS);
System.out.println("Shutdown hook enabled. Will shutdown after");
}
}