-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RESTClient: add PageNumberPaginator #1307
Changes from 3 commits
a65c1c0
6f510db
15a4b2b
f0ed91a
f7f7860
e95a232
b09d2aa
dd528c8
ad48c0c
5dfb678
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Optional | ||
from typing import Optional, Dict, Any | ||
from urllib.parse import urlparse, urljoin | ||
|
||
from requests import Response, Request | ||
|
@@ -76,7 +76,139 @@ def update_request(self, request: Request) -> None: | |
return | ||
|
||
|
||
class OffsetPaginator(BasePaginator): | ||
class BaseNumericPaginator(BasePaginator): | ||
"""A base paginator class for paginators that use a numeric parameter | ||
for pagination, such as page number or offset. | ||
|
||
See `PageNumberPaginator` and `OffsetPaginator` for examples. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
param_name: str, | ||
initial_value: int, | ||
total_path: jsonpath.TJsonPath, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When writing a paginator for Where I could put a maximum value on total, with maximum value the total is optional. does it make sense to add it? IMO those broken paginators are quite common. in that case we could thing about base class as a real There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rudolfix thanks for suggestion. I think it could be very useful to control the pagination. |
||
value_step: int, | ||
error_message_items: str = "items", | ||
): | ||
""" | ||
Args: | ||
param_name (str): The query parameter name for the numeric value. | ||
For example, 'page'. | ||
initial_value (int): The initial value of the numeric parameter. | ||
total_path (jsonpath.TJsonPath): The JSONPath expression for the total | ||
number of items. | ||
value_step (int): The step size to increment the numeric parameter. | ||
error_message_items (str): The name of the items in the error message. | ||
Defaults to 'items'. | ||
""" | ||
super().__init__() | ||
self.param_name = param_name | ||
self.current_value = initial_value | ||
self.total_path = jsonpath.compile_path(total_path) | ||
self.value_step = value_step | ||
self.error_message_items = error_message_items | ||
|
||
def init_request(self, request: Request) -> None: | ||
if request.params is None: | ||
request.params = {} | ||
|
||
request.params[self.param_name] = self.current_value | ||
|
||
def update_state(self, response: Response) -> None: | ||
response_json = response.json() | ||
values = jsonpath.find_values(self.total_path, response_json) | ||
total = values[0] if values else None | ||
if total is None: | ||
self._handle_missing_total(response_json) | ||
|
||
try: | ||
total = int(total) | ||
except ValueError: | ||
self._handle_invalid_total(total) | ||
|
||
self.current_value += self.value_step | ||
|
||
if self.current_value >= total: | ||
self._has_next_page = False | ||
|
||
def _handle_missing_total(self, response_json: Dict[str, Any]) -> None: | ||
raise ValueError( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is good! thx! I'll probably create specialized exceptions for pagintors in #1281 |
||
f"Total {self.error_message_items} is not found in the response in" | ||
f" {self.__class__.__name__}. Expected a response with a '{self.total_path}' key, got" | ||
f" {response_json}" | ||
) | ||
|
||
def _handle_invalid_total(self, total: Any) -> None: | ||
raise ValueError( | ||
f"'{self.total_path}' is not an integer in the response in {self.__class__.__name__}." | ||
f" Expected an integer, got {total}" | ||
) | ||
|
||
def update_request(self, request: Request) -> None: | ||
if request.params is None: | ||
request.params = {} | ||
request.params[self.param_name] = self.current_value | ||
|
||
|
||
class PageNumberPaginator(BaseNumericPaginator): | ||
"""A paginator that uses page number-based pagination strategy. | ||
|
||
For example, consider an API located at `https://api.example.com/items` | ||
that supports pagination through page number and page size query parameters, | ||
and provides the total number of pages in its responses, as shown below: | ||
|
||
{ | ||
"items": [...], | ||
"total_pages": 10 | ||
} | ||
|
||
To use `PageNumberPaginator` with such an API, you can instantiate `RESTClient` | ||
as follows: | ||
|
||
from dlt.sources.helpers.rest_client import RESTClient | ||
|
||
client = RESTClient( | ||
base_url="https://api.example.com", | ||
paginator=PageNumberPaginator( | ||
total_pages_path="total_pages" | ||
) | ||
) | ||
|
||
@dlt.resource | ||
def get_items(): | ||
for page in client.paginate("/items", params={"size": 100}): | ||
yield page | ||
|
||
Note that we pass the `size` parameter in the initial request to the API. | ||
The `PageNumberPaginator` will automatically increment the page number for | ||
each subsequent request until all items are fetched. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
initial_page: int = 1, | ||
page_param: str = "page", | ||
total_pages_path: jsonpath.TJsonPath = "total", | ||
): | ||
""" | ||
Args: | ||
initial_page (int): The initial page number. | ||
page_param (str): The query parameter name for the page number. | ||
Defaults to 'page'. | ||
total_pages_path (jsonpath.TJsonPath): The JSONPath expression for | ||
the total number of pages. Defaults to 'total'. | ||
""" | ||
super().__init__( | ||
param_name=page_param, | ||
initial_value=initial_page, | ||
total_path=total_pages_path, | ||
value_step=1, | ||
error_message_items="pages", | ||
) | ||
|
||
|
||
class OffsetPaginator(BaseNumericPaginator): | ||
"""A paginator that uses offset-based pagination strategy. | ||
|
||
This paginator is useful for APIs where pagination is controlled | ||
|
@@ -135,49 +267,21 @@ def __init__( | |
total_path (jsonpath.TJsonPath): The JSONPath expression for | ||
the total number of items. | ||
""" | ||
super().__init__() | ||
self.offset_param = offset_param | ||
super().__init__( | ||
param_name=offset_param, | ||
initial_value=initial_offset, | ||
total_path=total_path, | ||
value_step=initial_limit, | ||
) | ||
self.limit_param = limit_param | ||
self.total_path = jsonpath.compile_path(total_path) | ||
|
||
self.offset = initial_offset | ||
self.limit = initial_limit | ||
|
||
def init_request(self, request: Request) -> None: | ||
"""Initializes the request with the offset and limit query parameters.""" | ||
if request.params is None: | ||
request.params = {} | ||
|
||
request.params[self.offset_param] = self.offset | ||
super().init_request(request) | ||
request.params[self.limit_param] = self.limit | ||
|
||
def update_state(self, response: Response) -> None: | ||
"""Extracts the total count from the response and updates the offset.""" | ||
values = jsonpath.find_values(self.total_path, response.json()) | ||
total = values[0] if values else None | ||
|
||
if total is None: | ||
raise ValueError(f"Total count not found in response for {self.__class__.__name__}") | ||
|
||
try: | ||
total = int(total) | ||
except ValueError: | ||
raise ValueError( | ||
f"Total count is not an integer in response for {self.__class__.__name__}. " | ||
f"Expected an integer, got {total}" | ||
) | ||
|
||
self.offset += self.limit | ||
|
||
if self.offset >= total: | ||
self._has_next_page = False | ||
|
||
def update_request(self, request: Request) -> None: | ||
"""Updates the request with the offset and limit query parameters.""" | ||
if request.params is None: | ||
request.params = {} | ||
|
||
request.params[self.offset_param] = self.offset | ||
super().update_request(request) | ||
request.params[self.limit_param] = self.limit | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rudolfix, not sure if the name should be "Base*", this class could be useful on its own. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
Simple*
is better for tbh both look fineThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
heh I spent 10 minutes thinking about good name :) maybe RangePaginator? because it looks like python
range