Merge pull request #726 from umccr/feat/data-mover
feat(data-migrate): data mover task
mmalenic authored Nov 29, 2024
2 parents 18059a2 + 94711c1 commit 41efd8c
Showing 17 changed files with 750 additions and 7 deletions.
2 changes: 2 additions & 0 deletions config/config.ts
@@ -64,6 +64,7 @@ import {
} from './stacks/oraCompressionPipelineManager';
import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionPipelineManager';
import { getPgDDProps } from './stacks/pgDD';
import { getDataMigrateStackProps } from './stacks/dataMigrate';

interface EnvironmentConfig {
name: string;
@@ -131,6 +132,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
workflowManagerStackProps: getWorkflowManagerStackProps(stage),
stackyMcStackFaceProps: getGlueStackProps(stage),
fmAnnotatorProps: getFmAnnotatorProps(),
dataMigrateProps: getDataMigrateStackProps(stage),
pgDDProps: getPgDDProps(stage),
},
};
7 changes: 6 additions & 1 deletion config/constants.ts
@@ -86,6 +86,11 @@ export const icav2PipelineCacheBucket: Record<AppStage, string> = {
[AppStage.PROD]: 'pipeline-prod-cache-503977275616-ap-southeast-2',
};

// The test inventory bucket for dev.
export const fileManagerInventoryBucket: Record<AppStage.BETA, string> = {
[AppStage.BETA]: 'filemanager-inventory-test',
};

// The archive bucket. Noting that this is only present for prod data.
export const icav2ArchiveAnalysisBucket: Record<AppStage.PROD, string> = {
[AppStage.PROD]: 'archive-prod-analysis-503977275616-ap-southeast-2',
@@ -143,8 +148,8 @@ export const eventDlqNameFMAnnotator = 'orcabus-event-dlq-fmannotator';
export const serviceUserSecretName = 'orcabus/token-service-user'; // pragma: allowlist secret
export const jwtSecretName = 'orcabus/token-service-jwt'; // pragma: allowlist secret
export const icaAccessTokenSecretName = 'IcaSecretsPortal'; // pragma: allowlist secret

export const fileManagerIngestRoleName = 'orcabus-file-manager-ingest-role';
export const dataMoverRoleName = 'orcabus-data-mover-role';

/*
Resources required for BaseSpace TES stack
54 changes: 54 additions & 0 deletions config/stacks/dataMigrate.ts
@@ -0,0 +1,54 @@
import {
AppStage,
dataMoverRoleName,
fileManagerInventoryBucket,
icav2ArchiveAnalysisBucket,
icav2ArchiveFastqBucket,
icav2PipelineCacheBucket,
logsApiGatewayConfig,
oncoanalyserBucket,
vpcProps,
} from '../constants';
import { DataMigrateStackProps } from '../../lib/workload/stateless/stacks/data-migrate/deploy/stack';

export const getDataMigrateStackProps = (stage: AppStage): DataMigrateStackProps => {
let readFromBuckets = [];
let deleteFromBuckets = [];
let writeToBuckets = [];
switch (stage) {
case AppStage.BETA:
// For dev/staging we can write to and read from the same set of buckets.
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

      // For dev, additionally write to the filemanager inventory bucket for testing.
writeToBuckets = [
oncoanalyserBucket[stage],
icav2PipelineCacheBucket[stage],
fileManagerInventoryBucket[stage],
];
break;
case AppStage.GAMMA:
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

writeToBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
break;
case AppStage.PROD:
readFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
deleteFromBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];

// For prod, we only allow writing to the archive buckets, nothing else.
writeToBuckets = [icav2ArchiveAnalysisBucket[stage], icav2ArchiveFastqBucket[stage]];
break;
}

return {
vpcProps,
dataMoverRoleName,
deleteFromBuckets,
readFromBuckets,
writeToBuckets,
logRetention: logsApiGatewayConfig[stage].retention,
};
};
18 changes: 12 additions & 6 deletions config/stacks/fileManager.ts
@@ -1,28 +1,34 @@
import { FilemanagerConfig } from '../../lib/workload/stateless/stacks/filemanager/deploy/stack';
import {
AppStage,
cognitoApiGatewayConfig,
computeSecurityGroupName,
corsAllowOrigins,
databasePort,
dbClusterEndpointHostParameterName,
eventSourceQueueName,
vpcProps,
oncoanalyserBucket,
icav2PipelineCacheBucket,
fileManagerIngestRoleName,
fileManagerInventoryBucket,
icav2PipelineCacheBucket,
logsApiGatewayConfig,
cognitoApiGatewayConfig,
corsAllowOrigins,
oncoanalyserBucket,
vpcProps,
} from '../constants';

export const getFileManagerStackProps = (stage: AppStage): FilemanagerConfig => {
const inventorySourceBuckets = [];
if (stage == AppStage.BETA) {
inventorySourceBuckets.push(fileManagerInventoryBucket[stage]);
}

return {
securityGroupName: computeSecurityGroupName,
vpcProps,
eventSourceQueueName: eventSourceQueueName,
databaseClusterEndpointHostParameter: dbClusterEndpointHostParameterName,
port: databasePort,
migrateDatabase: true,
inventorySourceBuckets: ['filemanager-inventory-test'],
inventorySourceBuckets,
eventSourceBuckets: [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]],
fileManagerRoleName: fileManagerIngestRoleName,
apiGatewayCognitoProps: {
5 changes: 5 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/.dockerignore
@@ -0,0 +1,5 @@
deploy
.gitignore
README.md
.ruff_cache
response.json
2 changes: 2 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/.gitignore
@@ -0,0 +1,2 @@
.ruff_cache
response.json
10 changes: 10 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/Dockerfile
@@ -0,0 +1,10 @@
FROM public.ecr.aws/docker/library/python:3.13

RUN apt update -y && apt install -y awscli && pip3 install poetry

WORKDIR /app

COPY . .
RUN poetry install --no-interaction --no-root

ENTRYPOINT ["poetry", "run", "dm"]
16 changes: 16 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/Makefile
@@ -0,0 +1,16 @@
.PHONY: *

install:
@poetry update

lint: install
@poetry run ruff format .

check: lint
@poetry run ruff check .

dm: install
@poetry run dm $(COMMAND)

clean:
rm -rf data && rm -rf .ruff_cache
37 changes: 37 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/README.md
@@ -0,0 +1,37 @@
# Data migrate

A service to migrate data between locations.

## Data mover

Locally, use the data mover to move or copy data between locations:

```sh
poetry run dm move --source <SOURCE> --destination <DESTINATION>
```
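
Under the hood the CLI drives the `DataMover` class, which shells out to `aws s3 sync`. A minimal sketch of using it directly from Python (the bucket paths below are placeholders, and the AWS CLI plus credentials must be available in the environment):

```python
import logging

from data_mover.data_mover import DataMover

logging.basicConfig(level=logging.INFO)

# Placeholder locations; anything `aws s3 sync` accepts as a source/destination works.
mover = DataMover(
    "s3://example-source-bucket/prefix",
    "s3://example-destination-bucket/prefix",
    logger=logging.getLogger(__name__),
)
mover.sync()         # copy the data (runs `aws s3 sync`, twice by default)
mover.delete()       # remove the source afterwards; skip this for a plain copy
mover.send_output()  # report back to Step Functions if DM_TASK_TOKEN is set
```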

The `dm` command is also deployed as a Fargate task triggered by Step Functions.
The Step Functions state machine expects a JSON input that specifies the command (move or copy),
a source, and a destination. For example, to move a specified `portalRunId` into the archive
bucket (this is probably easier from the Step Functions console):

```sh
export ARN=$(aws stepfunctions list-state-machines | jq -r '.stateMachines | .[] | select(.name == "orcabus-data-migrate-mover") | .stateMachineArn')
export COMMAND="move"
export SOURCE="s3://umccr-temp-dev/move_test_2"
export DESTINATION="s3://filemanager-inventory-test/move_test_2"
aws stepfunctions start-execution --state-machine-arn $ARN --input "{\"command\" : \"$COMMAND\", \"source\": \"$SOURCE\", \"destination\": \"$DESTINATION\" }"
```
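
The execution can also be started from Python with boto3. This is a minimal sketch mirroring the CLI example above; it assumes the state machine name `orcabus-data-migrate-mover` and ignores pagination of `list_state_machines`:

```python
import json

import boto3

sfn = boto3.client("stepfunctions")

# Find the data mover state machine by name (pagination ignored for brevity).
arn = next(
    sm["stateMachineArn"]
    for sm in sfn.list_state_machines()["stateMachines"]
    if sm["name"] == "orcabus-data-migrate-mover"
)

# Start a move with the same JSON input shape as the CLI example above.
sfn.start_execution(
    stateMachineArn=arn,
    input=json.dumps(
        {
            "command": "move",
            "source": "s3://umccr-temp-dev/move_test_2",
            "destination": "s3://filemanager-inventory-test/move_test_2",
        }
    ),
)
```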

## Local development

This project uses [poetry] to manage dependencies.

Run the linter and formatter:

```sh
make check
```

[poetry]: https://python-poetry.org/
Empty file.
66 changes: 66 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/data_mover/cli.py
@@ -0,0 +1,66 @@
import logging
import sys

import click

from data_mover.data_mover import DataMover

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


@click.group()
def cli():
pass


@cli.command()
@click.option(
"--source",
required=True,
help="The source to copy from.",
)
@click.option(
"--destination",
required=True,
help="The destination to copy to.",
)
def move(source, destination):
"""
Copy files from the source to the destination and delete the source if successful.
    This command calls `aws s3 sync` directly, so it expects the AWS CLI to be installed.
"""
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.delete()
data_mover.send_output()


@cli.command()
@click.option(
"--source",
required=True,
help="The source to copy from.",
)
@click.option(
"--destination",
required=True,
help="The destination to copy to.",
)
def copy(source, destination):
"""
Copy files from the source to the destination and keep the source if successful.
    This command calls `aws s3 sync` directly, so it expects the AWS CLI to be installed.
"""
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.send_output()


if __name__ == "__main__":
try:
cli(standalone_mode=False)
sys.exit(0)
except Exception as e:
DataMover.send_failure(str(e))
sys.exit(1)
89 changes: 89 additions & 0 deletions lib/workload/stateless/stacks/data-migrate/data_mover/data_mover.py
@@ -0,0 +1,89 @@
import json
import logging
import os
import subprocess

import boto3
from mypy_boto3_stepfunctions import SFNClient


class DataMover:
"""
A class to manage moving data.
"""

def __init__(
self,
source: str,
destination: str,
repeat: int = 2,
# 12 hours
timeout: int = 43200,
logger: logging.Logger = logging.getLogger(__name__),
):
self.source = source
self.destination = destination
self.repeat = repeat
self.timeout = timeout
self.logger = logger
self.output = ""

def sync(self):
"""
Sync destination and source.
"""
self.logger.info(
f"syncing {self.repeat} times from {self.source} to {self.destination}"
)

        out = None
        for _ in range(self.repeat):
            out = subprocess.run(
                ["aws", "s3", "sync", self.source, self.destination],
                check=True,
                capture_output=True,
                text=True,
                # 12 hours by default
                timeout=self.timeout,
            )
            self.logger.info(out.stdout)

        self.output += out.stdout or ""

        # The final sync should be a no-op; any remaining output means files were still being copied.
        if out.stdout:
            raise Exception("failed to sync - non-empty output")

def delete(self):
"""
Delete the source.
"""
self.logger.info(f"deleting files from {self.source}")

        out = subprocess.run(
            ["aws", "s3", "rm", "--recursive", self.source],
            check=True,
            capture_output=True,
            text=True,
            # 12 hours by default
            timeout=self.timeout,
        )
        self.logger.info(out.stdout)

        self.output += out.stdout or ""

def send_output(self):
"""
Send successful task response with the output.
"""
task_token = os.getenv("DM_TASK_TOKEN")
if task_token is not None:
client: SFNClient = boto3.client("stepfunctions")
client.send_task_success(
taskToken=task_token, output=json.dumps(self.output)
)

@staticmethod
def send_failure(error: str):
"""
Send a failed task response.
"""
task_token = os.getenv("DM_TASK_TOKEN")
if task_token is not None:
client: SFNClient = boto3.client("stepfunctions")
client.send_task_failure(taskToken=task_token, error=error)
