From 34c652ffcfda701ee8649402e89b2ec6bdc5cb10 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Fri, 12 Apr 2024 09:06:59 -0400
Subject: [PATCH] feat: use group-name/export-time from log file if present

When loading ndjson from a folder, parse any sibling logs we find for
export info like group/time.

These are the current workflow assumptions this adds:
- There will be a log.ndjson or log.*.ndjson file in the given folder.
- There cannot be multiples (unless log.ndjson exists, in which case we
  always use that)
- That log file will be for a single export.
  - e.g. We will generally grab the last "kickoff" event and ignore others.
---
 cumulus_etl/fhir/__init__.py                  |   9 +-
 cumulus_etl/fhir/fhir_utils.py                |  24 ++++
 cumulus_etl/loaders/fhir/bulk_export.py       |  12 +-
 cumulus_etl/loaders/fhir/export_log.py        |  99 ++++++++++++++++
 cumulus_etl/loaders/fhir/ndjson_loader.py     |  11 ++
 tests/fhir/test_fhir_utils.py                 |  26 +++++
 tests/{i2b2 => loaders}/__init__.py           |   0
 tests/loaders/i2b2/__init__.py                |   0
 tests/{ => loaders}/i2b2/test_i2b2_etl.py     |   0
 tests/{ => loaders}/i2b2/test_i2b2_loader.py  |   0
 .../i2b2/test_i2b2_oracle_connect.py          |   0
 .../i2b2/test_i2b2_oracle_extract.py          |   0
 .../i2b2/test_i2b2_oracle_query.py            |   0
 .../{ => loaders}/i2b2/test_i2b2_transform.py |   0
 tests/loaders/ndjson/__init__.py              |   0
 .../{ => loaders/ndjson}/test_bulk_export.py  |   0
 tests/loaders/ndjson/test_log_parser.py       | 106 ++++++++++++++++++
 .../ndjson}/test_ndjson_loader.py             |  49 +++++++-
 18 files changed, 323 insertions(+), 13 deletions(-)
 create mode 100644 cumulus_etl/loaders/fhir/export_log.py
 rename tests/{i2b2 => loaders}/__init__.py (100%)
 create mode 100644 tests/loaders/i2b2/__init__.py
 rename tests/{ => loaders}/i2b2/test_i2b2_etl.py (100%)
 rename tests/{ => loaders}/i2b2/test_i2b2_loader.py (100%)
 rename tests/{ => loaders}/i2b2/test_i2b2_oracle_connect.py (100%)
 rename tests/{ => loaders}/i2b2/test_i2b2_oracle_extract.py (100%)
 rename tests/{ => loaders}/i2b2/test_i2b2_oracle_query.py (100%)
 rename tests/{ => loaders}/i2b2/test_i2b2_transform.py (100%)
 create mode 100644 tests/loaders/ndjson/__init__.py
 rename tests/{ => loaders/ndjson}/test_bulk_export.py (100%)
 create mode 100644 tests/loaders/ndjson/test_log_parser.py
 rename tests/{ => loaders/ndjson}/test_ndjson_loader.py (83%)

diff --git a/cumulus_etl/fhir/__init__.py b/cumulus_etl/fhir/__init__.py
index 9060125a..b5c51fe8 100644
--- a/cumulus_etl/fhir/__init__.py
+++ b/cumulus_etl/fhir/__init__.py
@@ -1,4 +1,11 @@
 """Support for talking to FHIR servers & handling the FHIR spec"""
 
 from .fhir_client import FhirClient, create_fhir_client_for_cli
-from .fhir_utils import download_reference, get_docref_note, parse_datetime, ref_resource, unref_resource
+from .fhir_utils import (
+    download_reference,
+    get_docref_note,
+    parse_datetime,
+    parse_group_from_url,
+    ref_resource,
+    unref_resource,
+)
diff --git a/cumulus_etl/fhir/fhir_utils.py b/cumulus_etl/fhir/fhir_utils.py
index f96ba8b8..91eecfb7 100644
--- a/cumulus_etl/fhir/fhir_utils.py
+++ b/cumulus_etl/fhir/fhir_utils.py
@@ -4,6 +4,7 @@
 import datetime
 import email.message
 import re
+import urllib.parse
 
 import inscriptis
 
@@ -109,6 +110,29 @@ def parse_datetime(value: str | None) -> datetime.datetime | None:
     return None
 
 
+def parse_group_from_url(url: str) -> str:
+    """
+    Parses the group out of a FHIR URL.
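+    Returns the empty string if the URL has no /Group/ segment.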
+
+    These URLs look something like:
+    - https://hostname/root/Group/my-group <- group name of `my-group`
+    - https://hostname/root/Group/my-group/$export <- group name of `my-group`
+    - https://hostname/root <- no group name
+    """
+    parsed = urllib.parse.urlparse(url)
+    if not parsed.scheme:
+        raise ValueError(f"Could not parse URL '{url}'")
+
+    pieces = parsed.path.split("/Group/", 2)
+    match len(pieces):
+        case 2:
+            return pieces[1].split("/")[0]
+        case _:
+            # Global exports don't seem realistic, but if the user does do them,
+            # we'll use the empty string as the default group name for that.
+            return ""
+
+
 ######################################################################################################################
 #
 # Resource downloading
diff --git a/cumulus_etl/loaders/fhir/bulk_export.py b/cumulus_etl/loaders/fhir/bulk_export.py
index 665283bc..21a25ac0 100644
--- a/cumulus_etl/loaders/fhir/bulk_export.py
+++ b/cumulus_etl/loaders/fhir/bulk_export.py
@@ -57,17 +57,7 @@ def __init__(
 
         # Public properties, to be read after the export:
         self.export_datetime = None
-
-        # Parse the group out of the URL, which will look something like:
-        # - https://hostname/root/Group/my-group$export <- group name of `my-group`
-        # - https://hostname/root$export <- no group name, global export
-        if "/Group/" in self._url:
-            latter_half = self._url.split("/Group/", 2)[-1]
-            self.group_name = latter_half.split("/")[0]
-        else:
-            # Global exports don't seem realistic, but the user does do them,
-            # we'll use the empty string as the default group name for that...
-            self.group_name = ""
+        self.group_name = fhir.parse_group_from_url(self._url)
 
     async def export(self) -> None:
         """
diff --git a/cumulus_etl/loaders/fhir/export_log.py b/cumulus_etl/loaders/fhir/export_log.py
new file mode 100644
index 00000000..0e8fae8b
--- /dev/null
+++ b/cumulus_etl/loaders/fhir/export_log.py
@@ -0,0 +1,99 @@
+"""
+Parsing for bulk export log files
+
+https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items
+"""
+
+import datetime
+import json
+import os
+import re
+
+from cumulus_etl import common, fhir, store
+
+
+class BulkExportLogParser:
+    """
+    Parses the log file generated by bulk exports.
+
+    These are the assumptions we make:
+    - There will be a log.ndjson or log.*.ndjson file in the given folder.
+    - There cannot be multiples (unless log.ndjson exists, in which case we always use that)
+    - That log file will be for a single export.
+      - e.g. We will generally grab the last "kickoff" event and ignore others.
+    """
+
+    class LogParsingError(Exception):
+        pass
+
+    class IncompleteLog(LogParsingError):
+        pass
+
+    class MultipleLogs(LogParsingError):
+        pass
+
+    class NoLogs(LogParsingError):
+        pass
+
+    def __init__(self, root: store.Root):
+        self.group_name: str = None
+        self.export_datetime: datetime.datetime = None
+
+        self._parse(self._find(root))
+
+    def _parse(self, path: str) -> None:
+        # Go through every row, looking for the events we care about.
+        # Note that we parse every kickoff event we hit, for example.
+        # So we'll end up with the latest one (which works for single-export
+        # log files with maybe a false start at the beginning).
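+        # ("kickoff" carries the export URL we pull the group from; "status_complete" carries the transactionTime.)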
+        try:
+            for row in common.read_ndjson(path):
+                match row.get("eventId"):
+                    case "kickoff":
+                        self._parse_kickoff(row)
+                    case "status_complete":
+                        self._parse_status_complete(row)
+        except (KeyError, json.JSONDecodeError) as exc:
+            raise self.IncompleteLog(f"Error parsing '{path}'") from exc
+
+        if self.group_name is None:
+            raise self.IncompleteLog(f"No kickoff event found in '{path}'")
+        if self.export_datetime is None:
+            raise self.IncompleteLog(f"No status_complete event found in '{path}'")
+
+    def _parse_kickoff(self, row: dict) -> None:
+        details = row["eventDetail"]
+        self.group_name = fhir.parse_group_from_url(details["exportUrl"])
+
+    def _parse_status_complete(self, row: dict) -> None:
+        details = row["eventDetail"]
+        self.export_datetime = datetime.datetime.fromisoformat(details["transactionTime"])
+
+    def _find(self, root: store.Root) -> str:
+        """Finds the log file inside the root"""
+        try:
+            paths = root.ls()
+        except FileNotFoundError as exc:
+            raise self.NoLogs("Folder does not exist") from exc
+        filenames = {os.path.basename(p): p for p in paths}
+
+        # In the easy case, it's just sitting there at log.ndjson,
+        # which is the filename that bulk-data-client uses.
+        # Because this is the standard name, we prefer this and don't
+        # error out even if there are other log.something.ndjson names in
+        # the folder (see below). Maybe this is a symlink to the most recent...
+        if full_path := filenames.get("log.ndjson"):
+            return full_path
+
+        # But possibly the user does some file renaming to manage different
+        # exports, so allow log.something.ndjson as well. (Much like we do
+        # for the input ndjson files.)
+        pattern = re.compile(r"log\..+\.ndjson")
+        log_files = list(filter(pattern.match, filenames.keys()))
+        match len(log_files):
+            case 0:
+                raise self.NoLogs("No log.ndjson file found")
+            case 1:
+                return filenames[log_files[0]]
+            case _:
+                raise self.MultipleLogs("Multiple log.*.ndjson files found")
diff --git a/cumulus_etl/loaders/fhir/ndjson_loader.py b/cumulus_etl/loaders/fhir/ndjson_loader.py
index 4f14f741..f5984870 100644
--- a/cumulus_etl/loaders/fhir/ndjson_loader.py
+++ b/cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -6,6 +6,7 @@
 from cumulus_etl import cli_utils, common, errors, fhir, store
 from cumulus_etl.loaders import base
 from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
+from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser
 
 
 class FhirNdjsonLoader(base.Loader):
@@ -47,6 +48,16 @@ async def load_all(self, resources: list[str]) -> common.Directory:
                 "You provided FHIR bulk export parameters but did not provide a FHIR server",
                 errors.ARGS_CONFLICT
             )
+        # Parse logs for export information
+        try:
+            parser = BulkExportLogParser(self.root)
+            self.group_name = parser.group_name
+            self.export_datetime = parser.export_datetime
+        except BulkExportLogParser.LogParsingError:
+            # Once we require group name & export datetime, we should warn about this.
+            # For now, just ignore any errors.
+            pass
+
         # Copy the resources we need from the remote directory (like S3 buckets) to a local one.
         #
         # We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
diff --git a/tests/fhir/test_fhir_utils.py b/tests/fhir/test_fhir_utils.py
index fb15c5d7..d8638e91 100644
--- a/tests/fhir/test_fhir_utils.py
+++ b/tests/fhir/test_fhir_utils.py
@@ -72,6 +72,32 @@ def test_parse_datetime(self, input_value, expected_value):
         self.assertEqual(expected_value, parsed)
 
 
+@ddt.ddt
+class TestUrlParsing(utils.AsyncTestCase):
+    """Tests for URL parsing"""
+
+    @ddt.data(
+        ("//host", ValueError),
+        ("https://host", ""),
+        ("https://host/root", ""),
+        ("https://Group/MyGroup", ""),  # Group is hostname here
+        ("https://host/root/?key=/Group/Testing/", ""),
+        ("https://host/root/Group/MyGroup", "MyGroup"),
+        ("https://host/root/Group/MyGroup/", "MyGroup"),
+        ("https://host/Group/MyGroup/$export", "MyGroup"),
+        ("https://host/Group/MyGroup?key=value", "MyGroup"),
+        ("https://host/root/Group/Group/", "Group"),
+    )
+    @ddt.unpack
+    def test_parse_group_from_url(self, url, expected_group):
+        if isinstance(expected_group, str):
+            group = fhir.parse_group_from_url(url)
+            assert expected_group == group
+        else:
+            with self.assertRaises(expected_group):
+                fhir.parse_group_from_url(url)
+
+
 @ddt.ddt
 class TestDocrefNotesUtils(utils.AsyncTestCase):
     """Tests for the utility methods dealing with document reference clinical notes"""
diff --git a/tests/i2b2/__init__.py b/tests/loaders/__init__.py
similarity index 100%
rename from tests/i2b2/__init__.py
rename to tests/loaders/__init__.py
diff --git a/tests/loaders/i2b2/__init__.py b/tests/loaders/i2b2/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/i2b2/test_i2b2_etl.py b/tests/loaders/i2b2/test_i2b2_etl.py
similarity index 100%
rename from tests/i2b2/test_i2b2_etl.py
rename to tests/loaders/i2b2/test_i2b2_etl.py
diff --git a/tests/i2b2/test_i2b2_loader.py b/tests/loaders/i2b2/test_i2b2_loader.py
similarity index 100%
rename from tests/i2b2/test_i2b2_loader.py
rename to tests/loaders/i2b2/test_i2b2_loader.py
diff --git a/tests/i2b2/test_i2b2_oracle_connect.py b/tests/loaders/i2b2/test_i2b2_oracle_connect.py
similarity index 100%
rename from tests/i2b2/test_i2b2_oracle_connect.py
rename to tests/loaders/i2b2/test_i2b2_oracle_connect.py
diff --git a/tests/i2b2/test_i2b2_oracle_extract.py b/tests/loaders/i2b2/test_i2b2_oracle_extract.py
similarity index 100%
rename from tests/i2b2/test_i2b2_oracle_extract.py
rename to tests/loaders/i2b2/test_i2b2_oracle_extract.py
diff --git a/tests/i2b2/test_i2b2_oracle_query.py b/tests/loaders/i2b2/test_i2b2_oracle_query.py
similarity index 100%
rename from tests/i2b2/test_i2b2_oracle_query.py
rename to tests/loaders/i2b2/test_i2b2_oracle_query.py
diff --git a/tests/i2b2/test_i2b2_transform.py b/tests/loaders/i2b2/test_i2b2_transform.py
similarity index 100%
rename from tests/i2b2/test_i2b2_transform.py
rename to tests/loaders/i2b2/test_i2b2_transform.py
diff --git a/tests/loaders/ndjson/__init__.py b/tests/loaders/ndjson/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py
similarity index 100%
rename from tests/test_bulk_export.py
rename to tests/loaders/ndjson/test_bulk_export.py
diff --git a/tests/loaders/ndjson/test_log_parser.py b/tests/loaders/ndjson/test_log_parser.py
new file mode 100644
index 00000000..6a424c0b
--- /dev/null
+++ b/tests/loaders/ndjson/test_log_parser.py
@@ -0,0 +1,106 @@
+"""Tests for bulk export log parsing"""
+
+import datetime
+import tempfile
+
+import ddt
+
+from cumulus_etl import common, store
+from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser
+from tests.utils import AsyncTestCase
+
+
+def kickoff(group: str) -> dict:
+    url = f"https://host/Group/{group}" if group else "https://host/"
+    return {
+        "eventId": "kickoff",
+        "eventDetail": {
+            "exportUrl": url,
+        },
+    }
+
+
+def status_complete(timestamp: str) -> dict:
+    return {
+        "eventId": "status_complete",
+        "eventDetail": {
+            "transactionTime": timestamp,
+        },
+    }
+
+
+@ddt.ddt
+class TestBulkExportLogParser(AsyncTestCase):
+    """Test case for parsing bulk export logs."""
+
+    def _assert_results(self, path, expected_result) -> None:
+        if isinstance(expected_result, tuple):
+            parser = BulkExportLogParser(store.Root(path))
+            expected_group = expected_result[0]
+            expected_datetime = datetime.datetime.fromisoformat(expected_result[1])
+            self.assertEqual(expected_group, parser.group_name)
+            self.assertEqual(expected_datetime, parser.export_datetime)
+        else:
+            with self.assertRaises(expected_result):
+                BulkExportLogParser(store.Root(path))
+
+    @ddt.data(
+        # Happy cases:
+        (["log.ndjson"], None),
+        (["log.blarg.ndjson"], None),
+        (["log.0001.ndjson"], None),
+        (["log.ndjson", "log.1.ndjson"], None),
+        # Error cases:
+        ([], BulkExportLogParser.NoLogs),
+        (["log.1.ndjson", "log.2.ndjson"], BulkExportLogParser.MultipleLogs),
+    )
+    @ddt.unpack
+    def test_finding_the_log(self, files, error):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            common.write_text(f"{tmpdir}/distraction.txt", "hello")
+            common.write_text(f"{tmpdir}/log.ndjson.bak", "bye")
+            for file in files:
+                with common.NdjsonWriter(f"{tmpdir}/{file}") as writer:
+                    writer.write(kickoff("G"))
+                    writer.write(status_complete("2020-10-17"))
+
+            error = error or ("G", "2020-10-17")
+            self._assert_results(tmpdir, error)
+
+    def test_no_dir(self):
+        self._assert_results("/path/does/not/exist", BulkExportLogParser.NoLogs)
+
+    @ddt.data(
+        # Happy cases:
+        (  # basic simple case
+            [kickoff("G"), status_complete("2020-10-17")],
+            ("G", "2020-10-17"),
+        ),
+        (  # multiple rows - we should pick last of each
+            [
+                kickoff("1st"),
+                kickoff("2nd"),
+                status_complete("2001-01-01"),
+                status_complete("2002-02-02"),
+            ],
+            ("2nd", "2002-02-02"),
+        ),
+        ([kickoff(""), status_complete("2020-10-17")], ("", "2020-10-17")),  # global export group
+        # Error cases:
+        ([status_complete("2010-03-09")], BulkExportLogParser.IncompleteLog),  # missing group
+        ([kickoff("G")], BulkExportLogParser.IncompleteLog),  # missing time
+        ([], BulkExportLogParser.IncompleteLog),  # missing all
+        ([{"eventId": "kickoff"}], BulkExportLogParser.IncompleteLog),  # missing eventDetail
+        (  # missing transactionTime
+            [{"eventId": "status_complete", "eventDetail": {}}],
+            BulkExportLogParser.IncompleteLog,
+        ),
+    )
+    @ddt.unpack
+    def test_parsing(self, rows, expected_result):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with common.NdjsonWriter(f"{tmpdir}/log.ndjson", allow_empty=True) as writer:
+                for row in rows:
+                    writer.write(row)
+
+            self._assert_results(tmpdir, expected_result)
diff --git a/tests/test_ndjson_loader.py b/tests/loaders/ndjson/test_ndjson_loader.py
similarity index 83%
rename from tests/test_ndjson_loader.py
rename to tests/loaders/ndjson/test_ndjson_loader.py
index 0010a6fd..468a2927 100644
--- a/tests/test_ndjson_loader.py
+++ b/tests/loaders/ndjson/test_ndjson_loader.py
@@ -1,10 +1,11 @@
 """Tests for ndjson loading (both bulk export and local)"""
 
+import datetime
 import os
 import tempfile
 from unittest import mock
 
-from cumulus_etl import cli, errors, loaders, store
+from cumulus_etl import cli, common, errors, loaders, store
 from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
 from tests.utils import AsyncTestCase
 
@@ -33,6 +34,52 @@ def setUp(self):
         self.mock_exporter = mock.AsyncMock()
         self.mock_exporter_class.return_value = self.mock_exporter
 
+    @staticmethod
+    def _write_log_file(path: str, group: str, timestamp: str) -> None:
+        with common.NdjsonWriter(path) as writer:
+            writer.write(
+                {
+                    "eventId": "kickoff",
+                    "eventDetail": {"exportUrl": f"https://host/Group/{group}/$export"},
+                }
+            )
+            writer.write(
+                {
+                    "eventId": "status_complete",
+                    "eventDetail": {"transactionTime": timestamp},
+                }
+            )
+
+    async def test_local_happy_path(self):
+        """Do a full local load from a folder."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self._write_log_file(f"{tmpdir}/log.ndjson", "G", "1999-03-14T14:12:10")
+            with common.NdjsonWriter(f"{tmpdir}/Patient.ndjson") as writer:
+                writer.write({"id": "A"})
+
+            loader = loaders.FhirNdjsonLoader(store.Root(tmpdir))
+            loaded_dir = await loader.load_all(["Patient"])
+
+        self.assertEqual(["Patient.ndjson"], os.listdir(loaded_dir.name))
+        self.assertEqual({"id": "A"}, common.read_json(f"{loaded_dir.name}/Patient.ndjson"))
+        self.assertEqual("G", loader.group_name)
+        self.assertEqual(datetime.datetime.fromisoformat("1999-03-14T14:12:10"), loader.export_datetime)
+
+    # At some point, we do want to make this fatal.
+    # But not while this feature is still optional.
+    async def test_log_parsing_is_non_fatal(self):
+        """Do a local load with a bad log setup."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self._write_log_file(f"{tmpdir}/log.1.ndjson", "G1", "2001-01-01")
+            self._write_log_file(f"{tmpdir}/log.2.ndjson", "G2", "2002-02-02")
+
+            loader = loaders.FhirNdjsonLoader(store.Root(tmpdir))
+            await loader.load_all([])
+
+        # We used neither log and didn't error out.
+        self.assertIsNone(loader.group_name)
+        self.assertIsNone(loader.export_datetime)
+
     @mock.patch("cumulus_etl.fhir.fhir_client.FhirClient")
     @mock.patch("cumulus_etl.etl.cli.loaders.FhirNdjsonLoader")
     async def test_etl_passes_args(self, mock_loader, mock_client):