From eb1153f60fc8b7d85eaec8446861e4c474dbdfee Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Wed, 23 Oct 2024 12:32:50 -0400 Subject: [PATCH] feat: add new `init` subcommand This will create an empty table for each basic resource that we support. This is a convenience command to get up and running faster or to refresh your database schemas in case we change them in the future. --- cumulus_etl/__init__.py | 2 +- cumulus_etl/cli.py | 7 +++- cumulus_etl/cli_utils.py | 9 ++++ cumulus_etl/etl/cli.py | 7 +--- cumulus_etl/etl/init/__init__.py | 3 ++ cumulus_etl/etl/init/cli.py | 70 ++++++++++++++++++++++++++++++++ docs/setup/sample-runs.md | 23 +++++++++-- tests/init/__init__.py | 0 tests/init/test_init_cli.py | 41 +++++++++++++++++++ 9 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 cumulus_etl/etl/init/__init__.py create mode 100644 cumulus_etl/etl/init/cli.py create mode 100644 tests/init/__init__.py create mode 100644 tests/init/test_init_cli.py diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py index 48451457..c45782d4 100644 --- a/cumulus_etl/__init__.py +++ b/cumulus_etl/__init__.py @@ -1,3 +1,3 @@ """Turns FHIR data into de-identified & aggregated records""" -__version__ = "1.4.0" +__version__ = "1.5.0" diff --git a/cumulus_etl/cli.py b/cumulus_etl/cli.py index 35f61e42..02c23957 100644 --- a/cumulus_etl/cli.py +++ b/cumulus_etl/cli.py @@ -10,7 +10,7 @@ import rich.logging from cumulus_etl import common, etl, export, upload_notes -from cumulus_etl.etl import convert +from cumulus_etl.etl import convert, init class Command(enum.Enum): @@ -23,6 +23,7 @@ class Command(enum.Enum): CONVERT = "convert" ETL = "etl" EXPORT = "export" + INIT = "init" UPLOAD_NOTES = "upload-notes" # Why isn't this part of Enum directly...? @@ -70,13 +71,15 @@ async def main(argv: list[str]) -> None: run_method = convert.run_convert elif subcommand == Command.EXPORT.value: run_method = export.run_export + elif subcommand == Command.INIT.value: + run_method = init.run_init else: parser.description = "Extract, transform, and load FHIR data." if not subcommand: # Add a note about other subcommands we offer, and tell argparse not to wrap our formatting parser.formatter_class = argparse.RawDescriptionHelpFormatter parser.description += "\n\nother commands available:\n" - parser.description += " convert\n export\n upload-notes" + parser.description += " convert\n export\n init\n upload-notes" run_method = etl.run_etl with tempfile.TemporaryDirectory() as tempdir: diff --git a/cumulus_etl/cli_utils.py b/cumulus_etl/cli_utils.py index 5402df96..08055993 100644 --- a/cumulus_etl/cli_utils.py +++ b/cumulus_etl/cli_utils.py @@ -69,6 +69,15 @@ def add_nlp(parser: argparse.ArgumentParser): return group +def add_output_format(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--output-format", + default="deltalake", + choices=["deltalake", "ndjson"], + help="output format (default is deltalake)", + ) + + def add_task_selection(parser: argparse.ArgumentParser): task = parser.add_argument_group("task selection") task.add_argument( diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py index 7f0f65e3..3569d783 100644 --- a/cumulus_etl/etl/cli.py +++ b/cumulus_etl/etl/cli.py @@ -102,12 +102,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None: choices=["i2b2", "ndjson"], help="input format (default is ndjson)", ) - parser.add_argument( - "--output-format", - default="deltalake", - choices=["deltalake", "ndjson"], - help="output format (default is deltalake)", - ) + cli_utils.add_output_format(parser) parser.add_argument( "--batch-size", type=int, diff --git a/cumulus_etl/etl/init/__init__.py b/cumulus_etl/etl/init/__init__.py new file mode 100644 index 00000000..f2bc7d8e --- /dev/null +++ b/cumulus_etl/etl/init/__init__.py @@ -0,0 +1,3 @@ +"""Subcommand to initialize basic tables""" + +from .cli import run_init diff --git a/cumulus_etl/etl/init/cli.py b/cumulus_etl/etl/init/cli.py new file mode 100644 index 00000000..705916a1 --- /dev/null +++ b/cumulus_etl/etl/init/cli.py @@ -0,0 +1,70 @@ +""" +Initializes basic resource tables. + +Creates the tables if they don't exist and pushes up a basic schema. +""" + +import argparse +from collections.abc import Iterable + +from cumulus_etl import cli_utils, formats, store +from cumulus_etl.etl import tasks +from cumulus_etl.etl.tasks import task_factory + + +def define_init_parser(parser: argparse.ArgumentParser) -> None: + parser.usage = "%(prog)s [OPTION]... OUTPUT" + parser.description = ( + "Initialize all basic output tables. " + "After this command is run, you will be ready to set up Cumulus Library. " + "This command is safe to run multiple times on the same folder, " + "or even on an existing folder with data already in it." + ) + + parser.add_argument("dir_output", metavar="/path/to/output") + cli_utils.add_output_format(parser) + + cli_utils.add_aws(parser) + + +def get_task_tables() -> Iterable[tuple[type[tasks.EtlTask], tasks.OutputTable]]: + for task_class in task_factory.get_default_tasks(): + for output in task_class.outputs: + if not output.get_name(task_class).startswith("etl__"): + yield task_class, output + + +async def init_main(args: argparse.Namespace) -> None: + """Main logic for initialization""" + # record filesystem options like --s3-region before creating Roots + store.set_user_fs_options(vars(args)) + + output_root = store.Root(args.dir_output) + + with cli_utils.make_progress_bar() as progress: + # Set up progress bar + total_steps = len(list(get_task_tables())) + 1 # extra 1 is initializing the formatter + task = progress.add_task("Initializing tables", total=total_steps) + + # Initialize formatter (which can take a moment with deltalake) + format_class = formats.get_format_class(args.output_format) + format_class.initialize_class(output_root) + progress.update(task, advance=1) + + # Create an empty JobConfig/ folder, so that the 'convert' command will recognize this + # folder as an ETL folder. + output_root.makedirs(output_root.joinpath("JobConfig")) + + # Now iterate through, pushing to each output table + for task_class, output in get_task_tables(): + batch = task_class.make_batch_from_rows(output.get_resource_type(task_class), []) + formatter = format_class(output_root, output.get_name(task_class)) + formatter.write_records(batch) + progress.update(task, advance=1) + + +async def run_init(parser: argparse.ArgumentParser, argv: list[str]) -> None: + """Parse arguments and do the work""" + define_init_parser(parser) + args = parser.parse_args(argv) + await init_main(args) diff --git a/docs/setup/sample-runs.md b/docs/setup/sample-runs.md index 68956fc2..282e167d 100644 --- a/docs/setup/sample-runs.md +++ b/docs/setup/sample-runs.md @@ -135,17 +135,33 @@ Congratulations! You've run your first Cumulus ETL process. The first of many! ### AWS Test Run -Let's do the same thing, but now pointing at S3 buckets. +Let's do that again, but now pointing at S3 buckets. This assumes you've followed the [S3 setup guide](aws.md). +We didn't do this above, but now that we're getting more serious, +let's run `cumulus-etl init` first, which will create all the basic tables for us. + When using S3 buckets, you'll need to set the `--s3-region` argument to the correct region. -Run this command, but replace: +Run the command below, but replace: * `us-east-2` with the region your buckets are in * `99999999999` with your account ID * `my-cumulus-prefix` with the bucket prefix you used when setting up AWS * and `subdir1` with the ETL subdirectory you used when setting up AWS +```sh +docker compose -f $CUMULUS_REPO_PATH/compose.yaml \ + run --rm \ + cumulus-etl init \ + --s3-region=us-east-2 \ + s3://my-cumulus-prefix-99999999999-us-east-2/subdir1/ +``` + +This will create empty tables for all the core resources that Cumulus works with. +You should now even be able to see some (very small) output files in your S3 buckets! + +Let's go one step further and put some actual (fake) test data in there too. + ```sh docker compose -f $CUMULUS_REPO_PATH/compose.yaml \ run --volume $CUMULUS_REPO_PATH:/cumulus-etl --rm \ @@ -156,7 +172,8 @@ docker compose -f $CUMULUS_REPO_PATH/compose.yaml \ s3://my-cumulus-prefix-phi-99999999999-us-east-2/subdir1/ ``` -You should now be able to see some (very small) output files in your S3 buckets! +(Though, note now that your S3 bucket has test data in it. +Before you put any real data in there, you should delete the S3 folder and start fresh.) Obviously, this was just example data. But if you'd prefer to keep PHI off of AWS when you deploy for real, diff --git a/tests/init/__init__.py b/tests/init/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/init/test_init_cli.py b/tests/init/test_init_cli.py new file mode 100644 index 00000000..f7657694 --- /dev/null +++ b/tests/init/test_init_cli.py @@ -0,0 +1,41 @@ +"""Tests for etl/init/cli.py""" + +import os + +import ddt + +from cumulus_etl import cli, common +from tests import utils + + +@ddt.ddt +class TestInit(utils.AsyncTestCase): + """Tests for high-level init support.""" + + def setUp(self): + super().setUp() + self.output_path = self.make_tempdir() + + async def run_init(self, output_path: str | None = None) -> None: + args = [ + "init", + output_path or self.output_path, + "--output-format=ndjson", + ] + await cli.main(args) + + async def test_happy_path(self): + """Verify that we can do a simple init""" + await self.run_init() + + # Do some spot checks + dirs = set(os.listdir(self.output_path)) + self.assertIn("device", dirs) + self.assertIn("patient", dirs) + self.assertIn("medicationrequest", dirs) + self.assertIn("medication", dirs) # secondary table + self.assertIn("JobConfig", dirs) # so that the dir is flagged as an ETL dir by 'convert' + + # Are folder contents what we expect? + self.assertEqual(["patient.000.ndjson"], os.listdir(f"{self.output_path}/patient")) + self.assertEqual("", common.read_text(f"{self.output_path}/patient/patient.000.ndjson"))