Skip to content

Commit

Permalink
Get all client tests passing
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
  • Loading branch information
peternied authored and mikaylathompson committed Jan 14, 2025
1 parent 9adbfa9 commit b8bd500
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class OpenSearchClient {
private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(10);
private static final Retry SNAPSHOT_RETRY_STRATEGY = Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
.maxBackoff(DEFAULT_MAX_BACKOFF);
protected static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY =
public static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY =
Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
.maxBackoff(DEFAULT_MAX_BACKOFF);
private static final Retry CREATE_ITEM_EXISTS_RETRY_STRATEGY =
Expand Down Expand Up @@ -312,7 +312,7 @@ public Optional<ObjectNode> getSnapshotStatus(

protected abstract String getBulkRequestPath(String indexName);

Retry getBulkRetryStrategy() {
protected Retry getBulkRetryStrategy() {
return BULK_RETRY_STRATEGY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;

Expand All @@ -20,21 +19,20 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Getter
@Slf4j
public class OpenSearchClientFactory {
private static final ObjectMapper objectMapper = new ObjectMapper();

private ConnectionContext connectionContext;
private Version version;
protected final RestClient client;
protected static final ObjectMapper objectMapper = new ObjectMapper();

public OpenSearchClientFactory(RestClient client) {
this.client = client;
}

RestClient client;

public OpenSearchClientFactory(ConnectionContext connectionContext) {
if (connectionContext == null) {
throw new IllegalArgumentException("Connection context was not provided in constructor.");
}
this.connectionContext = connectionContext;
this.client = new RestClient(connectionContext);
}
Expand All @@ -43,9 +41,6 @@ public OpenSearchClient determineVersionAndCreate() {
if (version == null) {
version = getClusterVersion();
}
if (connectionContext == null) {
throw new IllegalArgumentException("Connection context was not provided in constructor."); // not crazy about this
}
var clientClass = getOpenSearchClientClass(version);
try {
return clientClass.getConstructor(ConnectionContext.class, Version.class)
Expand All @@ -69,7 +64,7 @@ public OpenSearchClient determineVersionAndCreate(RestClient restClient, FailedR
}

private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version version) {
if (version == null || VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) {
return OpenSearchClient_OS_2_11.class;
} else if (VersionMatchers.isES_6_X.test(version)) {
return OpenSearchClient_ES_6_8.class;
Expand All @@ -84,14 +79,6 @@ private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version versi
.flavor(Flavor.AMAZON_SERVERLESS_OPENSEARCH)
.major(2)
.build();
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 3;
private static final Duration DEFAULT_BACKOFF = Duration.ofSeconds(1);
private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(10);
private static final Retry SNAPSHOT_RETRY_STRATEGY = Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
.maxBackoff(DEFAULT_MAX_BACKOFF);
protected static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY =
Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
.maxBackoff(DEFAULT_MAX_BACKOFF);

public Version getClusterVersion() {
var versionFromRootApi = client.getAsync("", null)
Expand All @@ -106,7 +93,7 @@ public Version getClusterVersion() {
return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp));
})
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.block();

// Compatibility mode is only enabled on OpenSearch clusters responding with the version of 7.10.2
Expand All @@ -116,7 +103,7 @@ public Version getClusterVersion() {
return client.getAsync("_cluster/settings", null)
.flatMap(this::checkCompatibilityModeFromResponse)
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.flatMap(hasCompatibilityModeEnabled -> {
log.atInfo().setMessage("Checking CompatibilityMode, was enabled? {}").addArgument(hasCompatibilityModeEnabled).log();
if (Boolean.FALSE.equals(hasCompatibilityModeEnabled)) {
Expand All @@ -125,7 +112,7 @@ public Version getClusterVersion() {
return client.getAsync("_nodes/_all/nodes,version?format=json", null)
.flatMap(this::getVersionFromNodes)
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY);
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY);
})
.onErrorResume(e -> {
log.atWarn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ObjectNode getClusterData() {
.flatMap(this::getJsonForTemplateApis)
.map(json -> Map.entry(entry.getKey(), json))
.doOnError(e -> log.error("Error fetching template {}: {}", entry.getKey(), e.getMessage()))
.retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
)
.collectMap(Entry::getKey, Entry::getValue)
.block();
Expand Down Expand Up @@ -82,7 +82,7 @@ public ObjectNode getIndexes() {
.getAsync(endpoint, null)
.flatMap(this::getJsonForIndexApis)
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
)
.collectList()
.block();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.migrations.bulkload.common;

import java.net.URI;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand All @@ -24,7 +25,6 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -66,7 +66,7 @@ class OpenSearchClientFactoryTest {
@Mock(strictness = Strictness.LENIENT)
RestClient restClient;

@Mock
@Mock(strictness = Strictness.LENIENT)
ConnectionContext connectionContext;

@Mock
Expand All @@ -76,10 +76,11 @@ class OpenSearchClientFactoryTest {

@BeforeEach
void beforeTest() {
doReturn(connectionContext).when(restClient).getConnectionContext();
// setupOkResponse(restClient, "", ROOT_RESPONSE_OS_1_0_0);
// setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED);
openSearchClientFactory = spy(new OpenSearchClientFactory(restClient));
when(connectionContext.getUri()).thenReturn(URI.create("http://localhost/"));
when(connectionContext.isAwsSpecificAuthentication()).thenReturn(false);
when(restClient.getConnectionContext()).thenReturn(connectionContext);
openSearchClientFactory = spy(new OpenSearchClientFactory(connectionContext));
openSearchClientFactory.client = restClient;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package org.opensearch.migrations.bulkload.common;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.opensearch.migrations.Version;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.common.http.HttpResponse;
import org.opensearch.migrations.bulkload.http.BulkRequestGenerator;
import org.opensearch.migrations.bulkload.http.BulkRequestGenerator.BulkItemResponseEntry;
import org.opensearch.migrations.bulkload.tracing.IRfsContexts;
import org.opensearch.migrations.bulkload.tracing.IRfsContexts.ICheckedIdempotentPutRequestContext;
import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchClient_OS_2_11;
import org.opensearch.migrations.reindexer.FailedRequestsLogger;

import com.fasterxml.jackson.core.StreamReadFeature;
Expand All @@ -35,7 +38,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -50,16 +52,14 @@

@ExtendWith(MockitoExtension.class)
class OpenSearchClientTest {
private static final String CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"false\"}}}";
private static final String ROOT_RESPONSE_OS_1_0_0 = "{\"version\":{\"distribution\":\"opensearch\",\"number\":\"1.0.0\"}}";
private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder()
.enable(StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION)
.build();

@Mock(strictness = Strictness.LENIENT)
RestClient restClient;

@Mock
@Mock(strictness = Strictness.LENIENT)
ConnectionContext connectionContext;

@Mock
Expand All @@ -69,11 +69,9 @@ class OpenSearchClientTest {

@BeforeEach
void beforeTest() {
doReturn(connectionContext).when(restClient).getConnectionContext();
setupOkResponse(restClient, "", ROOT_RESPONSE_OS_1_0_0);
setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED);
openSearchClient = spy(new OpenSearchClientFactory(restClient).determineVersionAndCreate(restClient, failedRequestLogger));
clearInvocations(restClient);
when(connectionContext.getUri()).thenReturn(URI.create("http://localhost/"));
when(restClient.getConnectionContext()).thenReturn(connectionContext);
openSearchClient = spy(new OpenSearchClient_OS_2_11(restClient, failedRequestLogger, Version.fromString("OS 2.11")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private HttpResponse bulkItemResponse(boolean hasErrors, List<BulkItemResponseEn

private BulkDocSection createBulkDoc(String docId) {
var bulkDoc = mock(BulkDocSection.class, withSettings().strictness(org.mockito.quality.Strictness.LENIENT));
when(bulkDoc.getId()).thenReturn(docId);
when(bulkDoc.getDocId()).thenReturn(docId);
when(bulkDoc.asBulkIndexString()).thenReturn("BULK-INDEX\nBULK_BODY");
return bulkDoc;
}
Expand Down

0 comments on commit b8bd500

Please sign in to comment.