Skip to content

Commit

Permalink
Add export URL & ETL version into completion table
Browse files Browse the repository at this point in the history
This should help track how we exported and processed the data.
  • Loading branch information
mikix committed Oct 15, 2024
1 parent 6c62a53 commit 91bc7c6
Show file tree
Hide file tree
Showing 42 changed files with 86 additions and 55 deletions.
2 changes: 2 additions & 0 deletions cumulus_etl/completion/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def completion_schema() -> pyarrow.Schema:
# datetime) would then require conversion to and fro, it's easier to
# just mirror our FHIR tables and use strings for timestamps.
pyarrow.field("export_time", pyarrow.string()),
pyarrow.field("export_url", pyarrow.string()),
pyarrow.field("etl_version", pyarrow.string()),
]
)

Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ async def etl_main(args: argparse.Namespace) -> None:
tasks=[t.name for t in selected_tasks],
export_group_name=export_group_name,
export_datetime=export_datetime,
export_url=loader_results.export_url,
deleted_ids=loader_results.deleted_ids,
)
common.write_json(config.path_config(), config.as_json(), indent=4)
Expand Down
3 changes: 3 additions & 0 deletions cumulus_etl/etl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(
tasks: list[str] | None = None,
export_group_name: str | None = None,
export_datetime: datetime.datetime | None = None,
export_url: str | None = None,
deleted_ids: dict[str, set[str]] | None = None,
):
self._dir_input_orig = dir_input_orig
Expand All @@ -51,6 +52,7 @@ def __init__(
self.tasks = tasks or []
self.export_group_name = export_group_name
self.export_datetime = export_datetime
self.export_url = export_url
self.deleted_ids = deleted_ids or {}

# initialize format class
Expand Down Expand Up @@ -82,6 +84,7 @@ def as_json(self):
"tasks": ",".join(self.tasks),
"export_group_name": self.export_group_name,
"export_timestamp": self.export_datetime and self.export_datetime.isoformat(),
"export_url": self.export_url,
}


Expand Down
3 changes: 3 additions & 0 deletions cumulus_etl/etl/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import rich.table
import rich.text

import cumulus_etl
from cumulus_etl import cli_utils, common, completion, deid, formats, store
from cumulus_etl.etl import config
from cumulus_etl.etl.tasks import batching
Expand Down Expand Up @@ -272,6 +273,8 @@ def _update_completion_table(self) -> None:
"table_name": output.get_name(self),
"group_name": self.task_config.export_group_name,
"export_time": self.task_config.export_datetime.isoformat(),
"export_url": self.task_config.export_url,
"etl_version": cumulus_etl.__version__,
}
for output in self.outputs
if not output.get_name(self).startswith("etl__")
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/loaders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def path(self) -> str:
# and the time when it was exported ("transactionTime" in bulk-export terms).
group_name: str | None = None
export_datetime: datetime.datetime | None = None
export_url: str | None = None

# A list of resource IDs that should be deleted from the output tables.
# This is a map of resource -> set of IDs like {"Patient": {"A", "B"}}
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/loaders/fhir/bulk_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(
# Public properties, to be read after the export:
self.export_datetime = None
self.group_name = fhir.parse_group_from_url(self._url)
self.export_url = self._url

def format_kickoff_url(
self,
Expand Down
2 changes: 2 additions & 0 deletions cumulus_etl/loaders/fhir/export_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class NoLogs(LogParsingError):
def __init__(self, root: store.Root):
self.group_name: str = None
self.export_datetime: datetime.datetime = None
self.export_url: str = None

self._parse(root, self._find(root))

Expand All @@ -67,6 +68,7 @@ def _parse(self, root: store.Root, path: str) -> None:
def _parse_kickoff(self, row: dict) -> None:
    """Pull the export URL (and thus the group name) out of a kickoff log row."""
    export_url = row["eventDetail"]["exportUrl"]
    self.group_name = fhir.parse_group_from_url(export_url)
    self.export_url = export_url

def _parse_status_complete(self, row: dict) -> None:
details = row["eventDetail"]
Expand Down
48 changes: 25 additions & 23 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,16 @@ def __init__(
async def load_all(self, resources: list[str]) -> base.LoaderResults:
# Are we doing a bulk FHIR export from a server?
if self.root.protocol in ["http", "https"]:
results = await self.load_from_bulk_export(resources)
input_root = store.Root(results.path)
bulk_dir = await self.load_from_bulk_export(resources)
input_root = store.Root(bulk_dir.name)
else:
if self.export_to or self.since or self.until or self.resume:
errors.fatal(
"You provided FHIR bulk export parameters but did not provide a FHIR server",
errors.ARGS_CONFLICT,
)

results = base.LoaderResults(directory=self.root.path)
input_root = self.root

# Parse logs for export information
try:
parser = BulkExportLogParser(input_root)
results.group_name = parser.group_name
results.export_datetime = parser.export_datetime
except BulkExportLogParser.LogParsingError:
# Once we require group name & export datetime, we should warn about this.
# For now, just ignore any errors.
pass

results.deleted_ids = self.read_deleted_ids(input_root)

# Copy the resources we need from the remote directory (like S3 buckets) to a local one.
#
# We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
Expand All @@ -78,13 +64,12 @@ async def load_all(self, resources: list[str]) -> base.LoaderResults:
filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
for filename in filenames:
input_root.get(filename, f"{tmpdir.name}/")
results.directory = tmpdir

return results
return self.read_loader_results(input_root, tmpdir)

async def load_from_bulk_export(
self, resources: list[str], prefer_url_resources: bool = False
) -> base.LoaderResults:
) -> common.Directory:
"""
Performs a bulk export and drops the results in an export dir.
Expand All @@ -109,12 +94,29 @@ async def load_from_bulk_export(
except errors.FatalError as exc:
errors.fatal(str(exc), errors.BULK_EXPORT_FAILED)

return base.LoaderResults(
directory=target_dir,
group_name=bulk_exporter.group_name,
export_datetime=bulk_exporter.export_datetime,
return target_dir

def read_loader_results(
    self, input_root: store.Root, results_dir: common.Directory
) -> base.LoaderResults:
    """
    Build a LoaderResults for the given export folder.

    Collects deleted IDs plus any export metadata (group name, export time,
    export URL) that we can recover from bulk-export logs in the folder.
    """
    loader_results = base.LoaderResults(
        directory=results_dir,
        deleted_ids=self.read_deleted_ids(input_root),
    )

    # Parse logs for export information
    try:
        log_parser = BulkExportLogParser(input_root)
    except BulkExportLogParser.LogParsingError:
        # Once we require group name & export datetime, we should warn about this.
        # For now, just ignore any errors.
        pass
    else:
        loader_results.group_name = log_parser.group_name
        loader_results.export_datetime = log_parser.export_datetime
        loader_results.export_url = log_parser.export_url

    return loader_results

def read_deleted_ids(self, root: store.Root) -> dict[str, set[str]]:
"""
Reads any deleted IDs that a bulk export gave us.
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "covid_symptom__nlp_results", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "covid_symptom__nlp_results_term_exists", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "hftest__summary", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "encounter", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "patient", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "allergyintolerance", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "condition", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "device", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "diagnosticreport", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "documentreference", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "immunization", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "medication", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
{"table_name": "medicationrequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "observation", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "procedure", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00"}
{"table_name": "servicerequest", "group_name": "test-group", "export_time": "2020-10-13T12:00:20-05:00", "etl_version": "1.0.0+test"}
2 changes: 2 additions & 0 deletions tests/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def setUp(self) -> None:
os.makedirs(self.phi_dir)
self.json_file_count = 0

self.export_url = "https://example.com/Group/test-group/$export"
self.job_config = JobConfig(
self.input_dir,
self.input_dir,
Expand All @@ -138,6 +139,7 @@ def setUp(self) -> None:
export_datetime=datetime.datetime(
2012, 10, 10, 5, 30, 12, tzinfo=datetime.timezone.utc
),
export_url=self.export_url,
)

def make_formatter(dbname: str, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions tests/etl/test_etl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ async def test_serialization(self):
"tasks": "patient,condition",
"export_group_name": "test-group",
"export_timestamp": "2020-10-13T12:00:20-05:00",
"export_url": None,
},
config_file,
)
Expand Down
8 changes: 8 additions & 0 deletions tests/etl/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ async def test_encounter_completion(self):
"table_name": "encounter",
"group_name": "test-group",
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
}
],
comp_batch.rows,
Expand Down Expand Up @@ -272,11 +274,15 @@ async def test_medication_completion(self):
"table_name": "medication",
"group_name": "test-group",
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
},
{
"table_name": "medicationrequest",
"group_name": "test-group",
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
},
],
comp_batch.rows,
Expand Down Expand Up @@ -322,6 +328,8 @@ async def test_allow_empty_group(self):
"table_name": "device",
"group_name": "",
"export_time": "2012-10-10T05:30:12+00:00",
"export_url": self.export_url,
"etl_version": "1.0.0+test",
}
],
comp_format.write_records.call_args[0][0].rows,
Expand Down
2 changes: 2 additions & 0 deletions tests/loaders/ndjson/test_bulk_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ async def test_successful_etl_bulk_export(self):
"table_name": "patient",
"group_name": "MyGroup",
"export_time": "2015-02-07T13:28:17+02:00",
"export_url": f"{self.fhir_url}/$export?_type=Patient",
"etl_version": "1.0.0+test",
},
common.read_json(f"{tmpdir}/output/etl__completion/etl__completion.000.ndjson"),
)
Expand Down
3 changes: 3 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def setUp(self):
# Make it easy to grab test data, regardless of where the test is
self.datadir = os.path.join(os.path.dirname(__file__), "data")

# Lock our version in place (it's referenced in some static files)
self.patch("cumulus_etl.__version__", new="1.0.0+test")

# Several tests involve timestamps in some form, so just pick a standard time for all tests.
traveller = time_machine.travel(_FROZEN_TIME, tick=False)
self.addCleanup(traveller.stop)
Expand Down

0 comments on commit 91bc7c6

Please sign in to comment.