Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESTClient: add PageNumberPaginator #1307

Merged
merged 10 commits into from
May 7, 2024
249 changes: 203 additions & 46 deletions dlt/sources/helpers/rest_client/paginators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Dict, Any
from urllib.parse import urlparse, urljoin

from requests import Response, Request
Expand Down Expand Up @@ -76,7 +76,173 @@ def update_request(self, request: Request) -> None:
return


class OffsetPaginator(BasePaginator):
class RangePaginator(BasePaginator):
    """A base paginator class for paginators that use a numeric parameter
    for pagination, such as page number or offset.

    The paginator keeps one counter, `current_value`, that is written to the
    `param_name` query parameter of each request and advanced by `value_step`
    after each response. Pagination is marked as finished once the counter
    reaches the total extracted from the response via `total_path` or the
    configured `maximum_value`, whichever applies first.

    See `PageNumberPaginator` and `OffsetPaginator` for examples.
    """

    def __init__(
        self,
        param_name: str,
        initial_value: int,
        value_step: int,
        maximum_value: Optional[int] = None,
        total_path: Optional[jsonpath.TJsonPath] = None,
        error_message_items: str = "items",
    ):
        """
        Args:
            param_name (str): The query parameter name for the numeric value.
                For example, 'page'.
            initial_value (int): The initial value of the numeric parameter.
            value_step (int): The step size to increment the numeric parameter.
            maximum_value (int, optional): The maximum value for the numeric parameter.
                If provided, pagination will stop once this value is reached
                or exceeded, even if more data is available. This allows you
                to limit the maximum range for pagination.
                If not provided, `total_path` must be specified. Defaults to None.
            total_path (jsonpath.TJsonPath, optional): The JSONPath expression
                for the total number of items. For example, if the JSON response is
                `{"items": [...], "total": 100}`, the `total_path` would be 'total'.
                If not provided, `maximum_value` must be specified.
            error_message_items (str): The name of the items in the error message.
                Defaults to 'items'.

        Raises:
            ValueError: If neither `total_path` nor `maximum_value` is provided,
                because the paginator would then have no stop condition.
        """
        super().__init__()
        if total_path is None and maximum_value is None:
            raise ValueError("Either `total_path` or `maximum_value` must be provided.")
        self.param_name = param_name
        self.current_value = initial_value
        self.value_step = value_step
        self.maximum_value = maximum_value
        # The path is compiled once here so it is not re-parsed for every response.
        self.total_path = jsonpath.compile_path(total_path) if total_path else None
        self.error_message_items = error_message_items

    def init_request(self, request: Request) -> None:
        """Sets the numeric query parameter on the first request.

        Args:
            request (Request): The request to modify in place. A missing
                `params` mapping is replaced with an empty dict first.
        """
        if request.params is None:
            request.params = {}

        request.params[self.param_name] = self.current_value

    def update_state(self, response: Response) -> None:
        """Advances the counter and decides whether another page should be requested.

        When `total_path` is set, the total is read from the response JSON (the
        first value matching the path) and converted to `int`.

        Args:
            response (Response): The response of the request that was just made.

        Raises:
            ValueError: If `total_path` is set and the total is missing from the
                response, or if the total cannot be converted to an integer.
        """
        total = None
        if self.total_path:
            response_json = response.json()
            values = jsonpath.find_values(self.total_path, response_json)
            total = values[0] if values else None
            if total is None:
                self._handle_missing_total(response_json)

            try:
                total = int(total)
            except (ValueError, TypeError):
                # `int()` raises TypeError (not ValueError) for non-scalar values
                # such as lists or dicts; report both through the same error.
                self._handle_invalid_total(total)

        self.current_value += self.value_step

        # Either limit ends pagination: the total reported by the API or the
        # user-supplied maximum.
        if (total is not None and self.current_value >= total) or (
            self.maximum_value is not None and self.current_value >= self.maximum_value
        ):
            self._has_next_page = False

    def _handle_missing_total(self, response_json: Dict[str, Any]) -> None:
        """Raises a `ValueError` reporting that the total was not found.

        Args:
            response_json (Dict[str, Any]): The parsed response body, included
                in the error message to help with debugging.

        Raises:
            ValueError: Always.
        """
        # Reviewer remark from the pull request thread (collaborator): "this is
        # good! thx! I'll probably create specialized exceptions for paginators
        # in #1281"
        raise ValueError(
            f"Total {self.error_message_items} is not found in the response in"
            f" {self.__class__.__name__}. Expected a response with a '{self.total_path}' key, got"
            f" {response_json}"
        )

    def _handle_invalid_total(self, total: Any) -> None:
        """Raises a `ValueError` reporting that the total is not an integer.

        Args:
            total (Any): The offending value found at `total_path`.

        Raises:
            ValueError: Always.
        """
        raise ValueError(
            f"'{self.total_path}' is not an integer in the response in {self.__class__.__name__}."
            f" Expected an integer, got {total}"
        )

    def update_request(self, request: Request) -> None:
        """Writes the current counter value into the next request.

        Args:
            request (Request): The request to modify in place. A missing
                `params` mapping is replaced with an empty dict first.
        """
        if request.params is None:
            request.params = {}
        request.params[self.param_name] = self.current_value


class PageNumberPaginator(RangePaginator):
    """A paginator that uses page number-based pagination strategy.

    For example, consider an API located at `https://api.example.com/items`
    that supports pagination through page number and page size query parameters,
    and provides the total number of pages in its responses, as shown below:

        {
            "items": [...],
            "total_pages": 10
        }

    To use `PageNumberPaginator` with such an API, you can instantiate `RESTClient`
    as follows:

        from dlt.sources.helpers.rest_client import RESTClient

        client = RESTClient(
            base_url="https://api.example.com",
            paginator=PageNumberPaginator(
                total_path="total_pages"
            )
        )

        @dlt.resource
        def get_items():
            for page in client.paginate("/items", params={"size": 100}):
                yield page

    Note that we pass the `size` parameter in the initial request to the API.
    The `PageNumberPaginator` will automatically increment the page number for
    each subsequent request until all items are fetched.

    If the API does not provide the total number of pages, set `total_path`
    to `None` and use the `maximum_page` parameter to limit the number of pages
    to fetch. Leaving `total_path` at its default would make the paginator look
    for a 'total' key and fail when it is missing. For example:

        client = RESTClient(
            base_url="https://api.example.com",
            paginator=PageNumberPaginator(
                total_path=None,
                maximum_page=5
            )
        )
        ...

    In this case, pagination will stop once page number 5 is reached.
    """

    def __init__(
        self,
        initial_page: int = 1,
        page_param: str = "page",
        total_path: Optional[jsonpath.TJsonPath] = "total",
        maximum_page: Optional[int] = None,
    ):
        """
        Args:
            initial_page (int): The initial page number. Defaults to 1.
            page_param (str): The query parameter name for the page number.
                Defaults to 'page'.
            total_path (jsonpath.TJsonPath, optional): The JSONPath expression
                for the total number of pages. Defaults to 'total'. Pass `None`
                when the API does not report a total; `maximum_page` must then
                be provided.
            maximum_page (int, optional): The maximum page number. If provided,
                pagination will stop once this page is reached or exceeded, even
                if more data is available. This allows you to limit the maximum
                number of pages for pagination. Defaults to None.

        Raises:
            ValueError: If both `total_path` and `maximum_page` are `None`.
        """
        # NOTE(review): `RangePaginator.update_state` ends pagination as soon as
        # the incremented page number is >= the total (or `maximum_page`). With
        # the default `initial_page=1` and a total expressed as a page count,
        # that check appears to fire before the last page is requested; confirm
        # against the client's pagination loop.
        super().__init__(
            param_name=page_param,
            initial_value=initial_page,
            total_path=total_path,
            value_step=1,
            maximum_value=maximum_page,
            error_message_items="pages",
        )


class OffsetPaginator(RangePaginator):
"""A paginator that uses offset-based pagination strategy.

This paginator is useful for APIs where pagination is controlled
Expand All @@ -100,84 +266,75 @@ class OffsetPaginator(BasePaginator):
client = RESTClient(
base_url="https://api.example.com",
paginator=OffsetPaginator(
initial_limit=100,
limit=100,
total_path="total"
)
)
@dlt.resource
def get_items():
for page in client.paginate("/items", params={"limit": 100}):
for page in client.paginate("/items"):
yield page

Note that the `limit` is configured on the paginator itself, which adds it to
every request, so it does not need to be passed in `params`.
The `OffsetPaginator` will automatically increment the offset for each
subsequent request until all items are fetched.

If the API does not provide the total count of items, you can use the
`maximum_offset` parameter to limit the number of items to fetch. For example:

client = RESTClient(
base_url="https://api.example.com",
paginator=OffsetPaginator(
limit=100,
maximum_offset=1000
)
)
...

In this case, pagination will stop after fetching 1000 items.
"""

def __init__(
self,
initial_limit: int,
initial_offset: int = 0,
limit: int,
offset: int = 0,
offset_param: str = "offset",
limit_param: str = "limit",
total_path: jsonpath.TJsonPath = "total",
maximum_offset: Optional[int] = None,
) -> None:
"""
Args:
initial_limit (int): The maximum number of items to retrieve
limit (int): The maximum number of items to retrieve
in each request.
initial_offset (int): The offset for the first request.
offset (int): The offset for the first request.
Defaults to 0.
offset_param (str): The query parameter name for the offset.
Defaults to 'offset'.
limit_param (str): The query parameter name for the limit.
Defaults to 'limit'.
total_path (jsonpath.TJsonPath): The JSONPath expression for
the total number of items.
maximum_offset (int): The maximum offset value. If provided,
pagination will stop once this offset is reached or exceeded,
even if more data is available. This allows you to limit the
maximum range for pagination. Defaults to None.
"""
super().__init__()
self.offset_param = offset_param
super().__init__(
param_name=offset_param,
initial_value=offset,
total_path=total_path,
value_step=limit,
maximum_value=maximum_offset,
)
self.limit_param = limit_param
self.total_path = jsonpath.compile_path(total_path)

self.offset = initial_offset
self.limit = initial_limit
self.limit = limit

def init_request(self, request: Request) -> None:
"""Initializes the request with the offset and limit query parameters."""
if request.params is None:
request.params = {}

request.params[self.offset_param] = self.offset
super().init_request(request)
request.params[self.limit_param] = self.limit

def update_state(self, response: Response) -> None:
"""Extracts the total count from the response and updates the offset."""
values = jsonpath.find_values(self.total_path, response.json())
total = values[0] if values else None

if total is None:
raise ValueError(f"Total count not found in response for {self.__class__.__name__}")

try:
total = int(total)
except ValueError:
raise ValueError(
f"Total count is not an integer in response for {self.__class__.__name__}. "
f"Expected an integer, got {total}"
)

self.offset += self.limit

if self.offset >= total:
self._has_next_page = False

def update_request(self, request: Request) -> None:
"""Updates the request with the offset and limit query parameters."""
if request.params is None:
request.params = {}

request.params[self.offset_param] = self.offset
super().update_request(request)
request.params[self.limit_param] = self.limit


Expand Down
Loading
Loading