bulk: store "deleted" and "error" results in export subfolders
If the bulk export server provides an array of error messages or
an array of deleted resources, make sure we persist those on disk.

Previously, we only printed the error messages to the console.
We still do that, but also write them to an error/ subfolder.

Previously, we ignored any deleted resources.
Now we write them to a deleted/ subfolder.

This matches the behavior of bulk-data-client.
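
For illustration only (not part of the commit itself), an export destination might now look roughly like this; exact file names vary by server and setup:

```
export-folder/
├── Patient.000.ndjson    (exported resources; naming varies)
├── log.ndjson            (bulk export log)
├── error/
│   └── OperationOutcome.000.ndjson   (errors, warnings, and info messages)
└── deleted/
    └── Bundle.000.ndjson             (resources the server says to delete)
```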
mikix committed Aug 29, 2024
1 parent 55db90e commit eb488f0
Showing 4 changed files with 126 additions and 79 deletions.
93 changes: 38 additions & 55 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -2,7 +2,6 @@

import asyncio
import datetime
import json
import os
import urllib.parse
from collections.abc import Callable
@@ -162,17 +161,11 @@ async def export(self) -> None:
except ValueError:
pass # server gave us a bad timestamp, ignore it :shrug:

# Were there any server-side errors during the export?
# The spec acknowledges that "error" is perhaps misleading for an array that can contain
# info messages.
error_texts, warning_texts = await self._gather_all_messages(response_json.get("error", []))
if warning_texts:
print("\n - ".join(["Messages from server:", *warning_texts]))

# Download all the files
print("Bulk FHIR export finished, now downloading resources…")
files = response_json.get("output", [])
await self._download_all_ndjson_files(files)
await self._download_all_ndjson_files(response_json, "output")
await self._download_all_ndjson_files(response_json, "error")
await self._download_all_ndjson_files(response_json, "deleted")

self._log.export_complete()

@@ -181,6 +174,11 @@ async def export(self) -> None:
# files up there, so the user could try to manually recover.
await self._delete_export(poll_location)

# Were there any server-side errors during the export?
error_texts, warning_texts = self._gather_all_messages()
if warning_texts:
print("\n - ".join(["Messages from server:", *warning_texts]))

# Make sure we're fully done before we bail because the server told us the export has
# issues. We still want to DELETE the export in this case. And we still want to download
# all the files the server DID give us. Servers may have lots of ignorable errors that
@@ -260,7 +258,6 @@ async def _request_with_retries(
headers: dict | None = None,
target_status_code: int = 200,
method: str = "GET",
log_begin: Callable[[], None] | None = None,
log_request: Callable[[], None] | None = None,
log_progress: Callable[[httpx.Response], None] | None = None,
log_error: Callable[[Exception], None] | None = None,
@@ -275,7 +272,6 @@
:param headers: headers for request
:param target_status_code: retries until this status code is returned
:param method: HTTP method to request
:param log_begin: method to call to report that we are about to start requests
:param log_request: method to call to report every request attempt
:param log_progress: method to call to report a successful request but not yet done
:param log_error: method to call to report request failures
@@ -291,9 +287,6 @@
max_errors = len(error_retry_minutes)
num_errors = 0

if log_begin:
log_begin()

# Actually loop, attempting the request multiple times as needed
while self._total_wait_time < self._TIMEOUT_THRESHOLD:
if log_request:
@@ -361,57 +354,43 @@ async def _request_with_retries(
log_error(exc)
raise exc

async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list[str]):
def _gather_all_messages(self) -> (list[str], list[str]):
"""
Downloads all outcome message ndjson files from the bulk export server.
Parses all error/info ndjson files from the bulk export server.
:param error_list: info about each error file from the bulk FHIR server
:returns: (error messages, non-fatal messages)
"""
coroutines = []
for error in error_list:
# per spec as of writing, OperationOutcome is the only allowed type
if error.get("type") == "OperationOutcome":
coroutines.append(
self._request_with_delay_status(
error["url"],
headers={"Accept": "application/fhir+ndjson"},
retry_errors=True,
log_begin=partial(
self._log.download_request,
error["url"],
"error",
error["type"],
),
log_error=partial(self._log.download_error, error["url"]),
),
)
responses = await asyncio.gather(*coroutines)
# The spec acknowledges that "error" is perhaps misleading for an array that can contain
# info messages.
error_folder = store.Root(f"{self._destination}/error")
outcomes = common.read_resource_ndjson(error_folder, "OperationOutcome")

fatal_messages = []
info_messages = []
for response in responses:
# Create a list of OperationOutcomes
outcomes = [json.loads(x) for x in response.text.split("\n") if x]
self._log.download_complete(response.url, len(outcomes), len(response.text))
for outcome in outcomes:
for issue in outcome.get("issue", []):
text = issue.get("diagnostics")
text = text or issue.get("details", {}).get("text")
text = text or issue.get("code") # code is required at least
if issue.get("severity") in ("fatal", "error"):
fatal_messages.append(text)
else:
info_messages.append(text)
for outcome in outcomes:
for issue in outcome.get("issue", []):
text = issue.get("diagnostics")
text = text or issue.get("details", {}).get("text")
text = text or issue.get("code") # code is required at least
if issue.get("severity") in ("fatal", "error"):
fatal_messages.append(text)
else:
info_messages.append(text)

return fatal_messages, info_messages

async def _download_all_ndjson_files(self, files: list[dict]) -> None:
async def _download_all_ndjson_files(self, resource_json: dict, item_type: str) -> None:
"""
Downloads all exported ndjson files from the bulk export server.
:param files: info about each file from the bulk FHIR server
:param resource_json: the status response from bulk FHIR server
:param item_type: which type of object to download: output, error, or deleted
"""
files = resource_json.get(item_type, [])

# Use the same (sensible) download folder layout as bulk-data-client:
subfolder = "" if item_type == "output" else item_type

resource_counts = {} # how many of each resource we've seen
coroutines = []
for file in files:
@@ -422,12 +401,15 @@ async def _download_all_ndjson_files(self, files: list[dict]) -> None:
self._download_ndjson_file(
file["url"],
file["type"],
os.path.join(self._destination, filename),
os.path.join(self._destination, subfolder, filename),
item_type,
),
)
await asyncio.gather(*coroutines)

async def _download_ndjson_file(self, url: str, resource_type: str, filename: str) -> None:
async def _download_ndjson_file(
self, url: str, resource_type: str, filename: str, item_type: str
) -> None:
"""
Downloads a single ndjson file from the bulk export server.
@@ -442,10 +424,11 @@ async def _download_ndjson_file(self, url: str, resource_type: str, filename: st
headers={"Accept": "application/fhir+ndjson"},
stream=True,
retry_errors=True,
log_request=partial(self._log.download_request, url, "output", resource_type),
log_request=partial(self._log.download_request, url, item_type, resource_type),
log_error=partial(self._log.download_error, url),
)
try:
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w", encoding="utf8") as file:
async for block in response.aiter_text():
file.write(block)
17 changes: 17 additions & 0 deletions docs/bulk-exports.md
@@ -44,6 +44,23 @@ You can save the exported files for archiving after the fact with `--export-to=P
However, bulk exports tend to be brittle and slow for many EHRs at the time of this writing.
It might be wiser to separately export, make sure the data is all there and good, and then ETL it.

## Archiving Exports

Exports can take a long time, and it's often convenient to archive the results
for later re-processing, sanity checking, or quality assurance.

It's recommended that you archive everything in the export folder.
Here is what you can expect to archive:

- The resource export files themselves
(these will look like `1.Patient.ndjson` or `Patient.000.ndjson` or similar)
- The `log.ndjson` log file
- The `deleted/` subfolder, if present
(this will hold a list of resources that the FHIR server says should be deleted)
- The `error/` subfolder, if present
(this will hold a list of errors from the FHIR server
as well as warnings and informational messages, despite the name)

## Resuming an Interrupted Export

Bulk exports can be brittle.
8 changes: 2 additions & 6 deletions docs/setup/sample-runs.md
@@ -205,13 +205,9 @@ Follow the [S3 setup guide](aws.md) document for guidance there.
While technically optional, it's recommended that you manually specify these arguments because their
defaults are subject to change or might not match your situation.

* `--input-format`: There are two reasonable values (`ndjson` and `i2b2`). If you want to pull from
your bulk export FHIR server, pass in its URL as your input path and use `ndjson` as your input
format. Otherwise, you can use either value to point at a local folder with either FHIR ndjson
or i2b2 csv files sitting in them, respectively.

* `--output-format`: There are two reasonable values (`ndjson` and `deltalake`).
For production use, you want `deltalake` as it is supports incremental, batched updates.
For production use, you can use the default value of `deltalake` as it supports incremental,
batched updates.
But `ndjson` is useful when debugging as it is human-readable.

* `--batch-size`: How many resources to save in a single output file. If there are more resources
87 changes: 69 additions & 18 deletions tests/loaders/ndjson/test_bulk_export.py
@@ -272,31 +272,29 @@ async def test_export_error(self):
],
},
)
err1 = (
'{"resourceType": "OperationOutcome",'
' "issue": [{"severity": "error", "diagnostics": "err1"}]}'
)
self.respx_mock.get(
"https://example.com/err1",
headers={"Accept": "application/fhir+ndjson"},
).respond(
json={
"resourceType": "OperationOutcome",
"issue": [{"severity": "error", "diagnostics": "err1"}],
},
).respond(text=err1)
err2 = (
'{"resourceType": "OperationOutcome",'
'"issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n'
'{"resourceType": "OperationOutcome",'
'"issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n'
'{"resourceType": "OperationOutcome",'
'"issue": ['
'{"severity": "error", "code": "err3"},'
'{"severity": "fatal", "code": "err4"}'
"]}\n"
)
self.respx_mock.get(
"https://example.com/err2",
headers={"Accept": "application/fhir+ndjson"},
).respond(
text=(
'{"resourceType": "OperationOutcome",'
'"issue": [{"severity": "fatal", "details": {"text": "err2"}}]}\n'
'{"resourceType": "OperationOutcome",'
'"issue": [{"severity": "warning", "diagnostics": "warning1"}]}\n'
'{"resourceType": "OperationOutcome",'
'"issue": ['
'{"severity": "error", "code": "err3"},'
'{"severity": "fatal", "code": "err4"}'
"]}\n"
)
)
).respond(text=err2)
self.respx_mock.get(
"https://example.com/con1",
headers={"Accept": "application/fhir+ndjson"},
@@ -311,6 +309,11 @@

self.assertIsNone(self.exporter.export_datetime) # date time couldn't be parsed

err1_file = common.read_text(f"{self.tmpdir}/error/OperationOutcome.000.ndjson")
self.assertEqual(err1_file, err1)
err2_file = common.read_text(f"{self.tmpdir}/error/OperationOutcome.001.ndjson")
self.assertEqual(err2_file, err2)

self.assert_log_equals(
("kickoff", None),
(
@@ -389,6 +392,54 @@ async def test_export_warning(self):

self.assertIn("Messages from server:\n - warning1\n", stdout.getvalue())

async def test_deleted_resources(self):
"""Verify that we preserve the list of resources to be deleted"""
self.mock_kickoff()
self.mock_delete()
self.respx_mock.get("https://example.com/poll").respond(
json={
"deleted": [
{"type": "Bundle", "url": "https://example.com/deleted1"},
],
},
)
deleted1 = {
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {"method": "DELETE", "url": "Patient/123"},
}
],
}
self.respx_mock.get("https://example.com/deleted1").respond(json=deleted1)

await self.export()

bundle = common.read_json(f"{self.tmpdir}/deleted/Bundle.000.ndjson")
self.assertEqual(bundle, deleted1)

self.assert_log_equals(
("kickoff", None),
("status_complete", None),
(
"download_request",
{
"fileUrl": "https://example.com/deleted1",
"itemType": "deleted",
"resourceType": "Bundle",
},
),
(
"download_complete",
{"fileSize": 117, "fileUrl": "https://example.com/deleted1", "resourceCount": 1},
),
(
"export_complete",
{"attachments": None, "bytes": 117, "duration": 0, "files": 1, "resources": 1},
),
)

async def test_file_download_error(self):
"""Verify that we correctly handle a resource download failure"""
self.mock_kickoff()
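
As an editorial aside (not part of this commit), here is a minimal sketch of how a downstream consumer might read the new `deleted/` subfolder; the helper name and export path are hypothetical, and the Bundle shape follows the one used in the test above:

```python
import json
from pathlib import Path


def collect_deleted_references(export_dir: str) -> list[str]:
    """Gather resource references (e.g. "Patient/123") from deleted/ Bundles.

    Hypothetical helper, not part of the commit: it walks the deleted/
    subfolder written by the exporter and pulls out DELETE request URLs
    from each transaction Bundle.
    """
    refs: list[str] = []
    deleted_dir = Path(export_dir) / "deleted"
    if not deleted_dir.is_dir():
        return refs

    for path in sorted(deleted_dir.glob("*.ndjson")):
        with open(path, encoding="utf8") as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines between ndjson records
                bundle = json.loads(line)
                for entry in bundle.get("entry", []):
                    request = entry.get("request", {})
                    if request.get("method") == "DELETE" and "url" in request:
                        refs.append(request["url"])

    return refs


# Example (hypothetical path): prints ["Patient/123"] for the test Bundle above
print(collect_deleted_references("/tmp/export"))
```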
