diff --git a/fhirclient/client.py b/fhirclient/client.py index f492a6df..960d855f 100644 --- a/fhirclient/client.py +++ b/fhirclient/client.py @@ -1,4 +1,9 @@ import logging +import urllib +from typing import Optional, Iterable + +import requests +from fhirclient.models.bundle import Bundle from .server import FHIRServer, FHIRUnauthorizedException, FHIRNotFoundException @@ -240,3 +245,118 @@ def from_state(self, state): def save_state (self): self._save_func(self.state) + # MARK: Pagination + def _fetch_next_page(self, bundle: Bundle) -> Optional[Bundle]: + """ + Fetch the next page of results using the `next` link provided in the bundle. + + Args: + bundle (Bundle): The FHIR Bundle containing the `next` link. + + Returns: + Optional[Bundle]: The next page of results as a FHIR Bundle, or None if no "next" link is found. + """ + next_link = self._get_next_link(bundle) + if next_link: + sanitized_next_link = self._sanitize_next_link(next_link) + return self._execute_pagination_request(sanitized_next_link) + return None + + def _get_next_link(self, bundle: Bundle) -> Optional[str]: + """ + Extract the `next` link from the Bundle's links. + + Args: + bundle (Bundle): The FHIR Bundle containing pagination links. + + Returns: + Optional[str]: The URL of the next page if available, None otherwise. + """ + if not bundle.link: + return None + + for link in bundle.link: + if link.relation == "next": + return link.url + return None + + def _sanitize_next_link(self, next_link: str) -> str: + """ + Sanitize the `next` link to ensure it is safe to use. + + Args: + next_link (str): The raw `next` link URL. + + Returns: + str: The sanitized URL. + + Raises: + ValueError: If the URL scheme or domain is invalid. + """ + parsed_url = urllib.parse.urlparse(next_link) + + # Validate scheme and netloc (domain) + if parsed_url.scheme not in ["http", "https"]: + raise ValueError("Invalid URL scheme in `next` link.") + if not parsed_url.netloc: + raise ValueError("Invalid URL domain in `next` link.") + + # Additional sanitization if necessary, e.g., removing dangerous query parameters + query_params = urllib.parse.parse_qs(parsed_url.query) + sanitized_query = {k: v for k, v in query_params.items()} + + # Rebuild the sanitized URL + sanitized_url = urllib.parse.urlunparse( + ( + parsed_url.scheme, + parsed_url.netloc, + parsed_url.path, + parsed_url.params, + urllib.parse.urlencode(sanitized_query, doseq=True), + parsed_url.fragment, + ) + ) + + return sanitized_url + + def _execute_pagination_request(self, sanitized_url: str) -> Optional[Bundle]: + """ + Execute the request to retrieve the next page using the sanitized URL. + + Args: + sanitized_url (str): The sanitized URL to fetch the next page. + + Returns: + Optional[Bundle]: The next page of results as a FHIR Bundle, or None. + + Raises: + HTTPError: If the request fails due to network issues or server errors. + """ + try: + # Use requests.get directly to make the HTTP request + response = requests.get(sanitized_url) + response.raise_for_status() + next_bundle_data = response.json() + next_bundle = Bundle(next_bundle_data) + + return next_bundle + + except requests.exceptions.HTTPError as e: + # Handle specific HTTP errors as needed, possibly including retry logic + raise e + + def iter_pages(self, first_bundle: Bundle) -> Iterable[Bundle]: + """ + Iterator that yields each page of results as a FHIR Bundle. + + Args: + first_bundle (Bundle): The first Bundle to start pagination. + + Yields: + Bundle: Each page of results as a FHIR Bundle. + """ + bundle = first_bundle + while bundle: + yield bundle + bundle = self._fetch_next_page(bundle) + diff --git a/tests/client_pagination_test.py b/tests/client_pagination_test.py new file mode 100644 index 00000000..5bf76ad8 --- /dev/null +++ b/tests/client_pagination_test.py @@ -0,0 +1,176 @@ +import unittest +from unittest.mock import patch, MagicMock + +import requests +from fhirclient.models.bundle import Bundle + +from fhirclient.client import FHIRClient + + +class TestFHIRClientPagination(unittest.TestCase): + def setUp(self) -> None: + state = { + "app_id": "AppID", + "app_secret": "AppSecret", + "scope": "user/*.read", + "redirect": "http://test.invalid/redirect", + "patient_id": "PatientID", + "server": { + "base_uri": "http://test.invalid/", + "auth_type": "none", + "auth": { + "app_id": "AppId", + }, + }, + "launch_token": "LaunchToken", + "launch_context": { + "encounter": "EncounterID", + "patient": "PatientID", + }, + "jwt_token": "JwtToken", + } + + # Confirm round trip + self.client = FHIRClient(state=state) + + self.bundle = { + "resourceType": "Bundle", + "type": "searchset", + "link": [ + {"relation": "self", "url": "http://example.com/fhir/Bundle/1"}, + {"relation": "next", "url": "http://example.com/fhir/Bundle/2"}, + ], + "entry": [ + { + "fullUrl": "http://example.com/fhir/Patient/1", + "resource": { + "resourceType": "Patient", + "id": "1", + "name": [{"family": "Doe", "given": ["John"]}], + "gender": "male", + "birthDate": "1980-01-01", + }, + }, + { + "fullUrl": "http://example.com/fhir/Patient/2", + "resource": { + "resourceType": "Patient", + "id": "2", + "name": [{"family": "Smith", "given": ["Jane"]}], + "gender": "female", + "birthDate": "1990-05-15", + }, + }, + ], + } + + def test_get_next_link(self): + next_link = self.client._get_next_link(Bundle(self.bundle)) + self.assertEqual(next_link, "http://example.com/fhir/Bundle/2") + + def test_get_next_link_no_next(self): + bundle_without_next = { + "resourceType": "Bundle", + "type": "searchset", + "link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}], + } + bundle = Bundle(bundle_without_next) + next_link = self.client._get_next_link(bundle) + self.assertIsNone(next_link) + + def test_sanitize_next_link_valid(self): + next_link = "http://example.com/fhir/Bundle/2?page=2&size=10" + sanitized_link = self.client._sanitize_next_link(next_link) + self.assertEqual(sanitized_link, next_link) + + def test_sanitize_next_link_invalid_scheme(self): + next_link = "ftp://example.com/fhir/Bundle/2?page=2&size=10" + with self.assertRaises(ValueError): + self.client._sanitize_next_link(next_link) + + def test_sanitize_next_link_invalid_domain(self): + next_link = "http:///fhir/Bundle/2?page=2&size=10" + with self.assertRaises(ValueError): + self.client._sanitize_next_link(next_link) + + @patch("requests.get") + def test_execute_pagination_request_success(self, mock_get): + mock_response = MagicMock() + # Set up the mock to return a specific JSON payload when its json() method is called + mock_response.json.return_value = self.bundle + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + + next_link = "http://example.com/fhir/Bundle/2" + sanitized_link = self.client._sanitize_next_link(next_link) + result = self.client._execute_pagination_request(sanitized_link) + self.assertIsInstance(result, Bundle) + self.assertIn("entry", result.as_json()) + mock_get.assert_called_once_with(sanitized_link) + + @patch("requests.get") + def test_execute_pagination_request_http_error(self, mock_get): + mock_get.side_effect = requests.exceptions.HTTPError("HTTP Error") + + next_link = "http://example.com/fhir/Bundle/2" + sanitized_link = self.client._sanitize_next_link(next_link) + + with self.assertRaises(requests.exceptions.HTTPError): + self.client._execute_pagination_request(sanitized_link) + + @patch("requests.get") + def test_execute_pagination_request_returns_last_bundle_if_no_next_link(self, mock_get): + # Mock the response to simulate a Bundle with no next link + mock_response = MagicMock() + mock_response.json.return_value = { + "resourceType": "Bundle", + "type": "searchset", + "link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}], + "entry": [{"resource": {"resourceType": "Patient", "id": "1"}}], + } + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + + sanitized_link = "http://example.com/fhir/Bundle/1" + result = self.client._execute_pagination_request(sanitized_link) + + # Check that the result is the Bundle itself, not None + self.assertIsInstance(result, Bundle) + self.assertTrue(hasattr(result, 'entry')) + mock_get.assert_called_once_with(sanitized_link) + + @patch("fhirclient.client.FHIRClient._execute_pagination_request") + def test_fetch_next_page(self, mock_execute_request): + mock_execute_request.return_value = Bundle(self.bundle) + result = self.client._fetch_next_page(Bundle(self.bundle)) + self.assertIsInstance(result, Bundle) + self.assertTrue(hasattr(result, "entry")) + mock_execute_request.assert_called_once() + + def test_fetch_next_page_no_next_link(self): + bundle_without_next = { + "resourceType": "Bundle", + "type": "searchset", + "link": [{"relation": "self", "url": "http://example.com/fhir/Bundle/1"}], + } + bundle = Bundle(bundle_without_next) + result = self.client._fetch_next_page(bundle) + self.assertIsNone(result) + + @patch("fhirclient.client.FHIRClient._fetch_next_page") + def test_iter_pages(self, mock_fetch_next_page): + # Set up the mock to return a new bundle, then None to stop iteration + mock_fetch_next_page.side_effect = [Bundle(self.bundle), None] + pages = list(self.client.iter_pages(Bundle(self.bundle))) + + # Check that two bundles were returned (the first bundle and the one from mock) + self.assertEqual(len(pages), 2) + self.assertIsInstance(pages[0], Bundle) + self.assertIsInstance(pages[1], Bundle) + + # Compare JSON representations instead of object instances + self.assertEqual(pages[0].as_json(), self.bundle) + self.assertEqual(pages[1].as_json(), self.bundle) + + # Ensure that _fetch_next_page was called twice + self.assertEqual(mock_fetch_next_page.call_count, 2)