feat(data-migrate): data mover task #726

Merged 3 commits on Nov 29, 2024
2 changes: 2 additions & 0 deletions config/config.ts
@@ -64,6 +64,7 @@ import {
} from './stacks/oraCompressionPipelineManager';
import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionPipelineManager';
import { getPgDDProps } from './stacks/pgDD';
import { getDataMigrateStackProps } from './stacks/dataMigrate';

interface EnvironmentConfig {
name: string;
@@ -131,6 +132,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
workflowManagerStackProps: getWorkflowManagerStackProps(stage),
stackyMcStackFaceProps: getGlueStackProps(stage),
fmAnnotatorProps: getFmAnnotatorProps(),
dataMigrateProps: getDataMigrateStackProps(stage),
pgDDProps: getPgDDProps(stage),
},
};
7 changes: 6 additions & 1 deletion config/constants.ts
@@ -86,6 +86,11 @@ export const icav2PipelineCacheBucket: Record<AppStage, string> = {
[AppStage.PROD]: 'pipeline-prod-cache-503977275616-ap-southeast-2',
};

// The test inventory bucket for dev.
export const fileManagerInventoryBucket: Record<AppStage.BETA, string> = {
[AppStage.BETA]: 'filemanager-inventory-test',
};

// The archive bucket. Noting that this is only present for prod data.
export const icav2ArchiveAnalysisBucket: Record<AppStage.PROD, string> = {
[AppStage.PROD]: 'archive-prod-analysis-503977275616-ap-southeast-2',
@@ -143,8 +148,8 @@ export const eventDlqNameFMAnnotator = 'orcabus-event-dlq-fmannotator';
export const serviceUserSecretName = 'orcabus/token-service-user'; // pragma: allowlist secret
export const jwtSecretName = 'orcabus/token-service-jwt'; // pragma: allowlist secret
export const icaAccessTokenSecretName = 'IcaSecretsPortal'; // pragma: allowlist secret

export const fileManagerIngestRoleName = 'orcabus-file-manager-ingest-role';
export const dataMoverRoleName = 'orcabus-data-mover-role';

/*
Resources required for BaseSpace TES stack
54 changes: 54 additions & 0 deletions config/stacks/dataMigrate.ts
@@ -0,0 +1,54 @@
import {
AppStage,
dataMoverRoleName,
fileManagerInventoryBucket,
icav2ArchiveAnalysisBucket,
icav2ArchiveFastqBucket,
icav2PipelineCacheBucket,
logsApiGatewayConfig,
oncoanalyserBucket,
vpcProps,
} from '../constants';
import { DataMigrateStackProps } from '../../lib/workload/stateless/stacks/data-migrate/deploy/stack';

export const getDataMigrateStackProps = (stage: AppStage): DataMigrateStackProps => {
let readFromBuckets = [];
let deleteFromBuckets = [];
let writeToBuckets = [];
switch (stage) {
case AppStage.BETA:
// For dev/staging we can write to and read from the same set of buckets.
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

// For dev additionally, write to the filemanager inventory bucket for testing.
writeToBuckets = [
oncoanalyserBucket[stage],
icav2PipelineCacheBucket[stage],
fileManagerInventoryBucket[stage],
];
break;
case AppStage.GAMMA:
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

writeToBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
break;
case AppStage.PROD:
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

// For prod, we only allow writing to the archive buckets, nothing else.
writeToBuckets = [icav2ArchiveAnalysisBucket[stage], icav2ArchiveFastqBucket[stage]];
break;
}

return {
vpcProps,
dataMoverRoleName,
deleteFromBuckets,
readFromBuckets,
writeToBuckets,
logRetention: logsApiGatewayConfig[stage].retention,
};
};
18 changes: 12 additions & 6 deletions config/stacks/fileManager.ts
@@ -1,28 +1,34 @@
import { FilemanagerConfig } from '../../lib/workload/stateless/stacks/filemanager/deploy/stack';
import {
AppStage,
cognitoApiGatewayConfig,
computeSecurityGroupName,
corsAllowOrigins,
databasePort,
dbClusterEndpointHostParameterName,
eventSourceQueueName,
vpcProps,
oncoanalyserBucket,
icav2PipelineCacheBucket,
fileManagerIngestRoleName,
fileManagerInventoryBucket,
icav2PipelineCacheBucket,
logsApiGatewayConfig,
cognitoApiGatewayConfig,
corsAllowOrigins,
oncoanalyserBucket,
vpcProps,
} from '../constants';

export const getFileManagerStackProps = (stage: AppStage): FilemanagerConfig => {
const inventorySourceBuckets = [];
if (stage == AppStage.BETA) {
inventorySourceBuckets.push(fileManagerInventoryBucket[stage]);
}

return {
securityGroupName: computeSecurityGroupName,
vpcProps,
eventSourceQueueName: eventSourceQueueName,
databaseClusterEndpointHostParameter: dbClusterEndpointHostParameterName,
port: databasePort,
migrateDatabase: true,
inventorySourceBuckets: ['filemanager-inventory-test'],
inventorySourceBuckets,
eventSourceBuckets: [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]],
fileManagerRoleName: fileManagerIngestRoleName,
apiGatewayCognitoProps: {
5 changes: 5 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/.dockerignore
@@ -0,0 +1,5 @@
deploy
.gitignore
README.md
.ruff_cache
response.json
2 changes: 2 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/.gitignore
@@ -0,0 +1,2 @@
.ruff_cache
response.json
10 changes: 10 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/Dockerfile
@@ -0,0 +1,10 @@
FROM public.ecr.aws/docker/library/python:3.13

RUN apt update -y && apt install -y awscli && pip3 install poetry

WORKDIR /app

COPY . .
RUN poetry install --no-interaction --no-root

ENTRYPOINT ["poetry", "run", "dm"]
16 changes: 16 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/Makefile
@@ -0,0 +1,16 @@
.PHONY: *

install:
@poetry update

lint: install
@poetry run ruff format .

check: lint
@poetry run ruff check .

dm: install
@poetry run dm $(COMMAND)

clean:
rm -rf data && rm -rf .ruff_cache
37 changes: 37 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/README.md
@@ -0,0 +1,37 @@
# Data migrate

A service to migrate data between locations.

## Data mover

Locally, use the data mover to move or copy data between locations:

```sh
poetry run dm move --source <SOURCE> --destination <DESTINATION>
```
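
The `copy` subcommand defined in `data_mover/cli.py` works the same way but leaves the source in place: `poetry run dm copy --source <SOURCE> --destination <DESTINATION>`.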

These commands are also deployed as a Fargate task triggered by Step Functions.
The state machine expects a JSON input that specifies the command (move or copy),
a source, and a destination. For example, to move a specified `portalRunId` into the archive
bucket (this is probably easier from the Step Functions console):

```sh
export ARN=$(aws stepfunctions list-state-machines | jq -r '.stateMachines | .[] | select(.name == "orcabus-data-migrate-mover") | .stateMachineArn')
export COMMAND="move"
export SOURCE="s3://umccr-temp-dev/move_test_2"
export DESTINATION="s3://filemanager-inventory-test/move_test_2"
aws stepfunctions start-execution --state-machine-arn $ARN --input "{\"command\" : \"$COMMAND\", \"source\": \"$SOURCE\", \"destination\": \"$DESTINATION\" }"
```
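
The same execution can also be started from Python with boto3. This is a minimal sketch, not part of the deployed service, and it assumes the state machine name and input shape shown above plus AWS credentials already configured in the environment:

```python
import json

import boto3

sfn = boto3.client("stepfunctions")

# Look up the data migrate state machine by name (assumes it appears in the
# first page of list_state_machines results).
arn = next(
    sm["stateMachineArn"]
    for sm in sfn.list_state_machines()["stateMachines"]
    if sm["name"] == "orcabus-data-migrate-mover"
)

# Start a move with the same input shape the state machine expects.
sfn.start_execution(
    stateMachineArn=arn,
    input=json.dumps(
        {
            "command": "move",
            "source": "s3://umccr-temp-dev/move_test_2",
            "destination": "s3://filemanager-inventory-test/move_test_2",
        }
    ),
)
```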

## Local development

This project uses [poetry] to manage dependencies.

Run the linter and formatter:

```sh
make check
```

[poetry]: https://python-poetry.org/
[env-example]: .env.example
Empty file.
66 changes: 66 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/data_mover/cli.py
@@ -0,0 +1,66 @@
import logging
import sys

import click

from data_mover.data_mover import DataMover

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


@click.group()
def cli():
pass


@cli.command()
@click.option(
"--source",
required=True,
help="The source to copy from.",
)
@click.option(
"--destination",
required=True,
help="The destination to copy to.",
)
def move(source, destination):
"""
Copy files from the source to the destination and delete the source if successful.
This command calls `aws s3 sync` directly, so it expects the AWS CLI to be installed.
"""
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.delete()
data_mover.send_output()


@cli.command()
@click.option(
"--source",
required=True,
help="The source to copy from.",
)
@click.option(
"--destination",
required=True,
help="The destination to copy to.",
)
def copy(source, destination):
"""
Copy files from the source to the destination and keep the source if successful.
This command calls `aws s3 sync` directly, so it expects the AWS CLI to be installed.
"""
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.send_output()


if __name__ == "__main__":
try:
cli(standalone_mode=False)
sys.exit(0)
except Exception as e:
DataMover.send_failure(str(e))
sys.exit(1)
89 changes: 89 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/data_mover/data_mover.py
@@ -0,0 +1,89 @@
import json
import logging
import os
import subprocess

import boto3
from mypy_boto3_stepfunctions import SFNClient


class DataMover:
"""
A class to manage moving data.
"""

def __init__(
self,
source: str,
destination: str,
repeat: int = 2,
# 12 hours
timeout: int = 43200,
logger: logging.Logger = logging.getLogger(__name__),
):
self.source = source
self.destination = destination
self.repeat = repeat
self.timeout = timeout
self.logger = logger
self.output = ""

def sync(self):
"""
Sync destination and source.
"""
self.logger.info(
f"syncing {self.repeat} times from {self.source} to {self.destination}"
)

out = None
for _ in range(self.repeat):
out = subprocess.run(
["aws", "s3", "sync", self.source, self.destination],
check=True,
# 12 hours by default
timeout=self.timeout,
)
self.logger.info(out.stdout)

self.output += out.stdout or ""

if out.stdout is not None:
raise Exception("failed to sync - non-empty output")

def delete(self):
"""
Delete the source.
"""
self.logger.info(f"deleting files from {self.source}")

out = subprocess.run(
["aws", "s3", "rm", "--recursive", self.source],
check=True,
# 12 hours by default
timeout=self.timeout,
)
self.logger.info(out.stdout)

self.output += out.stdout or ""

def send_output(self):
"""
Send successful task response with the output.
"""
task_token = os.getenv("DM_TASK_TOKEN")
if task_token is not None:
client: SFNClient = boto3.client("stepfunctions")
client.send_task_success(
taskToken=task_token, output=json.dumps(self.output)
)

@staticmethod
def send_failure(error: str):
"""
Send a failed task response.
"""
task_token = os.getenv("DM_TASK_TOKEN")
if task_token is not None:
client: SFNClient = boto3.client("stepfunctions")
client.send_task_failure(taskToken=task_token, error=error)