Retry bulk export requests that yield 5xx server errors
Retry each request four times (for a total of five requests), with exponential
backoff of 1, 2, 4, and 8 minutes (15 minutes of waiting in total).

This should help when dealing with flaky EHRs.
mikix committed Aug 7, 2024
1 parent 5668b30 commit 58bfa03
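To make the schedule concrete, here is a minimal sketch of the backoff arithmetic described in the commit message (illustrative only, not code from this commit, though it reuses the `error_retry_minutes` list the commit introduces):

    error_retry_minutes = [1, 2, 4, 8]  # backoff before each retry, in minutes

    delays_seconds = [minutes * 60 for minutes in error_retry_minutes]
    total_requests = 1 + len(delays_seconds)  # the initial request plus four retries
    total_wait_minutes = sum(error_retry_minutes)

    print(total_requests)      # 5
    print(total_wait_minutes)  # 15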
Showing 4 changed files with 271 additions and 97 deletions.
18 changes: 11 additions & 7 deletions cumulus_etl/fhir/fhir_client.py
@@ -91,12 +91,14 @@ async def request(
"""
Issues an HTTP request.
The default Accept type is application/fhir+json, but can be overridden by a provided header.
The default Accept type is application/fhir+json, but can be overridden by a provided
header.
This is a lightly modified version of FHIRServer._get(), but additionally supports streaming and
reauthorization.
This is a lightly modified version of FHIRServer._get(), but additionally supports
streaming and reauthorization.
Will raise a FatalError for an HTTP error, except for 429 which gets returned like a success code.
Will raise a FatalError for an HTTP error, except for 429 which gets returned like a
success code.
:param method: HTTP method to issue
:param path: relative path from the server root to request
@@ -169,16 +171,17 @@ def get_capabilities(self) -> dict:
"""
return self._capabilities

###################################################################################################################
#############################################################################################
#
# Helpers
#
###################################################################################################################
#############################################################################################

async def _read_capabilities(self) -> None:
"""
Reads the server's CapabilityStatement and sets any properties as a result (like server/vendor type).
Reads the server's CapabilityStatement and sets any properties as a result.
Notably, this gathers the server/vendor type.
This is expected to be called extremely early, right as the http session is opened.
"""
if not self._server_root:
@@ -259,6 +262,7 @@ def create_fhir_client_for_cli(
# Use the input URL as the base URL. But note that it may not be the server root.
# For example, it may be a Group export URL. Let's try to find the actual root.
client_base_url = root_input.path
client_base_url = re.sub(r"/\$export(\?.*)?$", "/", client_base_url)
client_base_url = re.sub(r"/Patient/?$", "/", client_base_url)
client_base_url = re.sub(r"/Group/[^/]+/?$", "/", client_base_url)

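To see the root detection at work, the three substitutions reduce a Group export URL to the server root (a hypothetical URL, mirroring the test cases added later in this commit):

    import re

    client_base_url = "http://example.invalid/root/Group/xxx/$export?_type=Patient"
    client_base_url = re.sub(r"/\$export(\?.*)?$", "/", client_base_url)  # drop a trailing $export call
    client_base_url = re.sub(r"/Patient/?$", "/", client_base_url)  # drop a trailing /Patient
    client_base_url = re.sub(r"/Group/[^/]+/?$", "/", client_base_url)  # drop a trailing /Group/<id>
    print(client_base_url)  # http://example.invalid/root/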
204 changes: 122 additions & 82 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -24,10 +24,6 @@ class BulkExporter:
- The bulk-data-server test server (https://github.com/smart-on-fhir/bulk-data-server)
- Cerner (https://www.cerner.com/)
- Epic (https://www.epic.com/)
TODO: make it more robust against server flakiness (like random http errors during file
download or status checks). At least for intermittent issues (i.e. we should do some
retrying). Actual server errors we should continue to surface, if they persist after retries.
"""

_TIMEOUT_THRESHOLD = 60 * 60 * 24 # a day, which is probably an overly generous timeout
@@ -148,9 +144,10 @@ async def export(self) -> None:
print("Starting bulk FHIR export…")

# Request status report, until export is done
response = await self._request_with_logging(
response = await self._request_with_delay_status(
poll_location,
headers={"Accept": "application/json"},
retry_errors=True,
log_progress=self._log.status_progress,
log_error=self._log.status_error,
)
@@ -200,7 +197,7 @@ async def export(self) -> None:
async def _kick_off(self):
"""Initiate bulk export"""
try:
response = await self._request_with_delay(
response = await self._request_with_delay_status(
self._url,
headers={"Prefer": "respond-async"},
target_status_code=202,
@@ -236,90 +233,133 @@ async def _delete_export(self, poll_url: str) -> None:
Then the server knows it can delete the files.
"""
try:
await self._request_with_delay(poll_url, method="DELETE", target_status_code=202)
await self._request_with_delay_status(poll_url, method="DELETE", target_status_code=202)
except errors.FatalError:
# Ignore any fatal issue with this, since we don't actually need this to succeed
pass

async def _request_with_delay(
async def _request_with_delay_status(self, *args, **kwargs) -> httpx.Response:
"""
Requests a file, while respecting any requests to wait longer and telling the user.
:returns: the HTTP response
"""
status_box = rich.text.Text()
with rich.get_console().status(status_box):
response = await self._request_with_retries(*args, rich_text=status_box, **kwargs)

if status_box.plain:
print(f" Waited for a total of {common.human_time_offset(self._total_wait_time)}")

return response

async def _request_with_retries(
self,
path: str,
*,
headers: dict | None = None,
target_status_code: int = 200,
method: str = "GET",
log_begin: Callable[[], None] | None = None,
log_request: Callable[[], None] | None = None,
log_progress: Callable[[httpx.Response], None] | None = None,
log_error: Callable[[Exception], None] | None = None,
stream: bool = False,
retry_errors: bool = False,
rich_text: rich.text.Text | None = None,
) -> httpx.Response:
"""
Requests a file, while respecting any requests to wait longer.
Requests a file, while respecting any requests to wait longer and telling the user.
:param path: path to request
:param headers: headers for request
:param target_status_code: retries until this status code is returned
:param method: HTTP method to request
:param log_begin: method to call to report that we are about to start requests
:param log_request: method to call to report every request attempt
:param log_progress: method to call to report a successful request but not yet done
:param log_error: method to call to report request failures
:param stream: whether to stream the response
:param retry_errors: if True, server-side errors will be retried a few times
:returns: the HTTP response
"""
status_box = rich.text.Text()
with rich.get_console().status(status_box) as status:
while self._total_wait_time < self._TIMEOUT_THRESHOLD:
response = await self._client.request(method, path, headers=headers)

if response.status_code == target_status_code:
if status_box.plain:
status.stop()
print(
f" Waited for a total of {common.human_time_offset(self._total_wait_time)}"
)
return response

# 202 == server is still working on it, 429 == server is busy.
# In both cases, we wait.
if response.status_code in [202, 429]:
if log_progress:
log_progress(response)

# Print a message to the user, so they don't see us do nothing for a while
delay = int(response.headers.get("Retry-After", 60))
if response.status_code == 202:
# Some servers can request unreasonably long delays (e.g. I've seen Cerner ask for five hours),
# which is... not helpful for our UX and often way too long for small exports.
# So as long as the server isn't telling us it's overloaded, limit the delay time to 5 minutes.
delay = min(delay, 300)
progress_msg = response.headers.get("X-Progress", "waiting…")
formatted_total = common.human_time_offset(self._total_wait_time)
formatted_delay = common.human_time_offset(delay)
status_box.plain = f"{progress_msg} ({formatted_total} so far, waiting for {formatted_delay} more)"
# Set up error handling variables.
# These times are extremely generous - partly because we can afford to be
# as a long-running async task and partly because EHR servers seem prone to
# outages that clear up after a bit.
error_retry_minutes = [1, 2, 4, 8] # and then raise
max_errors = len(error_retry_minutes)
num_errors = 0

# And wait as long as the server requests
await asyncio.sleep(delay)
self._total_wait_time += delay
if log_begin:
log_begin()

# Actually loop, attempting the request multiple times as needed
while self._total_wait_time < self._TIMEOUT_THRESHOLD:
if log_request:
log_request()

try:
response = await self._client.request(method, path, headers=headers, stream=stream)
except errors.NetworkError as exc:
if log_error:
log_error(exc)
if retry_errors and exc.response.is_server_error and num_errors < max_errors:
response = exc.response
else:
# It feels silly to abort on an unknown *success* code, but the spec has such clear guidance on
# what the expected response codes are, that it's not clear if a code outside those parameters means
# we should keep waiting or stop waiting. So let's be strict here for now.
raise errors.NetworkError(
f"Unexpected status code {response.status_code} from the bulk FHIR export server.",
response,
)
raise

raise errors.FatalError("Timed out waiting for the bulk FHIR export to finish.")
if response.status_code == target_status_code:
return response

async def _request_with_logging(
self,
*args,
log_begin: Callable[[], None] | None = None,
log_error: Callable[[Exception], None] | None = None,
**kwargs,
) -> httpx.Response:
if log_begin:
log_begin()
if response.is_server_error:
num_errors += 1
else:
num_errors = 0 # reset count if server is back to normal

try:
return await self._request_with_delay(*args, **kwargs)
except Exception as exc:
if log_error:
log_error(exc)
raise
# 202 == server is still working on it, 429 == server is busy,
# 5xx == server-side error. In all cases, we wait.
if response.status_code in [202, 429] or response.is_server_error:
if log_progress and not response.is_server_error:
log_progress(response)

# Calculate how long to wait, with a basic exponential backoff for errors.
if num_errors:
default_delay = error_retry_minutes[num_errors - 1] * 60
else:
default_delay = 60 # one minute
delay = int(response.headers.get("Retry-After", default_delay))
if response.status_code == 202:
# Some servers can request unreasonably long delays (e.g. I've seen Cerner
# ask for five hours), which is... not helpful for our UX and often way
# too long for small exports. So as long as the server isn't telling us
# it's overloaded or erroring out, limit the delay time to 5 minutes.
delay = min(delay, 300)

# Print a message to the user, so they don't see us do nothing for a while
if rich_text is not None:
progress_msg = response.headers.get("X-Progress", "waiting…")
formatted_total = common.human_time_offset(self._total_wait_time)
formatted_delay = common.human_time_offset(delay)
rich_text.plain = f"{progress_msg} ({formatted_total} so far, waiting for {formatted_delay} more)"

# And wait as long as the server requests
await asyncio.sleep(delay)
self._total_wait_time += delay

else:
# It feels silly to abort on an unknown *success* code, but the spec has such clear guidance on
# what the expected response codes are, that it's not clear if a code outside those parameters means
# we should keep waiting or stop waiting. So let's be strict here for now.
raise errors.NetworkError(
f"Unexpected status code {response.status_code} from the bulk FHIR export server.",
response,
)

exc = errors.FatalError("Timed out waiting for the bulk FHIR export to finish.")
if log_error:
log_error(exc)
raise exc

async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list[str]):
"""
@@ -333,9 +373,10 @@ async def _gather_all_messages(self, error_list: list[dict]) -> (list[str], list
# per spec as of writing, OperationOutcome is the only allowed type
if error.get("type") == "OperationOutcome":
coroutines.append(
self._request_with_logging(
self._request_with_delay_status(
error["url"],
headers={"Accept": "application/fhir+ndjson"},
retry_errors=True,
log_begin=partial(
self._log.download_request,
error["url"],
Expand Down Expand Up @@ -394,27 +435,26 @@ async def _download_ndjson_file(self, url: str, resource_type: str, filename: st
:param resource_type: the resource type of the file
:param filename: local path to write data to
"""

self._log.download_request(url, "output", resource_type)
decompressed_size = 0

response = await self._request_with_retries(
url,
headers={"Accept": "application/fhir+ndjson"},
stream=True,
retry_errors=True,
log_request=partial(self._log.download_request, url, "output", resource_type),
log_error=partial(self._log.download_error, url),
)
try:
response = await self._client.request(
"GET",
url,
headers={"Accept": "application/fhir+ndjson"},
stream=True,
)
try:
with open(filename, "w", encoding="utf8") as file:
async for block in response.aiter_text():
file.write(block)
decompressed_size += len(block)
finally:
await response.aclose()
with open(filename, "w", encoding="utf8") as file:
async for block in response.aiter_text():
file.write(block)
decompressed_size += len(block)
except Exception as exc:
self._log.download_error(url, exc)
raise
raise errors.FatalError(f"Error downloading '{url}': {exc}")
finally:
await response.aclose()

lines = common.read_local_line_count(filename)
self._log.download_complete(url, lines, decompressed_size)
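Condensed into a standalone helper, the wait logic added in this file looks roughly like the sketch below (written under the same assumptions as the committed code; next_delay is a hypothetical name, not part of the commit):

    def next_delay(status_code: int, retry_after: str | None, num_errors: int) -> int:
        """Sketch of the wait calculation, in seconds.

        Mirrors the committed logic: exponential backoff once errors start,
        a Retry-After header override, and a five-minute cap on 202 replies.
        """
        error_retry_minutes = [1, 2, 4, 8]
        if num_errors:
            default_delay = error_retry_minutes[num_errors - 1] * 60
        else:
            default_delay = 60  # one minute
        delay = int(retry_after) if retry_after is not None else default_delay
        if status_code == 202:
            # Don't let "still working" replies stall small exports for hours.
            delay = min(delay, 300)
        return delay

    assert next_delay(202, None, 0) == 60  # normal polling cadence
    assert next_delay(500, None, 2) == 120  # second consecutive error: two minutes
    assert next_delay(202, "3600", 0) == 300  # 202 delays are capped at five minutes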
22 changes: 22 additions & 0 deletions tests/fhir/test_fhir_client.py
@@ -287,6 +287,28 @@ def test_file_read_error(self):
fhir.create_fhir_client_for_cli(args, store.Root("/tmp"), [])
self.assertEqual(errors.ARGS_INVALID, cm.exception.code)

@ddt.data(
"http://example.invalid/root/",
"http://example.invalid/root/$export?",
"http://example.invalid/root/Group/xxx",
"http://example.invalid/root/Group/xxx/$export?_type=Patient",
"http://example.invalid/root/Patient",
"http://example.invalid/root/Patient/$export",
)
@mock.patch("cumulus_etl.fhir.fhir_client.FhirClient")
def test_can_find_auth_root(self, input_url, mock_client):
"""Verify that we detect the auth root for an input URL"""
args = argparse.Namespace(
fhir_url=None,
smart_client_id=None,
smart_jwks=None,
basic_user=None,
basic_passwd=None,
bearer_token=None,
)
fhir.create_fhir_client_for_cli(args, store.Root(input_url), [])
self.assertEqual("http://example.invalid/root/", mock_client.call_args[0][0])

async def test_must_be_context_manager(self):
"""Verify that FHIRClient enforces its use as a context manager."""
client = fhir.FhirClient(