Skip to content

Commit

Permalink
feat: add new init subcommand
Browse files Browse the repository at this point in the history
This will create an empty table for each basic resource that we
support. This is a convenience command to get up and running faster
or to refresh your database schemas in case we change them in the
future.
  • Loading branch information
mikix committed Oct 23, 2024
1 parent 3256616 commit eb1153f
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "1.4.0"
__version__ = "1.5.0"
7 changes: 5 additions & 2 deletions cumulus_etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import rich.logging

from cumulus_etl import common, etl, export, upload_notes
from cumulus_etl.etl import convert
from cumulus_etl.etl import convert, init


class Command(enum.Enum):
Expand All @@ -23,6 +23,7 @@ class Command(enum.Enum):
CONVERT = "convert"
ETL = "etl"
EXPORT = "export"
INIT = "init"
UPLOAD_NOTES = "upload-notes"

# Why isn't this part of Enum directly...?
Expand Down Expand Up @@ -70,13 +71,15 @@ async def main(argv: list[str]) -> None:
run_method = convert.run_convert
elif subcommand == Command.EXPORT.value:
run_method = export.run_export
elif subcommand == Command.INIT.value:
run_method = init.run_init
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\nother commands available:\n"
parser.description += " convert\n export\n upload-notes"
parser.description += " convert\n export\n init\n upload-notes"
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
Expand Down
9 changes: 9 additions & 0 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ def add_nlp(parser: argparse.ArgumentParser):
return group


def add_output_format(parser: argparse.ArgumentParser) -> None:
    """Add the shared --output-format option (deltalake or ndjson) to the given parser."""
    parser.add_argument(
        "--output-format",
        choices=["deltalake", "ndjson"],
        default="deltalake",
        help="output format (default is deltalake)",
    )


def add_task_selection(parser: argparse.ArgumentParser):
task = parser.add_argument_group("task selection")
task.add_argument(
Expand Down
7 changes: 1 addition & 6 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ def define_etl_parser(parser: argparse.ArgumentParser) -> None:
choices=["i2b2", "ndjson"],
help="input format (default is ndjson)",
)
parser.add_argument(
"--output-format",
default="deltalake",
choices=["deltalake", "ndjson"],
help="output format (default is deltalake)",
)
cli_utils.add_output_format(parser)
parser.add_argument(
"--batch-size",
type=int,
Expand Down
3 changes: 3 additions & 0 deletions cumulus_etl/etl/init/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Subcommand to initialize basic tables"""

from .cli import run_init
70 changes: 70 additions & 0 deletions cumulus_etl/etl/init/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Initializes basic resource tables.
Creates the tables if they don't exist and pushes up a basic schema.
"""

import argparse
from collections.abc import Iterable

from cumulus_etl import cli_utils, formats, store
from cumulus_etl.etl import tasks
from cumulus_etl.etl.tasks import task_factory


def define_init_parser(parser: argparse.ArgumentParser) -> None:
    """Fill out the given parser with all arguments for the 'init' subcommand."""
    parser.usage = "%(prog)s [OPTION]... OUTPUT"
    parser.description = (
        "Initialize all basic output tables. "
        "After this command is run, you will be ready to set up Cumulus Library. "
        "This command is safe to run multiple times on the same folder, "
        "or even on an existing folder with data already in it."
    )

    # Positional output folder, plus the shared output-format and AWS options
    parser.add_argument("dir_output", metavar="/path/to/output")
    cli_utils.add_output_format(parser)
    cli_utils.add_aws(parser)


def get_task_tables() -> Iterable[tuple[type[tasks.EtlTask], tasks.OutputTable]]:
    """Yield every (task class, output table) pair, skipping internal etl__ bookkeeping tables."""
    for klass in task_factory.get_default_tasks():
        yield from (
            (klass, table)
            for table in klass.outputs
            if not table.get_name(klass).startswith("etl__")
        )


async def init_main(args: argparse.Namespace) -> None:
    """
    Main logic for initialization.

    Creates the output folder layout and pushes an empty batch (i.e. just a schema)
    to every basic output table.
    """
    # Record filesystem options like --s3-region before creating any Roots
    store.set_user_fs_options(vars(args))

    root = store.Root(args.dir_output)
    table_pairs = list(get_task_tables())

    with cli_utils.make_progress_bar() as progress:
        # One progress step per table, plus one for initializing the formatter class
        progress_task = progress.add_task("Initializing tables", total=len(table_pairs) + 1)

        # Initialize the formatter class (which can take a moment with deltalake)
        format_class = formats.get_format_class(args.output_format)
        format_class.initialize_class(root)
        progress.update(progress_task, advance=1)

        # An empty JobConfig/ folder marks this directory as an ETL output folder,
        # so that the 'convert' command will recognize it as such.
        root.makedirs(root.joinpath("JobConfig"))

        # Push an empty, schema-bearing batch to each output table
        for task_class, output in table_pairs:
            resource = output.get_resource_type(task_class)
            empty_batch = task_class.make_batch_from_rows(resource, [])
            format_class(root, output.get_name(task_class)).write_records(empty_batch)
            progress.update(progress_task, advance=1)


async def run_init(parser: argparse.ArgumentParser, argv: list[str]) -> None:
    """Parse the init subcommand's arguments and do the work."""
    define_init_parser(parser)
    await init_main(parser.parse_args(argv))
23 changes: 20 additions & 3 deletions docs/setup/sample-runs.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,33 @@ Congratulations! You've run your first Cumulus ETL process. The first of many!

### AWS Test Run

Let's do the same thing, but now pointing at S3 buckets.
Let's do that again, but now pointing at S3 buckets.
This assumes you've followed the [S3 setup guide](aws.md).

We didn't do this above, but now that we're getting more serious,
let's run `cumulus-etl init` first, which will create all the basic tables for us.

When using S3 buckets, you'll need to set the `--s3-region` argument to the correct region.

Run this command, but replace:
Run the command below, but replace:
* `us-east-2` with the region your buckets are in
* `99999999999` with your account ID
* `my-cumulus-prefix` with the bucket prefix you used when setting up AWS
* and `subdir1` with the ETL subdirectory you used when setting up AWS

```sh
docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
run --rm \
cumulus-etl init \
--s3-region=us-east-2 \
s3://my-cumulus-prefix-99999999999-us-east-2/subdir1/
```

This will create empty tables for all the core resources that Cumulus works with.
You should already be able to see some (very small) output files in your S3 buckets!

Let's go one step further and put some actual (fake) test data in there too.

```sh
docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
run --volume $CUMULUS_REPO_PATH:/cumulus-etl --rm \
Expand All @@ -156,7 +172,8 @@ docker compose -f $CUMULUS_REPO_PATH/compose.yaml \
s3://my-cumulus-prefix-phi-99999999999-us-east-2/subdir1/
```

You should now be able to see some (very small) output files in your S3 buckets!
(Note, though, that your S3 bucket now contains test data.
Before you put any real data in there, you should delete the S3 folder and start fresh.)

Obviously, this was just example data.
But if you'd prefer to keep PHI off of AWS when you deploy for real,
Expand Down
Empty file added tests/init/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions tests/init/test_init_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Tests for etl/init/cli.py"""

import os

import ddt

from cumulus_etl import cli, common
from tests import utils


@ddt.ddt
class TestInit(utils.AsyncTestCase):
    """Tests for high-level init support."""

    def setUp(self):
        super().setUp()
        # Fresh temp folder for each test's init output
        self.output_path = self.make_tempdir()

    async def run_init(self, output_path: str | None = None) -> None:
        """Run the init subcommand against the given (or default temp) output folder."""
        target = output_path or self.output_path
        await cli.main(["init", target, "--output-format=ndjson"])

    async def test_happy_path(self):
        """Verify that we can do a simple init"""
        await self.run_init()

        # Spot check that the expected table folders got created
        created = set(os.listdir(self.output_path))
        self.assertIn("device", created)
        self.assertIn("patient", created)
        self.assertIn("medicationrequest", created)
        self.assertIn("medication", created)  # secondary table
        self.assertIn("JobConfig", created)  # so that the dir is flagged as an ETL dir by 'convert'

        # The patient folder should hold exactly one empty ndjson file
        patient_dir = f"{self.output_path}/patient"
        self.assertEqual(["patient.000.ndjson"], os.listdir(patient_dir))
        self.assertEqual("", common.read_text(f"{patient_dir}/patient.000.ndjson"))

0 comments on commit eb1153f

Please sign in to comment.