Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create OpenSearchClient for 6.x
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <[email protected]>
mikaylathompson committed Jan 10, 2025
1 parent 9010fcf commit d809f75
Showing 19 changed files with 204 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
import java.util.List;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
import org.opensearch.migrations.bulkload.common.SnapshotCreator;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
@@ -135,7 +135,8 @@ public static void main(String[] args) throws Exception {
private ICreateSnapshotContext context;

public void run() {
var client = new OpenSearchClient(arguments.sourceArgs.toConnectionContext());
var clientFactory = new OpenSearchClientFactory(null);
var client = clientFactory.get(arguments.sourceArgs.toConnectionContext());
SnapshotCreator snapshotCreator;
if (arguments.fileSystemRepoPath != null) {
snapshotCreator = new FileSystemSnapshotCreator(
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import java.text.NumberFormat;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.data.WorkloadGenerator;
import org.opensearch.migrations.utils.ProcessHelpers;

@@ -35,7 +36,8 @@ public static void main(String[] args) {

public void run(DataGeneratorArgs arguments) {
var connectionContext = arguments.targetArgs.toConnectionContext();
var client = new OpenSearchClient(connectionContext);
var clientFactory = new OpenSearchClientFactory(null);
var client = clientFactory.get(connectionContext);

var startTimeMillis = System.currentTimeMillis();
var workloadGenerator = new WorkloadGenerator(client);
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3Repo;
import org.opensearch.migrations.bulkload.common.S3Uri;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
@@ -308,7 +309,8 @@ public static void main(String[] args) throws Exception {
);
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
var clientFactory = new OpenSearchClientFactory(null);
OpenSearchClient targetClient = clientFactory.get(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
arguments.numDocsPerBulkRequest,
arguments.numBytesPerBulkRequest,
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
@@ -106,7 +107,8 @@ private void migrationDocumentsWithClusters(

// === ACTION: Take a snapshot ===
var snapshotName = "my_snap";
var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var sourceClientFactory = new OpenSearchClientFactory(sourceCluster.getContainerVersion().getVersion());
var sourceClient = sourceClientFactory.get(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
Original file line number Diff line number Diff line change
@@ -6,9 +6,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
@@ -35,9 +39,18 @@ public class LeaseExpirationTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testProcessExitsAsExpected(boolean forceMoreSegments) {
private static Stream<Arguments> testParameters() {
List<Boolean> forceMoreSegmentsValues = List.of(true, false);
List<SearchClusterContainer.ContainerVersion> sourceClusterVersions = List.of(SearchClusterContainer.ES_V6_8_23, SearchClusterContainer.ES_V7_10_2);

return forceMoreSegmentsValues.stream()
.flatMap(force -> sourceClusterVersions.stream()
.map(version -> Arguments.of(force, version)));
}

@ParameterizedTest(name = "forceMoreSegments={0}, sourceClusterVersion={1}")
@MethodSource("testParameters")
public void testProcessExitsAsExpected(boolean forceMoreSegments, SearchClusterContainer.ContainerVersion sourceClusterVersion) {
// Sending 5 docs per request with 4 requests concurrently with each taking 0.250 second is 80 docs/sec
// will process 9680 docs in 121 seconds. With 40s lease duration, expect to be finished in 4 leases.
// This is ensured with the toxiproxy settings, the migration should not be able to be completed
@@ -50,24 +63,24 @@ public void testProcessExitsAsExpected(boolean forceMoreSegments) {
int continueExitCode = 2;
int finalExitCodePerShard = 0;
runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards,
finalExitCodePerShard, shards, shards, indexDocCount, forceMoreSegments,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer
finalExitCodePerShard, shards, shards, indexDocCount, forceMoreSegments, sourceClusterVersion,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, sourceClusterVersion
));
}

@SneakyThrows
private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expectedInitialExitCodeCount,
int expectedEventualExitCode, int expectedEventualExitCodeCount,
int shards, int indexDocCount,
boolean forceMoreSegments,
boolean forceMoreSegments, SearchClusterContainer.ContainerVersion sourceClusterVersion,
Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

try (
var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
var esSourceContainer = new SearchClusterContainer(sourceClusterVersion)
.withAccessToHost(true);
var network = Network.newNetwork();
var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
@@ -84,13 +97,17 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
proxyContainer.start("target", 9200);

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(sourceClusterVersion.getVersion());
var client = clientFactory.get(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);
var generator = new WorkloadGenerator(client);
var workloadOptions = new WorkloadOptions();
if (sourceClusterVersion.getVersion().getMajor() <= 6) {
workloadOptions.setIncludeTypeName(false);
}

var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl());

@@ -177,7 +194,8 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer
ToxiProxyWrapper proxyContainer,
SearchClusterContainer.ContainerVersion sourceClusterVersion
) {
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
@@ -189,7 +207,8 @@ private static int runProcessAgainstToxicTarget(
String[] additionalArgs = {
"--documents-per-bulk-request", "5",
"--max-connections", "4",
"--initial-lease-duration", "PT40s"
"--initial-lease-duration", "PT40s",
"--source-version", sourceClusterVersion.toString()
};

ProcessBuilder processBuilder = setupProcess(
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.data.WorkloadGenerator;
@@ -71,7 +72,8 @@ public void testDocumentMigration(
).join();

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(sourceVersion.getVersion());
var client = clientFactory.get(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.data.WorkloadGenerator;
@@ -117,7 +118,8 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
).join();

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(esSourceContainer.getContainerVersion().getVersion());
var client = clientFactory.get(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.RfsLuceneDocument;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
@@ -247,9 +248,10 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
UUID.randomUUID().toString(),
Clock.offset(Clock.systemUTC(), Duration.ofMillis(nextClockShift))
)) {
var clientFactory = new OpenSearchClientFactory(version);
return RfsMigrateDocuments.run(
readerFactory,
new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder()
new DocumentReindexer(clientFactory.get(ConnectionContextTestParams.builder()
.host(targetAddress)
.compressionEnabled(compressionEnabled)
.build()
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
@@ -58,7 +59,8 @@ protected void startClusters() {
@SneakyThrows
protected String createSnapshot(String snapshotName) {
var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(null);
var sourceClient = clientFactory.get(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
@@ -115,7 +117,8 @@ protected MigrationItemResult executeMigration(MigrateOrEvaluateArgs arguments,
* @return An OpenSearch client.
*/
protected OpenSearchClient createClient(SearchClusterContainer cluster) {
return new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(cluster.getContainerVersion().getVersion());
return clientFactory.get(ConnectionContextTestParams.builder()
.host(cluster.getUrl())
.insecure(true)
.build()
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,7 +32,7 @@
import reactor.util.retry.Retry;

@Slf4j
public class OpenSearchClient {
public abstract class OpenSearchClient {

protected static final ObjectMapper objectMapper = new ObjectMapper();

@@ -269,6 +271,30 @@ public boolean hasIndex(String indexName) {
return hasObjectCheck(indexName, null);
}

// private String appendUrlParameters(String path, Map<String, String> urlParameters) {
// if (urlParameters == null || urlParameters.isEmpty()) {
// return path;
// }
//
// StringBuilder sb = new StringBuilder(path);
// sb.append('?');
//
// boolean first = true;
// for (Map.Entry<String, String> entry : urlParameters.entrySet()) {
// if (!first) {
// sb.append('&');
// }
// sb.append(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8))
// .append('=')
// .append(URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
// first = false;
// }
//
// return sb.toString();
// }

protected abstract String getCreateIndexPath(String indexName);

/*
* Create an index if it does not already exist. Returns an Optional; if the index was created, it
* will be the created object and empty otherwise.
@@ -278,7 +304,7 @@ public Optional<ObjectNode> createIndex(
ObjectNode settings,
IRfsContexts.ICheckedIdempotentPutRequestContext context
) {
String targetPath = indexName;
var targetPath = getCreateIndexPath(indexName);
return createObjectIdempotent(targetPath, settings, context);
}

@@ -442,6 +468,8 @@ public Optional<ObjectNode> getSnapshotStatus(
}
}

protected abstract String getBulkRequestPath(String indexName);

Retry getBulkRetryStrategy() {
return BULK_RETRY_STRATEGY;
}
@@ -451,7 +479,7 @@ public Mono<BulkResponse> sendBulkRequest(String indexName, List<BulkDocSection>
{
final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getId(), d -> d));
return Mono.defer(() -> {
final String targetPath = indexName + "/_bulk";
final String targetPath = getBulkRequestPath(indexName);
log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap::keySet).log();
var body = BulkDocSection.convertToBulkRequestBody(docsMap.values());
var additionalHeaders = new HashMap<String, List<String>>();
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.opensearch.migrations.bulkload.common;


import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionMatchers;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.version_es_6_8.OpenSearchClient_ES_6_8;
import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchClient_OS_2_11;
import org.opensearch.migrations.reindexer.FailedRequestsLogger;

@RequiredArgsConstructor
@Slf4j
public class OpenSearchClientFactory {
// Version can be null, and if so, the "default" client, for OS_2_11 will be provided (matching the pre-factory behavior)
private final Version version;

public OpenSearchClient get(
ConnectionContext connectionContext
) {
if (version == null || VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version) || VersionMatchers.isES_7_X.test(version)) {
return new OpenSearchClient_OS_2_11(connectionContext);
} else if (VersionMatchers.isES_6_X.test(version)) {
return new OpenSearchClient_ES_6_8(connectionContext);
} else {
throw new IllegalArgumentException("Unsupported version: " + version);
}
}

public OpenSearchClient get(
RestClient client,
FailedRequestsLogger failedRequestsLogger
) {
if (version == null || VersionMatchers.isOS_1_X.test(version) || VersionMatchers.isOS_2_X.test(version) || VersionMatchers.isES_7_X.test(version)) {
return new OpenSearchClient_OS_2_11(client, failedRequestsLogger);
} else if (VersionMatchers.isES_6_X.test(version)) {
return new OpenSearchClient_ES_6_8(client, failedRequestsLogger);
} else {
throw new IllegalArgumentException("Unsupported version: " + version);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.opensearch.migrations.bulkload.version_es_6_8;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.reindexer.FailedRequestsLogger;

public class OpenSearchClient_ES_6_8 extends OpenSearchClient {
public OpenSearchClient_ES_6_8(ConnectionContext connectionContext) {
super(connectionContext);
}

public OpenSearchClient_ES_6_8(RestClient client, FailedRequestsLogger failedRequestsLogger) {
super(client, failedRequestsLogger);
}

protected String getCreateIndexPath(String indexName) {
return indexName + "?include_type_name=false";
}

protected String getBulkRequestPath(String indexName) {
return indexName + "/_doc/_bulk";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.reindexer.FailedRequestsLogger;

public class OpenSearchClient_OS_2_11 extends OpenSearchClient {
public OpenSearchClient_OS_2_11(ConnectionContext connectionContext) {
super(connectionContext);
}

public OpenSearchClient_OS_2_11(RestClient client, FailedRequestsLogger failedRequestsLogger) {
super(client, failedRequestsLogger);
}

protected String getCreateIndexPath(String indexName) {
return indexName;
}

protected String getBulkRequestPath(String indexName) {
return indexName + "/_bulk";
}
}
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionMatchers;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.cluster.ClusterWriter;
@@ -76,7 +77,8 @@ public String toString() {

private OpenSearchClient getClient() {
if (client == null) {
client = new OpenSearchClient(getConnection());
var clientFactory = new OpenSearchClientFactory(version);
client = clientFactory.get(getConnection());
}
return client;
}
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@

import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionMatchers;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;

Check failure on line 5 in RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java

GitHub Actions / Run SonarQube Analysis

java:S1128

Remove this unused import 'org.opensearch.migrations.bulkload.common.OpenSearchClient'.
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.GlobalMetadata.Factory;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
@@ -44,7 +45,8 @@
public Version getVersion() {
if (version == null) {
// Use a throw away client that will work on any version of the service
version = new OpenSearchClient(getConnection()).getClusterVersion();
var clientFactory = new OpenSearchClientFactory(null);
version = clientFactory.get(connection).getClusterVersion();
}
return version;
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,16 @@ public RemoteReaderClient(ConnectionContext connection) {
super(connection);
}

@Override
protected String getCreateIndexPath(String indexName) {
return indexName;
}

@Override
protected String getBulkRequestPath(String indexName) {
return indexName + "/_bulk";
}

protected Map<String, String> getTemplateEndpoints() {
return Map.of(
"index_template", "_index_template",
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import org.opensearch.migrations.Version;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.SourceRepo;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
@@ -58,7 +59,8 @@ public ClusterSnapshotReader getSnapshotReader(Version version, SourceRepo repo)
* @return The remote resource provider
*/
public ClusterReader getRemoteReader(ConnectionContext connection) {
var client = new OpenSearchClient(connection);
var clientFactory = new OpenSearchClientFactory(null);
var client = clientFactory.get(connection);
var version = client.getClusterVersion();

var remoteProvider = getRemoteProviders(connection)
@@ -78,8 +80,9 @@ public ClusterReader getRemoteReader(ConnectionContext connection) {
* @return The remote resource creator
*/
public ClusterWriter getRemoteWriter(ConnectionContext connection, Version versionOverride, DataFilterArgs dataFilterArgs) {
var clientFactory = new OpenSearchClientFactory(null);
var version = Optional.ofNullable(versionOverride)
.orElseGet(() -> new OpenSearchClient(connection).getClusterVersion());
.orElseGet(() -> clientFactory.get(connection).getClusterVersion());

var remoteProvider = getRemoteProviders(connection)
.filter(p -> p.compatibleWith(version))
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ class OpenSearchClientTest {
@BeforeEach
void beforeTest() {
doReturn(connectionContext).when(restClient).getConnectionContext();
openSearchClient = spy(new OpenSearchClient(restClient, failedRequestLogger));
openSearchClient = spy(new OpenSearchClientFactory(null).get(restClient, failedRequestLogger));
}

@Test
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import java.util.List;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
@@ -28,7 +29,8 @@ public default void fullMigrationViaLocalSnapshot(
tempSnapshotName,
unpackedShardDataDir
);
final var targetClusterClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(null);
final var targetClusterClient =clientFactory.get(ConnectionContextTestParams.builder()
.host(targetClusterUrl)
.build()
.toConnectionContext());

0 comments on commit d809f75

Please sign in to comment.