From 8ed879a971d51606d2cb657b5b3306a63d12e472 Mon Sep 17 00:00:00 2001
From: Anton Burnashev
Date: Tue, 7 May 2024 18:23:50 +0200
Subject: [PATCH] RESTClient: add PageNumberPaginator (#1307)

---
 dlt/sources/helpers/rest_client/paginators.py | 263 ++++++++++++++----
 .../helpers/rest_client/test_paginators.py    |  81 +++++-
 2 files changed, 295 insertions(+), 49 deletions(-)

diff --git a/dlt/sources/helpers/rest_client/paginators.py b/dlt/sources/helpers/rest_client/paginators.py
index 1c981e27a5..fc0ce459ad 100644
--- a/dlt/sources/helpers/rest_client/paginators.py
+++ b/dlt/sources/helpers/rest_client/paginators.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Optional
+from typing import Optional, Dict, Any
 from urllib.parse import urlparse, urljoin
 
 from requests import Response, Request
@@ -76,7 +76,186 @@ def update_request(self, request: Request) -> None:
         return
 
 
-class OffsetPaginator(BasePaginator):
+class RangePaginator(BasePaginator):
+    """A base paginator class for paginators that use a numeric parameter
+    for pagination, such as page number or offset.
+
+    See `PageNumberPaginator` and `OffsetPaginator` for examples.
+    """
+
+    def __init__(
+        self,
+        param_name: str,
+        initial_value: int,
+        value_step: int,
+        maximum_value: Optional[int] = None,
+        total_path: Optional[jsonpath.TJsonPath] = None,
+        error_message_items: str = "items",
+    ):
+        """
+        Args:
+            param_name (str): The query parameter name for the numeric value.
+                For example, 'page'.
+            initial_value (int): The initial value of the numeric parameter.
+            value_step (int): The step size to increment the numeric parameter.
+            maximum_value (int, optional): The maximum value for the numeric parameter.
+                If provided, pagination will stop once this value is reached
+                or exceeded, even if more data is available. This allows you
+                to limit the maximum range for pagination.
+                If not provided, `total_path` must be specified. Defaults to None.
+            total_path (jsonpath.TJsonPath, optional): The JSONPath expression
+                for the total number of items. For example, if the JSON response is
+                `{"items": [...], "total": 100}`, the `total_path` would be 'total'.
+                If not provided, `maximum_value` must be specified.
+            error_message_items (str): The name of the items in the error message.
+                Defaults to 'items'.
+        """
+        super().__init__()
+        if total_path is None and maximum_value is None:
+            raise ValueError("Either `total_path` or `maximum_value` must be provided.")
+        self.param_name = param_name
+        self.current_value = initial_value
+        self.value_step = value_step
+        self.maximum_value = maximum_value
+        self.total_path = jsonpath.compile_path(total_path) if total_path else None
+        self.error_message_items = error_message_items
+
+    def init_request(self, request: Request) -> None:
+        """Writes the current numeric value into the request's query parameters."""
+        if request.params is None:
+            request.params = {}
+
+        request.params[self.param_name] = self.current_value
+
+    def update_state(self, response: Response) -> None:
+        """Advances the numeric value and checks whether pagination should continue.
+
+        Pagination stops once the advanced value reaches or exceeds the total read
+        from the response (when `total_path` is set) or `maximum_value`.
+
+        Raises:
+            ValueError: If the total is missing or cannot be converted to an integer.
+        """
+        total = None
+        if self.total_path:
+            response_json = response.json()
+            values = jsonpath.find_values(self.total_path, response_json)
+            total = values[0] if values else None
+            if total is None:
+                self._handle_missing_total(response_json)
+
+            try:
+                total = int(total)
+            except (ValueError, TypeError):
+                self._handle_invalid_total(total)
+
+        self.current_value += self.value_step
+
+        if (total is not None and self.current_value >= total) or (
+            self.maximum_value is not None and self.current_value >= self.maximum_value
+        ):
+            self._has_next_page = False
+
+    def _handle_missing_total(self, response_json: Dict[str, Any]) -> None:
+        """Raises ValueError reporting that the total was not found in the response."""
+        raise ValueError(
+            f"Total {self.error_message_items} is not found in the response in"
+            f" {self.__class__.__name__}. Expected a response with a '{self.total_path}' key, got"
+            f" {response_json}"
+        )
+
+    def _handle_invalid_total(self, total: Any) -> None:
+        """Raises ValueError reporting that the total is not convertible to an integer."""
+        raise ValueError(
+            f"'{self.total_path}' is not an integer in the response in {self.__class__.__name__}."
+            f" Expected an integer, got {total}"
+        )
+
+    def update_request(self, request: Request) -> None:
+        """Writes the current numeric value into the request's query parameters."""
+        if request.params is None:
+            request.params = {}
+        request.params[self.param_name] = self.current_value
+
+
+class PageNumberPaginator(RangePaginator):
+    """A paginator that uses page number-based pagination strategy.
+
+    For example, consider an API located at `https://api.example.com/items`
+    that supports pagination through page number and page size query parameters,
+    and provides the total number of pages in its responses, as shown below:
+
+        {
+            "items": [...],
+            "total_pages": 10
+        }
+
+    To use `PageNumberPaginator` with such an API, you can instantiate `RESTClient`
+    as follows:
+
+        from dlt.sources.helpers.rest_client import RESTClient
+
+        client = RESTClient(
+            base_url="https://api.example.com",
+            paginator=PageNumberPaginator(
+                total_path="total_pages"
+            )
+        )
+
+        @dlt.resource
+        def get_items():
+            for page in client.paginate("/items", params={"size": 100}):
+                yield page
+
+    Note that we pass the `size` parameter in the initial request to the API.
+    The `PageNumberPaginator` will automatically increment the page number for
+    each subsequent request until all items are fetched.
+
+    If the API does not provide the total number of pages, set `total_path=None` and
+    use the `maximum_page` parameter to limit the number of pages to fetch. For example:
+
+        client = RESTClient(
+            base_url="https://api.example.com",
+            paginator=PageNumberPaginator(
+                total_path=None,
+                maximum_page=5
+            )
+        )
+        ...
+
+    In this case, pagination will stop once page number 5 is reached.
+    """
+
+    def __init__(
+        self,
+        initial_page: int = 1,
+        page_param: str = "page",
+        total_path: Optional[jsonpath.TJsonPath] = "total",
+        maximum_page: Optional[int] = None,
+    ):
+        """
+        Args:
+            initial_page (int): The initial page number.
+            page_param (str): The query parameter name for the page number.
+                Defaults to 'page'.
+            total_path (jsonpath.TJsonPath, optional): The JSONPath expression for the
+                total number of pages. Defaults to 'total'. Pass None to use `maximum_page` only.
+            maximum_page (int): The maximum page number. If provided, pagination
+                will stop once this page is reached or exceeded, even if more
+                data is available. This allows you to limit the maximum number
+                of pages for pagination. Defaults to None.
+        """
+        super().__init__(
+            param_name=page_param,
+            initial_value=initial_page,
+            total_path=total_path,
+            value_step=1,
+            maximum_value=maximum_page,
+            error_message_items="pages",
+        )
+
+
+class OffsetPaginator(RangePaginator):
     """A paginator that uses offset-based pagination strategy.
 
     This paginator is useful for APIs where pagination is controlled
@@ -100,33 +279,48 @@ class OffsetPaginator(BasePaginator):
         client = RESTClient(
             base_url="https://api.example.com",
             paginator=OffsetPaginator(
-                initial_limit=100,
+                limit=100,
                 total_path="total"
             )
         )
         @dlt.resource
         def get_items():
-            for page in client.paginate("/items", params={"limit": 100}):
+            for page in client.paginate("/items"):
                 yield page
 
-    Note that we pass the `limit` parameter in the initial request to the API.
     The `OffsetPaginator` will automatically increment the offset for each
     subsequent request until all items are fetched.
+
+    If the API does not provide the total count of items, set `total_path=None` and
+    use the `maximum_offset` parameter to limit the number of items to fetch. For example:
+
+        client = RESTClient(
+            base_url="https://api.example.com",
+            paginator=OffsetPaginator(
+                limit=100,
+                total_path=None,
+                maximum_offset=1000
+            )
+        )
+        ...
+
+    In this case, pagination will stop after fetching 1000 items.
     """
 
     def __init__(
         self,
-        initial_limit: int,
-        initial_offset: int = 0,
+        limit: int,
+        offset: int = 0,
         offset_param: str = "offset",
         limit_param: str = "limit",
-        total_path: jsonpath.TJsonPath = "total",
+        total_path: Optional[jsonpath.TJsonPath] = "total",
+        maximum_offset: Optional[int] = None,
     ) -> None:
         """
         Args:
-            initial_limit (int): The maximum number of items to retrieve
+            limit (int): The maximum number of items to retrieve
                 in each request.
-            initial_offset (int): The offset for the first request.
+            offset (int): The offset for the first request.
                 Defaults to 0.
             offset_param (str): The query parameter name for the offset.
                 Defaults to 'offset'.
@@ -134,50 +328,29 @@ def __init__(
                 Defaults to 'limit'.
             total_path (jsonpath.TJsonPath): The JSONPath expression for
                 the total number of items.
+            maximum_offset (int): The maximum offset value. If provided,
+                pagination will stop once this offset is reached or exceeded,
+                even if more data is available. This allows you to limit the
+                maximum range for pagination. Defaults to None.
         """
-        super().__init__()
-        self.offset_param = offset_param
+        super().__init__(
+            param_name=offset_param,
+            initial_value=offset,
+            total_path=total_path,
+            value_step=limit,
+            maximum_value=maximum_offset,
+        )
         self.limit_param = limit_param
-        self.total_path = jsonpath.compile_path(total_path)
-
-        self.offset = initial_offset
-        self.limit = initial_limit
+        self.limit = limit
 
     def init_request(self, request: Request) -> None:
         """Initializes the request with the offset and limit query parameters."""
-        if request.params is None:
-            request.params = {}
-
-        request.params[self.offset_param] = self.offset
+        super().init_request(request)
         request.params[self.limit_param] = self.limit
 
-    def update_state(self, response: Response) -> None:
-        """Extracts the total count from the response and updates the offset."""
-        values = jsonpath.find_values(self.total_path, response.json())
-        total = values[0] if values else None
-
-        if total is None:
-            raise ValueError(f"Total count not found in response for {self.__class__.__name__}")
-
-        try:
-            total = int(total)
-        except ValueError:
-            raise ValueError(
-                f"Total count is not an integer in response for {self.__class__.__name__}. "
-                f"Expected an integer, got {total}"
-            )
-
-        self.offset += self.limit
-
-        if self.offset >= total:
-            self._has_next_page = False
-
     def update_request(self, request: Request) -> None:
         """Updates the request with the offset and limit query parameters."""
-        if request.params is None:
-            request.params = {}
-
-        request.params[self.offset_param] = self.offset
+        super().update_request(request)
         request.params[self.limit_param] = self.limit
 
 
diff --git a/tests/sources/helpers/rest_client/test_paginators.py b/tests/sources/helpers/rest_client/test_paginators.py
index 23a5775b6d..9ca54e814c 100644
--- a/tests/sources/helpers/rest_client/test_paginators.py
+++ b/tests/sources/helpers/rest_client/test_paginators.py
@@ -7,6 +7,7 @@
 from dlt.sources.helpers.rest_client.paginators import (
     SinglePagePaginator,
     OffsetPaginator,
+    PageNumberPaginator,
     HeaderLinkPaginator,
     JSONResponsePaginator,
 )
@@ -174,10 +175,10 @@ def test_update_state_with_next(self):
 
 class TestOffsetPaginator:
     def test_update_state(self):
-        paginator = OffsetPaginator(initial_offset=0, initial_limit=10)
+        paginator = OffsetPaginator(offset=0, limit=10)
         response = Mock(Response, json=lambda: {"total": 20})
         paginator.update_state(response)
-        assert paginator.offset == 10
+        assert paginator.current_value == 10
         assert paginator.has_next_page is True
 
         # Test for reaching the end
@@ -188,7 +189,7 @@ def test_update_state_with_string_total(self):
         paginator = OffsetPaginator(0, 10)
         response = Mock(Response, json=lambda: {"total": "20"})
         paginator.update_state(response)
-        assert paginator.offset == 10
+        assert paginator.current_value == 10
         assert paginator.has_next_page is True
 
     def test_update_state_with_invalid_total(self):
@@ -204,7 +205,7 @@ def test_update_state_without_total(self):
             paginator.update_state(response)
 
     def test_init_request(self):
-        paginator = OffsetPaginator(initial_offset=123, initial_limit=42)
+        paginator = OffsetPaginator(offset=123, limit=42)
         request = Mock(Request)
         request.params = {}
 
@@ -225,3 +226,75 @@ def test_init_request(self):
 
         assert next_request.params["offset"] == 165
         assert next_request.params["limit"] == 42
+
+    def test_maximum_offset(self):
+        paginator = OffsetPaginator(offset=0, limit=50, maximum_offset=100, total_path=None)
+        response = Mock(Response, json=lambda: {"items": []})
+        paginator.update_state(response)  # Offset 0 to 50
+        assert paginator.current_value == 50
+        assert paginator.has_next_page is True
+
+        paginator.update_state(response)  # Offset 50 to 100
+        assert paginator.current_value == 100
+        assert paginator.has_next_page is False
+
+
+class TestPageNumberPaginator:
+    def test_update_state(self):
+        paginator = PageNumberPaginator(initial_page=1, total_path="total_pages")
+        response = Mock(Response, json=lambda: {"total_pages": 3})
+        paginator.update_state(response)
+        assert paginator.current_value == 2
+        assert paginator.has_next_page is True
+
+        # Test for reaching the end
+        paginator.update_state(response)
+        assert paginator.has_next_page is False
+
+    def test_update_state_with_string_total_pages(self):
+        paginator = PageNumberPaginator(1)
+        response = Mock(Response, json=lambda: {"total": "3"})
+        paginator.update_state(response)
+        assert paginator.current_value == 2
+        assert paginator.has_next_page is True
+
+    def test_update_state_with_invalid_total_pages(self):
+        paginator = PageNumberPaginator(1)
+        response = Mock(Response, json=lambda: {"total": "invalid"})
+        with pytest.raises(ValueError):
+            paginator.update_state(response)
+
+    def test_update_state_with_non_scalar_total_pages(self):
+        paginator = PageNumberPaginator(1)
+        response = Mock(Response, json=lambda: {"total": {"count": 3}})
+        with pytest.raises(ValueError):
+            paginator.update_state(response)
+
+    def test_update_state_without_total_pages(self):
+        paginator = PageNumberPaginator(1)
+        response = Mock(Response, json=lambda: {})
+        with pytest.raises(ValueError):
+            paginator.update_state(response)
+
+    def test_update_request(self):
+        paginator = PageNumberPaginator(initial_page=1, page_param="page")
+        request = Mock(Request)
+        response = Mock(Response, json=lambda: {"total": 3})
+        paginator.update_state(response)
+        request.params = {}
+        paginator.update_request(request)
+        assert request.params["page"] == 2
+        paginator.update_state(response)
+        paginator.update_request(request)
+        assert request.params["page"] == 3
+
+    def test_maximum_page(self):
+        paginator = PageNumberPaginator(initial_page=1, maximum_page=3, total_path=None)
+        response = Mock(Response, json=lambda: {"items": []})
+        paginator.update_state(response)  # Page 1
+        assert paginator.current_value == 2
+        assert paginator.has_next_page is True
+
+        paginator.update_state(response)  # Page 2
+        assert paginator.current_value == 3
+        assert paginator.has_next_page is False