Skip to content

Commit

Permalink
Merge pull request #754 from umccr/fix/mover-payload-limit
Browse files Browse the repository at this point in the history
fix(data-migrate): add structured output to step functions
  • Loading branch information
mmalenic authored Dec 5, 2024
2 parents 2bcb605 + 917e37d commit bb61f87
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
6 changes: 4 additions & 2 deletions lib/workload/stateless/stacks/data-migrate/data_mover/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from data_mover.data_mover import DataMover

logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

Expand All @@ -15,6 +16,7 @@ def main():
sys.exit(0)
except Exception as e:
DataMover.send_failure(str(e))
logger.error(str(e))
sys.exit(1)


Expand Down Expand Up @@ -42,7 +44,7 @@ def move(source, destination):
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.delete()
data_mover.send_output()
data_mover.send_output(command="move")


@cli.command()
Expand All @@ -63,7 +65,7 @@ def copy(source, destination):
"""
data_mover = DataMover(source, destination, logger=logger)
data_mover.sync()
data_mover.send_output()
data_mover.send_output(command="copy")


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import subprocess
from typing import Literal

import boto3
from mypy_boto3_stepfunctions import SFNClient
Expand All @@ -26,7 +27,6 @@ def __init__(
self.repeat = repeat
self.timeout = timeout
self.logger = logger
self.output = ""

def sync(self):
"""
Expand All @@ -45,9 +45,7 @@ def sync(self):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
self.logger.info(str(out.stdout))

self.output += str(out.stdout) or ""
self.logger.info(out.stdout.decode())

if out.stdout != b"":
raise Exception("failed to sync - non-empty output")
Expand All @@ -65,19 +63,24 @@ def delete(self):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
self.logger.info(str(out.stdout))

self.output += str(out.stdout) or ""
self.logger.info(out.stdout.decode())

def send_output(self):
def send_output(self, command: Literal["copy", "move"] = "copy"):
"""
Send successful task response with the output.
"""
task_token = os.getenv("DM_TASK_TOKEN")
if task_token is not None:
client: SFNClient = boto3.client("stepfunctions")
client.send_task_success(
taskToken=task_token, output=json.dumps(self.output)
taskToken=task_token,
output=json.dumps(
{
"command": command,
"source": self.source,
"destination": self.destination,
}
),
)

@staticmethod
Expand Down
22 changes: 11 additions & 11 deletions lib/workload/stateless/stacks/data-migrate/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bb61f87

Please sign in to comment.