Skip to content

Commit

Permalink
Add no-retry logic and metric counter
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Feb 12, 2024
1 parent 6226851 commit d49e657
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class BulkRetryStrategy {
public static final String BULK_REQUEST_TIMEOUT_ERRORS = "bulkRequestTimeoutErrors";
public static final String BULK_REQUEST_SERVER_ERRORS = "bulkRequestServerErrors";
public static final String DOCUMENTS_VERSION_CONFLICT_ERRORS = "documentsVersionConflictErrors";
public static final String DOCUMENT_FAILED_DEPENDENCY_ERRORS = "documentFailedDependencyErrors";
static final long INITIAL_DELAY_MS = 50;
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
static final String VERSION_CONFLICT_EXCEPTION_TYPE = "version_conflict_engine_exception";
Expand All @@ -52,7 +53,8 @@ public final class BulkRetryStrategy {
Arrays.asList(
RestStatus.BAD_REQUEST.getStatus(),
RestStatus.NOT_FOUND.getStatus(),
RestStatus.CONFLICT.getStatus()
RestStatus.CONFLICT.getStatus(),
RestStatus.FAILED_DEPENDENCY.getStatus()
));

private static final Set<Integer> BAD_REQUEST_ERRORS = new HashSet<>(
Expand Down Expand Up @@ -119,6 +121,7 @@ public final class BulkRetryStrategy {
private final Counter bulkRequestTimeoutErrors;
private final Counter bulkRequestServerErrors;
private final Counter documentsVersionConflictErrors;
private final Counter documentFailedDependencyErrorsCounter;
private static final Logger LOG = LoggerFactory.getLogger(BulkRetryStrategy.class);

static class BulkOperationRequestResponse {
Expand Down Expand Up @@ -164,6 +167,7 @@ public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOpera
bulkRequestTimeoutErrors = pluginMetrics.counter(BULK_REQUEST_TIMEOUT_ERRORS);
bulkRequestServerErrors = pluginMetrics.counter(BULK_REQUEST_SERVER_ERRORS);
documentsVersionConflictErrors = pluginMetrics.counter(DOCUMENTS_VERSION_CONFLICT_ERRORS);
documentFailedDependencyErrorsCounter = pluginMetrics.counter(DOCUMENT_FAILED_DEPENDENCY_ERRORS);
}

private void incrementErrorCounters(final Exception e) {
Expand Down Expand Up @@ -323,6 +327,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
if (RestStatus.FAILED_DEPENDENCY.getStatus() == bulkItemResponse.status()) {
documentFailedDependencyErrorsCounter.increment();
}

nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
Expand Down Expand Up @@ -355,6 +363,10 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
if (RestStatus.FAILED_DEPENDENCY.getStatus() == bulkItemResponse.status()) {
documentFailedDependencyErrorsCounter.increment();
}

failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
.withBulkResponseItem(bulkItemResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public void testCanRetry() {
bulkResponse = mock(BulkResponse.class);
when(bulkResponse.items()).thenReturn(Arrays.asList(bulkItemResponse2, bulkItemResponse4));
assertTrue(bulkRetryStrategy.canRetry(bulkResponse));

final BulkResponseItem bulkItemResponse5 = failedDependencyItemResponse(testIndex);
bulkResponse = mock(BulkResponse.class);
when(bulkResponse.items()).thenReturn(Arrays.asList(bulkItemResponse1, bulkItemResponse5));
assertFalse(bulkRetryStrategy.canRetry(bulkResponse));
}

@Test
Expand Down Expand Up @@ -569,6 +574,10 @@ private static BulkResponseItem internalServerErrorItemResponse(final String ind
return customBulkFailureResponse(index, RestStatus.INTERNAL_SERVER_ERROR);
}

private static BulkResponseItem failedDependencyItemResponse(final String index) {
return customBulkFailureResponse(index, RestStatus.FAILED_DEPENDENCY);
}

private static BulkResponseItem versionConflictErrorItemResponse() {
return customBulkFailureResponse(RestStatus.CONFLICT, VERSION_CONFLICT_EXCEPTION_TYPE);
}
Expand Down

0 comments on commit d49e657

Please sign in to comment.