Skip to content

Commit

Permalink
Some refactoring avoid coupling to OpenSearch
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Mar 11, 2024
1 parent cb3ba03 commit ae96e81
Show file tree
Hide file tree
Showing 62 changed files with 1,070 additions and 1,060 deletions.
1 change: 0 additions & 1 deletion data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:failures-common')
implementation project(':data-prepper-plugins:rule-engine')
implementation libs.opensearch.client
implementation libs.opensearch.rhlc
implementation libs.opensearch.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
import org.opensearch.rest.RestStatus;
Expand All @@ -24,7 +23,6 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -187,23 +185,21 @@ private void incrementErrorCounters(final Exception e) {
}
}

public List<EventWrapper> execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException {
final List<EventWrapper> eventWrappers = new ArrayList<>();

public void execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException {
final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries);
BulkOperationRequestResponse operationResponse;
BulkResponse response = null;
AccumulatingBulkRequest request = bulkRequest;
int attempt = 1;
do {
operationResponse = handleRetry(request, response, attempt, eventWrappers);
operationResponse = handleRetry(request, response, attempt);
if (operationResponse != null) {
final long delayMillis = backoff.nextDelayMillis(attempt++);
request = operationResponse.getBulkRequest();
response = operationResponse.getResponse();
if (delayMillis < 0) {
RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries));
handleFailures(request, null, e, eventWrappers);
handleFailures(request, null, e);
break;
}
// Wait for backOff duration
Expand All @@ -214,8 +210,6 @@ public List<EventWrapper> execute(final AccumulatingBulkRequest bulkRequest) thr
}
}
} while (operationResponse != null);

return eventWrappers;
}

public boolean canRetry(final BulkResponse response) {
Expand All @@ -236,8 +230,7 @@ public static boolean canRetry(final Exception e) {
private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry,
final int retryCount,
final BulkResponse bulkResponse,
final Exception exceptionFromRequest,
final List<EventWrapper> eventWrappers) {
final Exception exceptionFromRequest) {
final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest);
if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
Expand All @@ -260,31 +253,29 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
} else {
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest, eventWrappers);
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
}
return null;
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure,
final List<EventWrapper> eventWrappers) {
private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
// Skip logging the error for version conflicts
if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
handleFailures(bulkRequest, bulkResponse.items(), eventWrappers);
handleFailures(bulkRequest, bulkResponse.items());
} else {
LOG.warn("Bulk Operation Failed.", failure);
handleFailures(bulkRequest, failure);
}
bulkRequestFailedCounter.increment();
}

private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, int retryCount,
final List<EventWrapper> eventWrappers) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response, eventWrappers);
private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, int retryCount) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response);
if (bulkRequestForRetry.getOperationsCount() == 0) {
return null;
}
Expand All @@ -294,10 +285,10 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
bulkResponse = requestFunction.apply(bulkRequestForRetry);
} catch (Exception e) {
incrementErrorCounters(e);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e, eventWrappers);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e);
}
if (bulkResponse.errors()) {
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null, eventWrappers);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null);
} else {
final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
final boolean firstAttempt = (retryCount == 1);
Expand All @@ -310,14 +301,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
final BulkResponseItem bulkResponseItem = bulkResponse.items().get(i);

bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkResponseItem));
}
}
return null;
}

private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkRequestForRetry(
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response, final List<EventWrapper> eventWrappers) {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response) {
if (response == null) {
// first attempt or retry due to Exception
return request;
Expand All @@ -335,7 +325,6 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -346,7 +335,6 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
} else {
sentDocumentsCounter.increment();
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
}
index++;
}
Expand All @@ -358,8 +346,7 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
}
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> accumulatingBulkRequest, final List<BulkResponseItem> itemResponses,
final List<EventWrapper> eventWrappers) {
private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> accumulatingBulkRequest, final List<BulkResponseItem> itemResponses) {
assert accumulatingBulkRequest.getOperationsCount() == itemResponses.size();
final ImmutableList.Builder<FailedBulkOperation> failures = ImmutableList.builder();
for (int i = 0; i < itemResponses.size(); i++) {
Expand All @@ -370,7 +357,6 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
} else {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -381,7 +367,6 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
} else {
sentDocumentsCounter.increment();
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
}
}
logFailure.accept(failures.build(), null);
Expand All @@ -402,8 +387,4 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
logFailure.accept(failures.build(), failure);
}

private EventWrapper createEventWrapper(final BulkOperationWrapper bulkOperationWrapper, final BulkResponseItem bulkResponseItem) {
return new EventWrapper(bulkResponseItem.index(), bulkResponseItem.id(), bulkOperationWrapper.getEvent());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@
import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessOptionsFactory;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.processor.RuleEngine;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig;
import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory;
Expand Down Expand Up @@ -75,7 +72,6 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -143,7 +139,6 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
private final ConcurrentHashMap<Long, Long> lastFlushTimeMap;
private RuleEngine ruleEngine = null;

@DataPrepperPluginConstructor
public OpenSearchSink(final PluginSetting pluginSetting,
Expand Down Expand Up @@ -267,9 +262,6 @@ private void doInitializeInternal() throws IOException {

objectMapper = new ObjectMapper();

final Optional<RuleEngineConfig> ruleEngineConfig = openSearchSinkConfig.getRuleEngineConfig();
ruleEngineConfig.ifPresent(engineConfig -> ruleEngine = new RuleEngine(engineConfig, openSearchClient));

this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -503,10 +495,7 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
bulkRequestTimer.record(() -> {
try {
LOG.debug("Sending data to OpenSearch");
final List<EventWrapper> eventWrappers = bulkRetryStrategy.execute(accumulatingBulkRequest);
if (ruleEngine != null) {
ruleEngine.doExecute(eventWrappers);
}
bulkRetryStrategy.execute(accumulatingBulkRequest);
bulkRequestSizeBytesSummary.record(accumulatingBulkRequest.getEstimatedSizeInBytes());
} catch (final InterruptedException e) {
LOG.error("Unexpected Interrupt:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,16 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfigWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkNotNull;

public class OpenSearchSinkConfiguration {

private final ConnectionConfiguration connectionConfiguration;
private final IndexConfiguration indexConfiguration;
private final RetryConfiguration retryConfiguration;
private final RuleEngineConfigWrapper ruleEngineConfigWrapper;

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
Expand All @@ -34,21 +29,15 @@ public RetryConfiguration getRetryConfiguration() {
return retryConfiguration;
}

public Optional<RuleEngineConfig> getRuleEngineConfig() {
return Optional.ofNullable(ruleEngineConfigWrapper.getRuleEngineConfig());
}

private OpenSearchSinkConfiguration(
final ConnectionConfiguration connectionConfiguration, final IndexConfiguration indexConfiguration,
final RetryConfiguration retryConfiguration, final RuleEngineConfigWrapper ruleEngineConfigWrapper) {
final RetryConfiguration retryConfiguration) {
checkNotNull(connectionConfiguration, "connectionConfiguration cannot be null");
checkNotNull(indexConfiguration, "indexConfiguration cannot be null");
checkNotNull(retryConfiguration, "retryConfiguration cannot be null");
checkNotNull(ruleEngineConfigWrapper, "ruleEngineConfigWrapper cannot be null");
this.connectionConfiguration = connectionConfiguration;
this.indexConfiguration = indexConfiguration;
this.retryConfiguration = retryConfiguration;
this.ruleEngineConfigWrapper = ruleEngineConfigWrapper;
}

public static OpenSearchSinkConfiguration readESConfig(final PluginSetting pluginSetting) {
Expand All @@ -60,8 +49,7 @@ public static OpenSearchSinkConfiguration readESConfig(final PluginSetting plugi
ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator);
final RetryConfiguration retryConfiguration = RetryConfiguration.readRetryConfig(pluginSetting);
final RuleEngineConfigWrapper ruleEngineConfigWrapper = RuleEngineConfigWrapper.readRuleEngineConfigWrapper(pluginSetting);

return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration, ruleEngineConfigWrapper);
return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration);
}
}
9 changes: 9 additions & 0 deletions data-prepper-plugins/rule-engine/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ dependencies {
exclude group: 'org.glassfish', module: 'javax.json'
}
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:opensearch')
implementation 'javax.inject:javax.inject:1'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
Expand All @@ -32,6 +35,7 @@ dependencies {
implementation libs.opensearch.client
implementation libs.opensearch.rhlc
implementation libs.opensearch.java
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation libs.commons.lang3
}
Expand All @@ -57,3 +61,8 @@ jacocoTestCoverageVerification {
}))
}
}

checkstyleMain {
source = fileTree('src/main/java')
}

Loading

0 comments on commit ae96e81

Please sign in to comment.