Controlling the Migration Assistant Control Plane #1060

Closed
91 changes: 17 additions & 74 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
@@ -3,85 +3,28 @@
# Fail the script if any command fails
set -e

# Print our ENV variables
if [[ $RFS_COMMAND != *"--target-password"* ]]; then
echo "RFS_COMMAND: $RFS_COMMAND"
else
echo "RFS Target Cluster password found in RFS_COMMAND; skipping logging of the value"
fi

echo "RFS_TARGET_USER: $RFS_TARGET_USER"
echo "RFS_TARGET_PASSWORD: <redacted>"
echo "RFS_TARGET_PASSWORD_ARN: $RFS_TARGET_PASSWORD_ARN"

# Check if the RFS Command already contains a username; only do special work if it does not
if [[ $RFS_COMMAND != *"--target-username"* ]]; then
if [[ -n "$RFS_TARGET_USER" ]]; then
echo "Using username from ENV variable RFS_TARGET_USER. Updating RFS Command with username."
RFS_COMMAND="$RFS_COMMAND --target-username \"$RFS_TARGET_USER\""
fi
fi

# Check if the RFS Command already contains a password; only do special work if it does not
if [[ $RFS_COMMAND != *"--target-password"* ]]; then
PASSWORD_TO_USE=""

# Check if the password is available in plaintext; if so, use it. Otherwise, retrieve it from AWS Secrets Manager
if [[ -n "$RFS_TARGET_PASSWORD" ]]; then
echo "Using plaintext password from ENV variable RFS_TARGET_PASSWORD"
PASSWORD_TO_USE="$RFS_TARGET_PASSWORD"
elif [[ -n "$RFS_TARGET_PASSWORD_ARN" ]]; then
# Retrieve password from AWS Secrets Manager if ARN is provided
echo "Using password from AWS Secrets Manager ARN in ENV variable RFS_TARGET_PASSWORD_ARN"
PASSWORD_TO_USE=$(aws secretsmanager get-secret-value --secret-id "$RFS_TARGET_PASSWORD_ARN" --query SecretString --output text)
fi

# Append the username/password to the RFS Command if we have an updated password
if [[ -n "$PASSWORD_TO_USE" ]]; then
echo "Updating RFS Command with password."
RFS_COMMAND="$RFS_COMMAND --target-password \"$PASSWORD_TO_USE\""
fi
fi

# Extract the value passed after --s3-local-dir
S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
# Extract the value passed after --lucene-dir
LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR"
else
echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Will delete lucene local directory between runs: $LUCENE_DIR"
else
echo "--lucene-dir argument not found in RFS_COMMAND. This is required."
exit 1
fi

cleanup_directories() {
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Cleaning up S3 local directory: $S3_LOCAL_DIR"
rm -rf "$S3_LOCAL_DIR"
echo "Directory $S3_LOCAL_DIR has been cleaned up."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Cleaning up Lucene local directory: $LUCENE_DIR"
rm -rf "$LUCENE_DIR"
echo "Directory $LUCENE_DIR has been cleaned up."
fi
}


# Discussion needed: Container will have a minimum number of
Member Author:
🧵 For Discussion

# parameters once the services.yaml is loaded on start.
#
# This script was parsing the parameters that were fed into the
# container; unless this script is pulling parameters from the
# services.yaml, it's going to be out of sync.
#
# Metadata & Snapshot will already need to know how to read the
# Secrets from ARNs so this seems like it aligns well to do this
# at the same time.
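To make the comment above concrete, here is a minimal sketch of a shared secret-resolution abstraction that both the metadata and snapshot tools could use. The `SecretResolver` interface and `resolvePassword` helper are hypothetical names, not part of this PR; in production the resolver would be backed by an AWS Secrets Manager lookup, while a map-backed fake keeps the wiring testable without AWS credentials. The precedence mirrors `entrypoint.sh`: plaintext wins, then the secret ARN.

```java
import java.util.Map;
import java.util.function.Function;

public class SecretResolution {
    // Hypothetical abstraction: production code would back this with an
    // AWS Secrets Manager call; tests can supply a map-backed fake.
    interface SecretResolver extends Function<String, String> {}

    // Mirrors entrypoint.sh precedence: plaintext first, then the ARN.
    static String resolvePassword(String plaintext, String secretArn, SecretResolver resolver) {
        if (plaintext != null) {
            return plaintext;
        }
        if (secretArn != null) {
            return resolver.apply(secretArn);
        }
        return null; // caller decides whether a missing password is fatal
    }

    public static void main(String[] args) {
        Map<String, String> fakeSecrets =
            Map.of("arn:aws:secretsmanager:us-east-1:123:secret:tgt", "s3cret");
        System.out.println(resolvePassword(null,
            "arn:aws:secretsmanager:us-east-1:123:secret:tgt", fakeSecrets::get));
    }
}
```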

# Discussion needed: Directory cleanup was being done by the script based
Member Author:
🧵 For Discussion

# on arguments
#
# Alternatively, Java can clean these directories on start, since this script
# won't be able to resolve the path(s) without parsing the services.yaml

[ -z "$RFS_COMMAND" ] && \
{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \
until ! {
echo "Running command $RFS_COMMAND"
eval "$RFS_COMMAND"
}; do
echo "Cleaning up directories before the next run."
cleanup_directories
echo "About to start the next run."
done
@@ -58,6 +58,8 @@ public Duration convert(String value) {
}

public static class Args {


@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
private boolean help;

@@ -9,4 +9,7 @@ public class MetadataArgs {
@Parameter(names = { "--otel-collector-endpoint" }, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
+ "forwarded. If no value is provided, metrics will not be forwarded.")
public String otelCollectorEndpoint;

@Parameter(names = { "--config-file", "-c" }, description = "The path to a config file")
public String configFile;
}
@@ -14,6 +14,7 @@
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;
import org.opensearch.migrations.config.MigrationConfig;

import com.beust.jcommander.JCommander;
import lombok.extern.slf4j.Slf4j;
@@ -22,15 +23,62 @@
public class MetadataMigration {

public static void main(String[] args) throws Exception {

var metadataArgs = new MetadataArgs();
var migrateArgs = new MigrateArgs();
// Note: need to handle these effectively duplicated parsed args
var evaluateArgs = new EvaluateArgs();
var jCommander = JCommander.newBuilder()

// Load from the command line first
JCommander.newBuilder()
.addObject(metadataArgs)
.addCommand(migrateArgs)
.addCommand(evaluateArgs)
.build();
jCommander.parse(args);
.build()
.parse(args);

// Then override with settings
var config = getConfig(args);
Member Author:
🧵 For Discussion

if (config != null) {
metadataArgs.otelCollectorEndpoint = config.metadata_migration.otel_endpoint;

// Note: we've got some serious null-ref risk in this block of code; will need to use a lot of Optionals.
migrateArgs.dataFilterArgs.indexAllowlist = config.metadata_migration.index_allowlist;
migrateArgs.dataFilterArgs.indexTemplateAllowlist = config.metadata_migration.index_template_allowlist;
migrateArgs.dataFilterArgs.componentTemplateAllowlist = config.metadata_migration.component_template_allowlist;

migrateArgs.fileSystemRepoPath = config.snapshot.fs.repo_path;
migrateArgs.snapshotName = config.snapshot.snapshot_name;
migrateArgs.s3LocalDirPath = config.metadata_migration.local_dir;
migrateArgs.s3Region = config.snapshot.s3.aws_region;
migrateArgs.s3RepoUri = config.snapshot.s3.repo_uri;

migrateArgs.sourceArgs.host = config.source_cluster.endpoint;
migrateArgs.sourceArgs.username = config.source_cluster.basic_auth.username;
migrateArgs.sourceArgs.password = config.source_cluster.basic_auth.password;
migrateArgs.sourceArgs.awsRegion = config.source_cluster.sigv4.region;
migrateArgs.sourceArgs.awsServiceSigningName = config.source_cluster.sigv4.service;
migrateArgs.sourceArgs.insecure = config.source_cluster.allow_insecure;

// Need to special case indirect values such as AWS Secrets
if (config.source_cluster.basic_auth.password_from_secret_arn != null) {
migrateArgs.sourceArgs.password = ""; // Load this from AWS and insert into this arg + log a message
}

migrateArgs.targetArgs.host = config.target_cluster.endpoint;
migrateArgs.targetArgs.username = config.target_cluster.basic_auth.username;
migrateArgs.targetArgs.password = config.target_cluster.basic_auth.password;
migrateArgs.targetArgs.awsRegion = config.target_cluster.sigv4.region;
migrateArgs.targetArgs.awsServiceSigningName = config.target_cluster.sigv4.service;
migrateArgs.targetArgs.insecure = config.target_cluster.allow_insecure;

// Need to special case indirect values such as AWS Secrets
if (config.target_cluster.basic_auth.password_from_secret_arn != null) {
migrateArgs.targetArgs.password = ""; // Load this from AWS and insert into this arg + log a message
}

migrateArgs.minNumberOfReplicas = config.metadata_migration.min_replicas;
}
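The inline note above flags the null-reference risk in these deep `config.x.y.z` chains: any intermediate block missing from the YAML leaves a `null` field. One way to express the "lot of optionals" the note asks for is an `Optional.map` chain, sketched below with hypothetical stand-in classes (not the real generated `MigrationConfig` types) so the short-circuiting behavior is visible in isolation.

```java
import java.util.Optional;

public class NullSafeOverrides {
    // Hypothetical stand-ins for the MigrationConfig nested types;
    // any field may be null when the corresponding YAML block is absent.
    static class BasicAuth { String username; String password; }
    static class Cluster { String endpoint; BasicAuth basicAuth; }

    // Optional.map short-circuits at the first null link, so a missing
    // basic_auth block yields the fallback instead of a NullPointerException.
    static String usernameOrNull(Cluster cluster) {
        return Optional.ofNullable(cluster)
                .map(c -> c.basicAuth)
                .map(a -> a.username)
                .orElse(null);
    }

    public static void main(String[] args) {
        Cluster noAuth = new Cluster(); // basicAuth left null on purpose
        System.out.println(usernameOrNull(noAuth)); // null, no NPE
    }
}
```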

var context = new RootMetadataMigrationContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(metadataArgs.otelCollectorEndpoint, "metadata",
@@ -115,4 +163,24 @@ private static void printCommandUsage(JCommander jCommander) {
jCommander.getUsageFormatter().usage(jCommander.getParsedCommand(), sb);
log.info(sb.toString());
}

private static MigrationConfig getConfig(String[] args) {
var metadataArgs = new MetadataArgs();

JCommander.newBuilder()
.addObject(metadataArgs)
.acceptUnknownOptions(true)
.build()
.parse(args);

if (metadataArgs.configFile != null) {
try {
return MigrationConfig.loadFrom(metadataArgs.configFile);
} catch (Exception e) {
log.warn("Unable to load from config file, falling back to command line arguments.", e);
}
}
return null;
}
}
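`getConfig` uses a two-pass pattern: a lenient first parse with `acceptUnknownOptions(true)` only to locate `--config-file`, then the full strict parse. The pattern can be sketched without JCommander; the helper name below is hypothetical and the scan is deliberately naive (it does not handle `--config-file=path` syntax), but it shows why the first pass must tolerate flags it does not know about.

```java
public class ConfigFlagScan {
    // Lenient first pass: look only for --config-file / -c and ignore every
    // other flag, mirroring a JCommander parse with acceptUnknownOptions(true).
    static String findConfigFile(String[] args) {
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("--config-file") || args[i].equals("-c")) {
                return args[i + 1];
            }
        }
        return null; // no config file given; caller falls back to CLI args
    }

    public static void main(String[] args) {
        String[] cli = {"migrate", "--target-host", "https://example:9200",
                        "-c", "/shared-logs-output/migration_services.yaml"};
        System.out.println(findConfigFile(cli));
    }
}
```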
@@ -97,7 +97,7 @@ services:
- migrations
volumes:
- sharedLogsVolume:/shared-logs-output
- ./migrationConsole/lib/console_link/services.yaml:/etc/migration_services.yaml
- ./migrationConsole/lib/console_link/services.yaml:/shared-logs-output/migration_services.yaml
# this is a convenience thing for testing -- it should be removed before this makes it to prod.
# - ./migrationConsole/lib/console_link:/root/lib/console_link
- ~/.aws:/root/.aws
@@ -11,7 +11,8 @@ services:
networks:
- migrations
volumes:
- ./lib/console_link/services.yaml:/etc/migration_services.yaml
- sharedLogsVolume:/shared-logs-output
- ./lib/console_link/services.yaml:/shared-logs-output/migration_services.yaml
# this is a convenience thing for testing -- it should be removed before this makes it to prod.
- ./lib:/root/lib
- ~/.aws:/root/.aws
@@ -26,6 +27,10 @@ services:
- "8000:8000"
command: pipenv run python /root/console_api/manage.py runserver_plus 0.0.0.0:8000

volumes:
sharedLogsVolume:
driver: local

networks:
migrations:
driver: bridge
@@ -26,7 +26,7 @@ The console link library is designed to provide a unified interface for the many

![Console_link Library Diagram](console_library_diagram.svg)

The user defines their migration services in a `migration_services.yaml` file, by default found at `/etc/migration_services.yaml`.
The user defines their migration services in a `migration_services.yaml` file, by default found at `/shared-logs-output/migration_services.yaml`.

Currently, the supported services are:

@@ -46,12 +46,14 @@ source_cluster:
endpoint: "https://capture-proxy-es:9200"
allow_insecure: true
no_auth:
version: ES_7_10
target_cluster:
endpoint: "https://opensearchtarget:9200"
allow_insecure: true
basic_auth:
username: "admin"
password: "myStrongPassword123!"
version: OS_2_15
metrics_source:
prometheus:
endpoint: "http://prometheus:9090"
@@ -71,8 +73,8 @@ backfill:
- "migration_deployment=1.0.6"
replay:
ecs:
cluster-name: "migrations-dev-cluster"
service-name: "migrations-dev-replayer-service"
cluster_name: "migrations-dev-cluster"
service_name: "migrations-dev-replayer-service"
snapshot:
snapshot_name: "snapshot_2023_01_01"
s3:
@@ -165,7 +167,7 @@ backfill:
ecs:
cluster_name: migration-aws-integ-ecs-cluster
service_name: migration-aws-integ-reindex-from-snapshot
aws-region: us-east-1
aws_region: us-east-1
```

#### OpenSearch Ingestion
@@ -262,7 +264,7 @@ The structure of cli commands is:

The available global options are:

- `--config-file FILE` to specify the path to a config file (default is `/etc/migration_services.yaml`)
- `--config-file FILE` to specify the path to a config file (default is `/shared-logs-output/migration_services.yaml`)
- `--json` to get output in JSON designed for machine consumption instead of printing to the console

#### Objects
@@ -37,7 +37,7 @@ def __init__(self, config_file) -> None:

@click.group()
@click.option(
"--config-file", default="/etc/migration_services.yaml", help="Path to config file"
"--config-file", default="/shared-logs-output/migration_services.yaml", help="Path to config file"
)
@click.option("--json", is_flag=True)
@click.option('-v', '--verbose', count=True, help="Verbosity level. Default is warn, -v is info, -vv is debug.")
@@ -522,7 +522,7 @@ def describe_topic_records_cmd(ctx, topic_name):

@cli.command()
@click.option(
"--config-file", default="/etc/migration_services.yaml", help="Path to config file"
"--config-file", default="/shared-logs-output/migration_services.yaml", help="Path to config file"
)
@click.option("--json", is_flag=True)
@click.argument('shell', type=click.Choice(['bash', 'zsh', 'fish']))
@@ -102,8 +102,7 @@ def __init__(self, config_file: str):
logger.info("No snapshot provided")
if 'metadata_migration' in self.config:
self.metadata: Metadata = Metadata(self.config["metadata_migration"],
target_cluster=self.target_cluster,
snapshot=self.snapshot)
config_file)
if 'kafka' in self.config:
self.kafka: Kafka = get_kafka(self.config["kafka"])
logger.info(f"Kafka initialized: {self.kafka}")