Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Jenkins Full E2E Integ Test to perform Transformations #1182

Merged
merged 21 commits into from
Jan 3, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Handle changes from main and refactor further
Signed-off-by: Tanner Lewis <[email protected]>
lewijacn committed Dec 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 0e2f4530963a685fbe4f7288244a01022f742996
Empty file.
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -27,7 +22,6 @@
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -124,8 +118,8 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
var proxyContainer = new ToxiProxyWrapper(network)
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> esSourceContainer.start()),
CompletableFuture.runAsync(() -> osTargetContainer.start()),
CompletableFuture.runAsync(esSourceContainer::start),
CompletableFuture.runAsync(osTargetContainer::start),
CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT))
).join();

@@ -180,36 +174,7 @@ private static int runProcessAgainstToxicTarget(
}

int timeoutSeconds = 90;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

return process.exitValue();
}


@NotNull
private static ProcessBuilder setupProcess(
Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress,
FailHow failHow
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
String[] processArgs = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
@@ -228,51 +193,21 @@ private static ProcessBuilder setupProcess(
"ES_7_10",
"--initial-lease-duration",
failHow == FailHow.NEVER ? "PT10M" : "PT1S" };
ProcessBuilder processBuilder = setupProcess(processArgs);

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput();
return processBuilder;
}

@NotNull
private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
var readerThread = new Thread(() -> {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
return;
}
String finalLine = line;
log.atInfo()
.setMessage("from sub-process [{}]: {}")
.addArgument(() -> process.toHandle().pid())
.addArgument(finalLine)
.log();
var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
});
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

// Kill the process and fail if we have to wait too long
readerThread.start();
return process;
return process.exitValue();
}

}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
@@ -47,23 +48,16 @@
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import reactor.core.publisher.Flux;

@Slf4j
public class SourceTestBase {
public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest";
public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;
public static final String SOURCE_SERVER_ALIAS = "source";
public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;

protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) {
return new Object[]{
baseSourceImage,
GENERATOR_BASE_IMAGE,
new String[]{"/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/"}};
}

@NotNull
protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

Unchanged files with check annotations Beta

protected RfsLuceneDocument getDocument(IndexReader reader, int docSegId, boolean isLive) {
Document document;
try {
document = reader.document(docSegId);

Check failure on line 167 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

GitHub Actions / Run SonarQube Analysis

java:S1874

Remove this use of "document"; it is deprecated.
} catch (IOException e) {
log.atError()
.setCause(e)
public abstract JsonNode getMappings();
public abstract String getName();

Check failure on line 33 in RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexMetadata.java

GitHub Actions / Run SonarQube Analysis

java:S3038

"getName" is defined in the "Index" interface and can be removed from this class.
public abstract int getNumberOfShards();
try {
// Create the index; it's fine if it already exists
try {

Check failure on line 47 in RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/IndexCreator_OS_2_11.java

GitHub Actions / Run SonarQube Analysis

java:S1141

Extract this nested try block into a separate method.
var alreadyExists = false;
if (mode == MigrationMode.SIMULATE) {
alreadyExists = client.hasIndex(index.getName());
if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) {
return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(SUCCESSOR_ITEM_DELIMITER)));
}
return null;

Check failure on line 385 in RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java

GitHub Actions / Run SonarQube Analysis

java:S1168

Return an empty collection instead of null.
}
@Override
refresh(ctx::getRefreshContext);
final var leaseChecker = new LeaseChecker(leaseDuration, System.nanoTime());
int driftRetries = 0;
while (true) {

Check failure on line 1010 in RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java

GitHub Actions / Run SonarQube Analysis

java:S135

Reduce the total number of break and continue statements in this loop to use at most one.
Duration sleepBeforeNextRetryDuration;
try {
final var obtainResult = assignOneWorkItem(leaseDuration.toSeconds());
public static String formatAsWorkItemString(String name, int shardId) {
if (name.contains(SEPARATOR)) {
throw new IllegalArgumentException(
"Illegal work item name: '" + name + "'. " + "Work item names cannot contain '" + SEPARATOR + "'"

Check failure on line 20 in RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexAndShardCursor.java

GitHub Actions / Run SonarQube Analysis

java:S1192

Define a constant instead of duplicating this literal "Illegal work item name: '" 3 times.
);
}
return name + SEPARATOR + shardId;
}
private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException {
if (str.length() > 0) {

Check failure on line 280 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

GitHub Actions / Run SonarQube Analysis

java:S7158

Use isEmpty() to check whether a string is empty or not.
getOrCreateCodedOutputStream().writeString(fieldNum, str);
} else {
getOrCreateCodedOutputStream().writeUInt32NoTag(0);
throws IOException {
int dataSize = 0;
int lengthSize = 1;
if (str.length() > 0) {

Check failure on line 368 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

GitHub Actions / Run SonarQube Analysis

java:S7158

Use isEmpty() to check whether a string is empty or not.
dataSize = CodedOutputStream.computeStringSize(dataFieldNumber, str);
lengthSize = CodedOutputStream.computeInt32SizeNoTag(dataSize);
}
message = f"The tool '{tool_name}' does not have a 'define_arguments' function. \
Please add one to specify its arguments."
logger.error(message)
raise Exception(message)

Check failure on line 72 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/base/main.py

GitHub Actions / Run SonarQube Analysis

python:S112

Replace this generic exception class with a more specific one.
tool_parser.set_defaults(func=tool_module.main) # Set the main function as the handler
except Exception as e:
logger.error(f"An error occurred while importing the tool '{tool_name}': {e}")
def define_arguments(parser: argparse.ArgumentParser) -> None:
"""Defines arguments for disabling compatibility mode."""
parser.description = "Disables compatibility mode on the OpenSearch cluster."
pass

Check failure on line 12 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/tools/disable_compatibility_mode.py

GitHub Actions / Run SonarQube Analysis

python:S2772

Remove this unneeded "pass".
def modify_compatibility_mode(env: Environment, enable: bool) -> dict: