Skip to content

Commit

Permalink
RangePaginator: Stops pagination in case of page without data items
Browse files Browse the repository at this point in the history
  • Loading branch information
willi-mueller committed Aug 9, 2024
1 parent a0e2996 commit 8b4fc8c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 27 deletions.
2 changes: 1 addition & 1 deletion dlt/sources/helpers/rest_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def raise_for_status(response: Response, *args: Any, **kwargs: Any) -> None:

if paginator is None:
paginator = self.detect_paginator(response, data)
paginator.update_state(response)
paginator.update_state(response, data)
paginator.update_request(request)

# yield data with context
Expand Down
62 changes: 37 additions & 25 deletions dlt/sources/helpers/rest_client/paginators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, urljoin

from requests import Response, Request
Expand Down Expand Up @@ -39,7 +39,7 @@ def init_request(self, request: Request) -> None: # noqa: B027, optional overri
pass

@abstractmethod
def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: List[Any] = None) -> None:
"""Updates the paginator's state based on the response from the API.
This method should extract necessary pagination details (like next page
Expand Down Expand Up @@ -73,7 +73,7 @@ def __str__(self) -> str:
class SinglePagePaginator(BasePaginator):
"""A paginator for single-page API responses."""

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Marks pagination as finished: a single-page API never has a next page.

    Args:
        response: The API response. Not inspected.
        data: The items extracted from the response, if the caller has them.
            Not inspected.
    """
    self._has_next_page = False

def update_request(self, request: Request) -> None:
Expand All @@ -96,6 +96,7 @@ def __init__(
maximum_value: Optional[int] = None,
total_path: Optional[jsonpath.TJsonPath] = None,
error_message_items: str = "items",
stop_after_empty_page: bool = False,
):
"""
Args:
Expand Down Expand Up @@ -127,33 +128,40 @@ def __init__(
self.maximum_value = maximum_value
self.total_path = jsonpath.compile_path(total_path) if total_path else None
self.error_message_items = error_message_items
self.stop_after_empty_page = stop_after_empty_page

def init_request(self, request: Request) -> None:
    """Writes the paginator's current value into the request's query parameters.

    Creates the params mapping when the request does not have one yet.
    """
    params = request.params
    if params is None:
        # Bind the same fresh dict to both the local name and the request.
        params = request.params = {}

    params[self.param_name] = self.current_value

def update_state(self, response: Response) -> None:
total = None
if self.total_path:
response_json = response.json()
values = jsonpath.find_values(self.total_path, response_json)
total = values[0] if values else None
if total is None:
self._handle_missing_total(response_json)

try:
total = int(total)
except ValueError:
self._handle_invalid_total(total)

self.current_value += self.value_step

if (total is not None and self.current_value >= total + self.base_index) or (
self.maximum_value is not None and self.current_value >= self.maximum_value
):
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Advances the paginator by one step and decides whether another page exists.

    Pagination stops when any of the following holds:

    * ``_stop_after_this_page(data)`` reports that this page ends pagination.
      In that case ``current_value`` is left untouched.
    * ``total_path`` is set and the advanced ``current_value`` has reached
      ``total + base_index``.
    * ``maximum_value`` is set and the advanced ``current_value`` has reached it.

    Args:
        response: The API response. Its JSON body is only read when
            ``total_path`` is set.
        data: The items extracted from the response, if the caller has them.

    Raises:
        ValueError: Raised by ``_handle_missing_total`` when ``total_path`` is
            set but yields no value. A total that ``int()`` rejects is passed to
            ``_handle_invalid_total``.
    """
    if self._stop_after_this_page(data):
        self._has_next_page = False
        return

    total = None
    if self.total_path:
        response_json = response.json()
        values = jsonpath.find_values(self.total_path, response_json)
        total = values[0] if values else None
        if total is None:
            self._handle_missing_total(response_json)

        try:
            total = int(total)
        except ValueError:
            self._handle_invalid_total(total)

    self.current_value += self.value_step

    reached_total = total is not None and self.current_value >= total + self.base_index
    reached_maximum = (
        self.maximum_value is not None and self.current_value >= self.maximum_value
    )
    if reached_total or reached_maximum:
        self._has_next_page = False

def _stop_after_this_page(self, data: Optional[List[Any]] = None) -> bool:
    """Tells whether the page that produced ``data`` should end pagination.

    Args:
        data: The items extracted from the current page. ``None`` means the
            caller did not supply the items, which is not treated as an empty
            page.

    Returns:
        True only when ``stop_after_empty_page`` is enabled and ``data`` is an
        empty collection. Always a real ``bool``.
    """
    if not self.stop_after_empty_page or data is None:
        return False
    return len(data) == 0

def _handle_missing_total(self, response_json: Dict[str, Any]) -> None:
raise ValueError(
Expand Down Expand Up @@ -229,6 +237,7 @@ def __init__(
page_param: str = "page",
total_path: jsonpath.TJsonPath = "total",
maximum_page: Optional[int] = None,
stop_after_empty_page: bool = False,
):
"""
Args:
Expand Down Expand Up @@ -260,6 +269,7 @@ def __init__(
value_step=1,
maximum_value=maximum_page,
error_message_items="pages",
stop_after_empty_page=stop_after_empty_page,
)

def __str__(self) -> str:
Expand Down Expand Up @@ -330,6 +340,7 @@ def __init__(
limit_param: str = "limit",
total_path: jsonpath.TJsonPath = "total",
maximum_offset: Optional[int] = None,
stop_after_empty_page: bool = False,
) -> None:
"""
Args:
Expand All @@ -356,6 +367,7 @@ def __init__(
total_path=total_path,
value_step=limit,
maximum_value=maximum_offset,
stop_after_empty_page=stop_after_empty_page,
)
self.limit_param = limit_param
self.limit = limit
Expand Down Expand Up @@ -484,7 +496,7 @@ def __init__(self, links_next_key: str = "next") -> None:
super().__init__()
self.links_next_key = links_next_key

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Extracts the next page URL from the 'Link' header in the response.

    The reference becomes ``None`` when the response has no link under
    ``links_next_key`` or that link carries no ``"url"`` entry.

    Args:
        response: The API response whose parsed ``links`` are inspected.
        data: The items extracted from the response, if the caller has them.
            Not inspected.
    """
    self._next_reference = response.links.get(self.links_next_key, {}).get("url")

Expand Down Expand Up @@ -539,7 +551,7 @@ def __init__(
super().__init__()
self.next_url_path = jsonpath.compile_path(next_url_path)

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Extracts the next page URL from the JSON response.

    The first value found at ``next_url_path`` becomes the next reference;
    when nothing is found the reference is set to ``None``.

    Args:
        response: The API response whose JSON body is searched.
        data: The items extracted from the response, if the caller has them.
            Not inspected.
    """
    values = jsonpath.find_values(self.next_url_path, response.json())
    self._next_reference = values[0] if values else None
Expand Down Expand Up @@ -618,7 +630,7 @@ def __init__(
self.cursor_path = jsonpath.compile_path(cursor_path)
self.cursor_param = cursor_param

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Extracts the cursor value from the JSON response.

    The first value found at ``cursor_path`` becomes the next reference;
    when nothing is found the reference is set to ``None``.

    Args:
        response: The API response whose JSON body is searched.
        data: The items extracted from the response, if the caller has them.
            Not inspected.
    """
    values = jsonpath.find_values(self.cursor_path, response.json())
    self._next_reference = values[0] if values else None
Expand Down
2 changes: 1 addition & 1 deletion tests/sources/helpers/rest_client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ def test_paginate_json_body_without_params(self, rest_client) -> None:
posts_skip = (DEFAULT_TOTAL_PAGES - 3) * DEFAULT_PAGE_SIZE

class JSONBodyPageCursorPaginator(BaseReferencePaginator):
def update_state(self, response):
def update_state(self, response, data):
self._next_reference = response.json().get("next_page")

def update_request(self, request):
Expand Down
29 changes: 29 additions & 0 deletions tests/sources/helpers/rest_client/test_paginators.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any, List
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -312,6 +313,19 @@ def test_client_pagination(self, rest_client):

assert_pagination(pages)

def test_stop_after_empty_page(self):
    """An empty page ends offset pagination when stop_after_empty_page is set."""
    empty_page: List[Any] = []
    response = Mock(Response, json=lambda: {"items": []})
    paginator = OffsetPaginator(
        offset=0,
        limit=50,
        maximum_offset=100,
        total_path=None,
        stop_after_empty_page=True,
    )

    paginator.update_state(response, empty_page)  # Page 1

    assert paginator.has_next_page is False


@pytest.mark.usefixtures("mock_api_server")
class TestPageNumberPaginator:
Expand Down Expand Up @@ -372,6 +386,21 @@ def test_maximum_page(self):
assert paginator.current_value == 3
assert paginator.has_next_page is False

def test_stop_after_empty_page(self):
    """An empty page ends page-number pagination without advancing the page."""
    empty_page: List[Any] = []
    response = Mock(Response, json=lambda: {"items": []})
    paginator = PageNumberPaginator(
        base_page=1,
        page=1,
        maximum_page=5,
        stop_after_empty_page=True,
        total_path=None,
    )

    # Before any response is processed the paginator expects more pages.
    assert paginator.has_next_page is True

    paginator.update_state(response, empty_page)

    # The empty page stops pagination and leaves the page counter untouched.
    assert paginator.current_value == 1
    assert paginator.has_next_page is False

def test_client_pagination_one_based(self, rest_client):
pages_iter = rest_client.paginate(
"/posts",
Expand Down

0 comments on commit 8b4fc8c

Please sign in to comment.