diff --git a/dlt/sources/helpers/rest_client/client.py b/dlt/sources/helpers/rest_client/client.py index dc3591b698..ed10a5c9e5 100644 --- a/dlt/sources/helpers/rest_client/client.py +++ b/dlt/sources/helpers/rest_client/client.py @@ -13,16 +13,15 @@ from requests import Session as BaseSession # noqa: I251 from requests import Response, Request -from dlt.common import jsonpath -from dlt.common import logger +from dlt.common import jsonpath, logger from dlt.sources.helpers.requests.retry import Client from .typing import HTTPMethodBasic, HTTPMethod, Hooks from .paginators import BasePaginator from .auth import AuthConfigBase -from .detector import PaginatorFactory, find_records -from .exceptions import IgnoreResponseException +from .detector import PaginatorFactory, find_response_page_data +from .exceptions import IgnoreResponseException, PaginatorNotFound from .utils import join_url @@ -232,8 +231,28 @@ def extract_response(self, response: Response, data_selector: jsonpath.TJsonPath data: Any = jsonpath.find_values(data_selector, response.json()) # extract if single item selected data = data[0] if isinstance(data, list) and len(data) == 1 else data + if isinstance(data, list): + length_info = f" with length {len(data)}" + else: + length_info = "" + logger.info( + f"Extracted data of type {type(data).__name__} from path" + f" {data_selector}{length_info}" + ) else: - data = find_records(response.json()) + path, data = find_response_page_data(response.json()) + # if list is detected, it is probably a paged data + if isinstance(data, list): + # store it to reuse on next response + self.data_selector = ".".join(path) + logger.info( + f"Detected page data at path: {self.data_selector} type: list length:" + f" {len(data)}" + ) + else: + logger.info( + f"Detected single page data at path: {path} type: {type(data).__name__}" + ) # wrap single pages into lists if not isinstance(data, list): data = [data] @@ -248,8 +267,20 @@ def detect_paginator(self, response: Response) -> BasePaginator: Returns: BasePaginator: The paginator instance that was detected. """ - paginator = self.pagination_factory.create_paginator(response) + paginator, score = self.pagination_factory.create_paginator(response) if paginator is None: - raise ValueError(f"No suitable paginator found for the response at {response.url}") - logger.info(f"Detected paginator: {paginator.__class__.__name__}") + raise PaginatorNotFound( + f"No suitable paginator found for the response at {response.url}" + ) + if score == 1.0: + logger.info(f"Detected paginator: {paginator}") + elif score == 0.0: + logger.warning( + f"Fallback paginator used: {paginator}. Please provide right paginator manually." + ) + else: + logger.warning( + "Please verify the paginator settings. We strongly suggest to use explicit" + " instance of the paginator as some settings may not be guessed correctly." + ) return paginator diff --git a/dlt/sources/helpers/rest_client/detector.py b/dlt/sources/helpers/rest_client/detector.py index 136b73eba3..0b0f41a8e6 100644 --- a/dlt/sources/helpers/rest_client/detector.py +++ b/dlt/sources/helpers/rest_client/detector.py @@ -1,5 +1,6 @@ import re from typing import List, Dict, Any, Tuple, Union, Optional, Callable, Iterable +from urllib.parse import urlparse from requests import Response @@ -7,7 +8,9 @@ BasePaginator, HeaderLinkPaginator, JSONResponsePaginator, + JSONResponseCursorPaginator, SinglePagePaginator, + PageNumberPaginator, ) RECORD_KEY_PATTERNS = frozenset( @@ -36,123 +39,179 @@ ] ) -NEXT_PAGE_KEY_PATTERNS = frozenset(["next", "nextpage", "nexturl"]) +NEXT_PAGE_KEY_PATTERNS = frozenset(["next"]) NEXT_PAGE_DICT_KEY_PATTERNS = frozenset(["href", "url"]) +TOTAL_PAGES_KEYS = frozenset(["^total_pages$", "^pages$", "^totalpages$"]) def single_entity_path(path: str) -> bool: """Checks if path ends with path param indicating that single object is returned""" - return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}$", path) is not None + return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}/?$", path) is not None + + +def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool: + normalized_key = key.lower() + return any(re.match(pattern, normalized_key) for pattern in patterns) def find_all_lists( dict_: Dict[str, Any], - result: List[Tuple[int, str, List[Any]]] = None, - level: int = 0, -) -> List[Tuple[int, str, List[Any]]]: + path: Tuple[str, ...] = (), + result: List[Tuple[Tuple[str, ...], List[Any]]] = None, +) -> List[Tuple[Tuple[str, ...], List[Any]]]: """Recursively looks for lists in dict_ and returns tuples - in format (nesting level, dictionary key, list) + in format (dictionary keys, list) """ - if level > 2: - return [] + if len(path) > 2: + return None for key, value in dict_.items(): if isinstance(value, list): - result.append((level, key, value)) + result.append(((*path, key), value)) elif isinstance(value, dict): - find_all_lists(value, result, level + 1) + find_all_lists(value, path=(*path, key), result=result) return result -def find_records( +def find_response_page_data( response: Union[Dict[str, Any], List[Any], Any], -) -> Union[Dict[str, Any], List[Any], Any]: +) -> Tuple[Tuple[str, ...], Any]: + """Finds a path to response data, assuming that data is a list, returns a tuple(path, data)""" # when a list was returned (or in rare case a simple type or null) if not isinstance(response, dict): - return response + return (("$",), response) lists = find_all_lists(response, result=[]) if len(lists) == 0: # could not detect anything - return response + return (("$",), response) # we are ordered by nesting level, find the most suitable list try: return next( - list_info[2] + list_info for list_info in lists - if list_info[1] in RECORD_KEY_PATTERNS and list_info[1] not in NON_RECORD_KEY_PATTERNS + if list_info[0][-1] in RECORD_KEY_PATTERNS + and list_info[0][-1] not in NON_RECORD_KEY_PATTERNS ) except StopIteration: # return the least nested element - return lists[0][2] - - -def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool: - normalized_key = key.lower() - return any(pattern in normalized_key for pattern in patterns) + return lists[0] def find_next_page_path( - dictionary: Dict[str, Any], path: Optional[List[str]] = None -) -> Optional[List[str]]: - if not isinstance(dictionary, dict): - return None - - if path is None: - path = [] + response: Dict[str, Any], path: Tuple[str, ...] = () +) -> Tuple[Tuple[str, ...], Any]: + if not isinstance(response, dict): + return (None, None) - for key, value in dictionary.items(): + for key, value in response.items(): if matches_any_pattern(key, NEXT_PAGE_KEY_PATTERNS): if isinstance(value, dict): - for dict_key in value: - if matches_any_pattern(dict_key, NEXT_PAGE_DICT_KEY_PATTERNS): - return [*path, key, dict_key] - return [*path, key] + for dict_key, dict_value in value.items(): + if matches_any_pattern(dict_key, NEXT_PAGE_DICT_KEY_PATTERNS) and isinstance( + dict_value, (str, int, float) + ): + return ((*path, key, dict_key), dict_value) + else: + if isinstance(value, (str, int, float)): + return ((*path, key), value) + + if isinstance(value, dict): + result = find_next_page_path(value, (*path, key)) + if result != (None, None): + return result + + return (None, None) + + +def find_total_pages_path( + response: Dict[str, Any], path: Tuple[str, ...] = () +) -> Tuple[Tuple[str, ...], Any]: + if not isinstance(response, dict): + return (None, None) + + for key, value in response.items(): + if matches_any_pattern(key, TOTAL_PAGES_KEYS) and isinstance(value, (str, int, float)): + assert key != "pageSize" + return ((*path, key), value) if isinstance(value, dict): - result = find_next_page_path(value, [*path, key]) - if result: + result = find_total_pages_path(value, (*path, key)) + if result != (None, None): return result - return None + return (None, None) -def header_links_detector(response: Response) -> Optional[HeaderLinkPaginator]: +def header_links_detector(response: Response) -> Tuple[HeaderLinkPaginator, float]: links_next_key = "next" if response.links.get(links_next_key): - return HeaderLinkPaginator() - return None + return HeaderLinkPaginator(), 1.0 + return None, None -def json_links_detector(response: Response) -> Optional[JSONResponsePaginator]: +def json_links_detector(response: Response) -> Tuple[JSONResponsePaginator, float]: dictionary = response.json() - next_path_parts = find_next_page_path(dictionary) + next_path_parts, next_href = find_next_page_path(dictionary) if not next_path_parts: - return None + return None, None - return JSONResponsePaginator(next_url_path=".".join(next_path_parts)) + try: + urlparse(next_href) + if next_href.startswith("http") or next_href.startswith("/"): + return JSONResponsePaginator(next_url_path=".".join(next_path_parts)), 1.0 + except Exception: + pass + + return None, None + + +def cursor_paginator_detector(response: Response) -> Tuple[JSONResponseCursorPaginator, float]: + dictionary = response.json() + cursor_path_parts, _ = find_next_page_path(dictionary) + if not cursor_path_parts: + return None, None -def single_page_detector(response: Response) -> Optional[SinglePagePaginator]: + return JSONResponseCursorPaginator(cursor_path=".".join(cursor_path_parts)), 0.5 + + +def pages_number_paginator_detector(response: Response) -> Tuple[PageNumberPaginator, float]: + total_pages_path, total_pages = find_total_pages_path(response.json()) + if not total_pages_path: + return None, None + + try: + int(total_pages) + return PageNumberPaginator(total_path=".".join(total_pages_path)), 0.5 + except Exception: + pass + + return None, None + + +def single_page_detector(response: Response) -> Tuple[SinglePagePaginator, float]: """This is our fallback paginator, also for results that are single entities""" - return SinglePagePaginator() + return SinglePagePaginator(), 0.0 class PaginatorFactory: - def __init__(self, detectors: List[Callable[[Response], Optional[BasePaginator]]] = None): + def __init__(self, detectors: List[Callable[[Response], Tuple[BasePaginator, float]]] = None): if detectors is None: detectors = [ header_links_detector, json_links_detector, + pages_number_paginator_detector, + cursor_paginator_detector, single_page_detector, ] self.detectors = detectors - def create_paginator(self, response: Response) -> Optional[BasePaginator]: + def create_paginator(self, response: Response) -> Tuple[BasePaginator, float]: for detector in self.detectors: - paginator = detector(response) + paginator, score = detector(response) if paginator: - return paginator - return None + return paginator, score + return None, None diff --git a/dlt/sources/helpers/rest_client/exceptions.py b/dlt/sources/helpers/rest_client/exceptions.py index 4b4d555ca7..483daea40f 100644 --- a/dlt/sources/helpers/rest_client/exceptions.py +++ b/dlt/sources/helpers/rest_client/exceptions.py @@ -1,5 +1,17 @@ from dlt.common.exceptions import DltException -class IgnoreResponseException(DltException): +class RESTClientException(DltException): + pass + + +class IgnoreResponseException(RESTClientException): + pass + + +class PaginatorSetupError(RESTClientException, ValueError): + pass + + +class PaginatorNotFound(RESTClientException): pass diff --git a/dlt/sources/helpers/rest_client/paginators.py b/dlt/sources/helpers/rest_client/paginators.py index 11efd0215a..378a5f369d 100644 --- a/dlt/sources/helpers/rest_client/paginators.py +++ b/dlt/sources/helpers/rest_client/paginators.py @@ -216,7 +216,7 @@ def get_items(): def __init__( self, - initial_page: int = 1, + initial_page: int = 0, page_param: str = "page", total_path: jsonpath.TJsonPath = "total", maximum_page: Optional[int] = None, @@ -242,6 +242,13 @@ def __init__( error_message_items="pages", ) + def __str__(self) -> str: + return ( + super().__str__() + + f": current page: {self.current_value} page_param: {self.param_name} total_path:" + f" {self.total_path} maximum_value: {self.maximum_value}" + ) + class OffsetPaginator(RangePaginator): """A paginator that uses offset-based pagination strategy. @@ -339,6 +346,14 @@ def update_request(self, request: Request) -> None: super().update_request(request) request.params[self.limit_param] = self.limit + def __str__(self) -> str: + return ( + super().__str__() + + f": current offset: {self.current_value} offset_param: {self.param_name} limit:" + f" {self.value_step} total_path: {self.total_path} maximum_value:" + f" {self.maximum_value}" + ) + class BaseReferencePaginator(BasePaginator): """A base paginator class for paginators that use a reference to the next @@ -447,6 +462,9 @@ def update_state(self, response: Response) -> None: """Extracts the next page URL from the 'Link' header in the response.""" self._next_reference = response.links.get(self.links_next_key, {}).get("url") + def __str__(self) -> str: + return super().__str__() + f": links_next_key: {self.links_next_key}" + class JSONResponsePaginator(BaseNextUrlPaginator): """Locates the next page URL within the JSON response body. The key @@ -500,6 +518,9 @@ def update_state(self, response: Response) -> None: values = jsonpath.find_values(self.next_url_path, response.json()) self._next_reference = values[0] if values else None + def __str__(self) -> str: + return super().__str__() + f": next_url_path: {self.next_url_path}" + class JSONResponseCursorPaginator(BaseReferencePaginator): """Uses a cursor parameter for pagination, with the cursor value found in @@ -544,7 +565,7 @@ def get_data(): def __init__( self, cursor_path: jsonpath.TJsonPath = "cursors.next", - cursor_param: str = "after", + cursor_param: str = "cursor", ): """ Args: @@ -568,3 +589,9 @@ def update_request(self, request: Request) -> None: request.params = {} request.params[self.cursor_param] = self._next_reference + + def __str__(self) -> str: + return ( + super().__str__() + + f": cursor_path: {self.cursor_path} cursor_param: {self.cursor_param}" + ) diff --git a/tests/sources/helpers/rest_client/test_detector.py b/tests/sources/helpers/rest_client/test_detector.py index 933c9be9cc..f01f9409a1 100644 --- a/tests/sources/helpers/rest_client/test_detector.py +++ b/tests/sources/helpers/rest_client/test_detector.py @@ -1,11 +1,21 @@ import pytest + from dlt.common import jsonpath from dlt.sources.helpers.rest_client.detector import ( - find_records, + PaginatorFactory, + find_response_page_data, find_next_page_path, single_entity_path, ) +from dlt.sources.helpers.rest_client.paginators import ( + OffsetPaginator, + PageNumberPaginator, + JSONResponsePaginator, + HeaderLinkPaginator, + SinglePagePaginator, + JSONResponseCursorPaginator, +) TEST_RESPONSES = [ @@ -15,7 +25,7 @@ "pagination": {"offset": 0, "limit": 2, "total": 100}, }, "expected": { - "type": "offset_limit", + "type": OffsetPaginator, "records_path": "data", }, }, @@ -28,8 +38,9 @@ "page_info": {"current_page": 1, "items_per_page": 2, "total_pages": 50}, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "items", + "total_path": ("page_info", "total_pages"), }, }, { @@ -41,9 +52,9 @@ "next_cursor": "eyJpZCI6MTAyfQ==", }, "expected": { - "type": "cursor", + "type": JSONResponseCursorPaginator, "records_path": "products", - "next_path": ["next_cursor"], + "next_path": ("next_cursor",), }, }, { @@ -55,9 +66,9 @@ "cursors": {"next": "NjM=", "previous": "MTk="}, }, "expected": { - "type": "cursor", + "type": JSONResponseCursorPaginator, "records_path": "results", - "next_path": ["cursors", "next"], + "next_path": ("cursors", "next"), }, }, { @@ -67,9 +78,9 @@ "limit": 2, }, "expected": { - "type": "cursor", + "type": JSONResponseCursorPaginator, "records_path": "entries", - "next_path": ["next_id"], + "next_path": ("next_id",), }, }, { @@ -82,8 +93,9 @@ "total_pages": 15, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "comments", + "total_path": ("total_pages",), }, }, { @@ -94,9 +106,9 @@ "results": [{"id": 1, "name": "Account 1"}, {"id": 2, "name": "Account 2"}], }, "expected": { - "type": "json_link", + "type": JSONResponsePaginator, "records_path": "results", - "next_path": ["next"], + "next_path": ("next",), }, }, { @@ -111,9 +123,9 @@ "page": {"size": 2, "totalElements": 100, "totalPages": 50, "number": 1}, }, "expected": { - "type": "json_link", + "type": JSONResponsePaginator, "records_path": "_embedded.items", - "next_path": ["_links", "next", "href"], + "next_path": ("_links", "next", "href"), }, }, { @@ -133,9 +145,9 @@ }, }, "expected": { - "type": "json_link", + "type": JSONResponsePaginator, "records_path": "items", - "next_path": ["links", "nextPage"], + "next_path": ("links", "nextPage"), }, }, { @@ -149,8 +161,9 @@ }, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "data", + "total_path": ("pagination", "totalPages"), }, }, { @@ -159,8 +172,9 @@ "pagination": {"page": 1, "perPage": 2, "total": 10, "totalPages": 5}, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "items", + "total_path": ("pagination", "totalPages"), }, }, { @@ -183,9 +197,9 @@ }, }, "expected": { - "type": "json_link", + "type": JSONResponsePaginator, "records_path": "data", - "next_path": ["links", "next"], + "next_path": ("links", "next"), }, }, { @@ -197,8 +211,9 @@ "data": [{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}], }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "data", + "total_path": ("pages",), }, }, { @@ -210,8 +225,9 @@ "items": [{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}], }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "items", + "total_path": ("totalPages",), }, }, { @@ -223,7 +239,8 @@ "paging": {"current": 3, "size": 2, "total": 60}, }, "expected": { - "type": "page_number", + # we are not able to detect that + "type": SinglePagePaginator, "records_path": "articles", }, }, @@ -238,7 +255,7 @@ "total_count": 200, }, "expected": { - "type": "offset_limit", + "type": OffsetPaginator, "records_path": "feed", }, }, @@ -256,8 +273,9 @@ }, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "query_results", + "total_path": ("page_details", "total_pages"), }, }, { @@ -274,8 +292,9 @@ }, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "posts", + "total_path": ("pagination_details", "total_pages"), }, }, { @@ -292,41 +311,106 @@ }, }, "expected": { - "type": "page_number", + "type": PageNumberPaginator, "records_path": "catalog", + "total_path": ("page_metadata", "total_pages"), + }, + }, + { + "response": [ + {"id": 101, "product_name": "Product A"}, + {"id": 102, "product_name": "Product B"}, + ], + "expected": { + "type": SinglePagePaginator, + "records_path": "$", + }, + }, + { + "response": [ + {"id": 101, "product_name": "Product A"}, + {"id": 102, "product_name": "Product B"}, + ], + "links": {"next": "next_page"}, + "expected": { + "type": HeaderLinkPaginator, + "records_path": "$", }, }, + { + "response": {"id": 101, "product_name": "Product A"}, + "expected": { + "type": SinglePagePaginator, + "records_path": "$", + }, + }, + # { + # "response":{"id": 101, "product_name": "Product A"}, + # "expected": { + # "type": "single_page", + # "records_path": "$", + # }, + # }, ] +class PaginatorResponse: + def __init__(self, json, links): + self._json = json + self.links = links or {} + + def json(self): + return self._json + + @pytest.mark.parametrize("test_case", TEST_RESPONSES) -def test_find_records(test_case): +def test_find_response_page_data(test_case): response = test_case["response"] - expected = test_case["expected"]["records_path"] - r = find_records(response) - # all of them look fine mostly because those are simple cases... - # case 7 fails because it is nested but in fact we select a right response - # assert r is create_nested_accessor(expected)(response) - assert r == jsonpath.find_values(expected, response)[0] + records_path = test_case["expected"]["records_path"] + path, data = find_response_page_data(response) + assert jsonpath.find_values(records_path, response)[0] == data + assert ".".join(path) == records_path @pytest.mark.parametrize("test_case", TEST_RESPONSES) def test_find_next_page_key(test_case): response = test_case["response"] expected = test_case.get("expected").get("next_path", None) # Some cases may not have next_path - assert find_next_page_path(response) == expected + actual, actual_ref = find_next_page_path(response) + if expected is None: + assert actual is None, "No next page path expected" + else: + assert actual == expected + assert jsonpath.find_values(".".join(actual), response)[0] == actual_ref + + +@pytest.mark.parametrize("test_case", TEST_RESPONSES) +def test_find_paginator(test_case) -> None: + factory = PaginatorFactory() + mock_response = PaginatorResponse(test_case["response"], test_case.get("links")) + paginator, _ = factory.create_paginator(mock_response) # type: ignore[arg-type] + expected_paginator = test_case["expected"]["type"] + if expected_paginator is OffsetPaginator: + expected_paginator = SinglePagePaginator + assert type(paginator) is expected_paginator + if isinstance(paginator, PageNumberPaginator): + assert str(paginator.total_path) == ".".join(test_case["expected"]["total_path"]) + if isinstance(paginator, JSONResponsePaginator): + assert str(paginator.next_url_path) == ".".join(test_case["expected"]["next_path"]) + if isinstance(paginator, JSONResponseCursorPaginator): + assert str(paginator.cursor_path) == ".".join(test_case["expected"]["next_path"]) -@pytest.mark.skip @pytest.mark.parametrize( "path", [ "/users/{user_id}", "/api/v1/products/{product_id}/", - "/api/v1/products/{product_id}//", - "/api/v1/products/{product_id}?param1=value1", - "/api/v1/products/{product_id}#section", - "/api/v1/products/{product_id}/#section", + # those are not valid paths + # "/api/v1/products/{product_id}//", + # "/api/v1/products/{product_id}?param1=value1", + # "/api/v1/products/{product_id}#section", + # "/api/v1/products/{product_id}/#section", "/users/{user_id}/posts/{post_id}", "/users/{user_id}/posts/{post_id}/comments/{comment_id}", "{entity}",