Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use group-name/export-time from log file if present #306

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cumulus_etl/fhir/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""Support for talking to FHIR servers & handling the FHIR spec"""

from .fhir_client import FhirClient, create_fhir_client_for_cli
from .fhir_utils import download_reference, get_docref_note, parse_datetime, ref_resource, unref_resource
from .fhir_utils import (
download_reference,
get_docref_note,
parse_datetime,
parse_group_from_url,
ref_resource,
unref_resource,
)
24 changes: 24 additions & 0 deletions cumulus_etl/fhir/fhir_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datetime
import email.message
import re
import urllib.parse

import inscriptis

Expand Down Expand Up @@ -109,6 +110,29 @@ def parse_datetime(value: str | None) -> datetime.datetime | None:
return None


def parse_group_from_url(url: str) -> str:
"""
Parses the group out of a FHIR URL.

These URLS look something like:
- https://hostname/root/Group/my-group <- group name of `my-group`
- https://hostname/root/Group/my-group/$export <- group name of `my-group`
- https://hostname/root <- no group name
"""
parsed = urllib.parse.urlparse(url)
if not parsed.scheme:
raise ValueError(f"Could not parse URL '{url}'")

pieces = parsed.path.split("/Group/", 2)
match len(pieces):
case 2:
return pieces[1].split("/")[0]
case _:
# Global exports don't seem realistic, but if the user does do them,
# we'll use the empty string as the default group name for that.
return ""


######################################################################################################################
#
# Resource downloading
Expand Down
12 changes: 1 addition & 11 deletions cumulus_etl/loaders/fhir/bulk_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,7 @@ def __init__(

# Public properties, to be read after the export:
self.export_datetime = None

# Parse the group out of the URL, which will look something like:
# - https://hostname/root/Group/my-group$export <- group name of `my-group`
# - https://hostname/root$export <- no group name, global export
if "/Group/" in self._url:
latter_half = self._url.split("/Group/", 2)[-1]
self.group_name = latter_half.split("/")[0]
else:
# Global exports don't seem realistic, but the user does do them,
# we'll use the empty string as the default group name for that...
self.group_name = ""
self.group_name = fhir.parse_group_from_url(self._url)

async def export(self) -> None:
"""
Expand Down
99 changes: 99 additions & 0 deletions cumulus_etl/loaders/fhir/export_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Parsing for bulk export log files

https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items
"""

import datetime
import json
import os
import re

from cumulus_etl import common, fhir, store


class BulkExportLogParser:
"""
Parses the log file generated by bulk exports.

These are the assumptions we make:
- There will be a log.ndjson or log.*.ndjson file in the given folder.
- There cannot be multiples (unless log.ndjson exists, in which case we always use that)
- That log file will be for a single export.
- e.g. We will generally grab the last "kickoff" event and ignore others.
"""

class LogParsingError(Exception):
pass

class IncompleteLog(LogParsingError):
pass

class MultipleLogs(LogParsingError):
pass

class NoLogs(LogParsingError):
pass

def __init__(self, root: store.Root):
self.group_name: str = None
self.export_datetime: datetime.datetime = None

self._parse(self._find(root))

def _parse(self, path: str) -> None:
# Go through every row, looking for the events we care about.
# Note that we parse every kickoff event we hit, for example.
# So we'll end up with the latest one (which works for single-export
# log files with maybe a false start at the beginning).
try:
for row in common.read_ndjson(path):
match row.get("eventId"):
case "kickoff":
self._parse_kickoff(row)
case "status_complete":
self._parse_status_complete(row)
except (KeyError, json.JSONDecodeError) as exc:
raise self.IncompleteLog(f"Error parsing '{path}'") from exc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: consider a MissingLog error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that actually does exist (I used NoLogs), in the find method below (along with MultipleLogs).

Currently nothing actually distinguishes between these flavors of error. Maybe they will in the future, but 🤷


if self.group_name is None:
raise self.IncompleteLog(f"No kickoff event found in '{path}'")
if self.export_datetime is None:
raise self.IncompleteLog(f"No status_complete event found in '{path}'")

def _parse_kickoff(self, row: dict) -> None:
details = row["eventDetail"]
self.group_name = fhir.parse_group_from_url(details["exportUrl"])

def _parse_status_complete(self, row: dict) -> None:
details = row["eventDetail"]
self.export_datetime = datetime.datetime.fromisoformat(details["transactionTime"])

def _find(self, root: store.Root) -> str:
"""Finds the log file inside the root"""
try:
paths = root.ls()
except FileNotFoundError as exc:
raise self.NoLogs("Folder does not exist") from exc
filenames = {os.path.basename(p): p for p in paths}

# In the easy case, it's just sitting there at log.ndjson,
# which is the filename that bulk-data-client uses.
# Because this is the standard name, we prefer this and don't
# error out even if there are other log.something.ndjson names in
# the folder (see below). Maybe this is a symlink to the most recent...
if full_path := filenames.get("log.ndjson"):
return full_path

# But possibly the user does some file renaming to manage different
# exports, so allow log.something.ndjson as well. (Much like we do
# for the input ndjson files.)
pattern = re.compile(r"log\..+\.ndjson")
log_files = list(filter(pattern.match, filenames.keys()))
match len(log_files):
case 0:
raise self.NoLogs("No log.ndjson file found")
case 1:
return filenames[log_files[0]]
case _:
raise self.MultipleLogs("Multiple log.*.ndjson files found")
11 changes: 11 additions & 0 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from cumulus_etl import cli_utils, common, errors, fhir, store
from cumulus_etl.loaders import base
from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser


class FhirNdjsonLoader(base.Loader):
Expand Down Expand Up @@ -47,6 +48,16 @@ async def load_all(self, resources: list[str]) -> common.Directory:
"You provided FHIR bulk export parameters but did not provide a FHIR server", errors.ARGS_CONFLICT
)

# Parse logs for export information
try:
parser = BulkExportLogParser(self.root)
self.group_name = parser.group_name
self.export_datetime = parser.export_datetime
except BulkExportLogParser.LogParsingError:
# Once we require group name & export datetime, we should warn about this.
# For now, just ignore any errors.
pass

# Copy the resources we need from the remote directory (like S3 buckets) to a local one.
#
# We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
Expand Down
26 changes: 26 additions & 0 deletions tests/fhir/test_fhir_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ def test_parse_datetime(self, input_value, expected_value):
self.assertEqual(expected_value, parsed)


@ddt.ddt
class TestUrlParsing(utils.AsyncTestCase):
"""Tests for URL parsing"""

@ddt.data(
("//host", ValueError),
("https://host", ""),
("https://host/root", ""),
("https://Group/MyGroup", ""), # Group is hostname here
("https://host/root/?key=/Group/Testing/", ""),
("https://host/root/Group/MyGroup", "MyGroup"),
("https://host/root/Group/MyGroup/", "MyGroup"),
("https://host/Group/MyGroup/$export", "MyGroup"),
("https://host/Group/MyGroup?key=value", "MyGroup"),
("https://host/root/Group/Group/", "Group"),
)
@ddt.unpack
def test_parse_group_from_url(self, url, expected_group):
if isinstance(expected_group, str):
group = fhir.parse_group_from_url(url)
assert expected_group == group
else:
with self.assertRaises(expected_group):
fhir.parse_group_from_url(url)


@ddt.ddt
class TestDocrefNotesUtils(utils.AsyncTestCase):
"""Tests for the utility methods dealing with document reference clinical notes"""
Expand Down
File renamed without changes.
Empty file added tests/loaders/i2b2/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
Empty file.
File renamed without changes.
106 changes: 106 additions & 0 deletions tests/loaders/ndjson/test_log_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Tests for bulk export log parsing"""

import datetime
import tempfile

import ddt

from cumulus_etl import common, store
from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser
from tests.utils import AsyncTestCase


def kickoff(group: str) -> dict:
url = f"https://host/Group/{group}" if group else "https://host/"
return {
"eventId": "kickoff",
"eventDetail": {
"exportUrl": url,
},
}


def status_complete(timestamp: str) -> dict:
return {
"eventId": "status_complete",
"eventDetail": {
"transactionTime": timestamp,
},
}


@ddt.ddt
class TestBulkExportLogParser(AsyncTestCase):
"""Test case for parsing bulk export logs."""

def _assert_results(self, path, expected_result) -> None:
if isinstance(expected_result, tuple):
parser = BulkExportLogParser(store.Root(path))
expected_group = expected_result[0]
expected_datetime = datetime.datetime.fromisoformat(expected_result[1])
self.assertEqual(expected_group, parser.group_name)
self.assertEqual(expected_datetime, parser.export_datetime)
else:
with self.assertRaises(expected_result):
BulkExportLogParser(store.Root(path))

@ddt.data(
# Happy cases:
(["log.ndjson"], None),
(["log.blarg.ndjson"], None),
(["log.0001.ndjson"], None),
(["log.ndjson", "log.1.ndjson"], None),
# Error cases:
([], BulkExportLogParser.NoLogs),
(["log.1.ndjson", "log.2.ndjson"], BulkExportLogParser.MultipleLogs),
)
@ddt.unpack
def test_finding_the_log(self, files, error):
with tempfile.TemporaryDirectory() as tmpdir:
common.write_text(f"{tmpdir}/distraction.txt", "hello")
common.write_text(f"{tmpdir}/log.ndjson.bak", "bye")
for file in files:
with common.NdjsonWriter(f"{tmpdir}/{file}") as writer:
writer.write(kickoff("G"))
writer.write(status_complete("2020-10-17"))

error = error or ("G", "2020-10-17")
self._assert_results(tmpdir, error)

def test_no_dir(self):
self._assert_results("/path/does/not/exist", BulkExportLogParser.NoLogs)

@ddt.data(
# Happy cases:
( # basic simple case
[kickoff("G"), status_complete("2020-10-17")],
("G", "2020-10-17"),
),
( # multiple rows - we should pick last of each
[
kickoff("1st"),
kickoff("2nd"),
status_complete("2001-01-01"),
status_complete("2002-02-02"),
],
("2nd", "2002-02-02"),
),
([kickoff(""), status_complete("2020-10-17")], ("", "2020-10-17")), # global export group
# Error cases:
([status_complete("2010-03-09")], BulkExportLogParser.IncompleteLog), # missing group
([kickoff("G")], BulkExportLogParser.IncompleteLog), # missing time
([], BulkExportLogParser.IncompleteLog), # missing all
([{"eventId": "kickoff"}], BulkExportLogParser.IncompleteLog), # missing eventDetail
( # missing transactionTime
[{"eventId": "status_complete", "eventDetail": {}}],
BulkExportLogParser.IncompleteLog,
),
)
@ddt.unpack
def test_parsing(self, rows, expected_result):
with tempfile.TemporaryDirectory() as tmpdir:
with common.NdjsonWriter(f"{tmpdir}/log.ndjson", allow_empty=True) as writer:
for row in rows:
writer.write(row)

self._assert_results(tmpdir, expected_result)
Loading
Loading