Skip to content

Commit

Permalink
improves rest client detections and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 13, 2024
1 parent cba7e08 commit 7f39747
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 102 deletions.
47 changes: 39 additions & 8 deletions dlt/sources/helpers/rest_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
from requests import Session as BaseSession # noqa: I251
from requests import Response, Request

from dlt.common import jsonpath
from dlt.common import logger
from dlt.common import jsonpath, logger

from dlt.sources.helpers.requests.retry import Client

from .typing import HTTPMethodBasic, HTTPMethod, Hooks
from .paginators import BasePaginator
from .auth import AuthConfigBase
from .detector import PaginatorFactory, find_records
from .exceptions import IgnoreResponseException
from .detector import PaginatorFactory, find_response_page_data
from .exceptions import IgnoreResponseException, PaginatorNotFound

from .utils import join_url

Expand Down Expand Up @@ -232,8 +231,28 @@ def extract_response(self, response: Response, data_selector: jsonpath.TJsonPath
data: Any = jsonpath.find_values(data_selector, response.json())
# extract if single item selected
data = data[0] if isinstance(data, list) and len(data) == 1 else data
if isinstance(data, list):
length_info = f" with length {len(data)}"
else:
length_info = ""
logger.info(
f"Extracted data of type {type(data).__name__} from path"
f" {data_selector}{length_info}"
)
else:
data = find_records(response.json())
path, data = find_response_page_data(response.json())
# if list is detected, it is probably a paged data
if isinstance(data, list):
# store it to reuse on next response
self.data_selector = ".".join(path)
logger.info(
f"Detected page data at path: {self.data_selector} type: list length:"
f" {len(data)}"
)
else:
logger.info(
f"Detected single page data at path: {path} type: {type(data).__name__}"
)
# wrap single pages into lists
if not isinstance(data, list):
data = [data]
Expand All @@ -248,8 +267,20 @@ def detect_paginator(self, response: Response) -> BasePaginator:
Returns:
BasePaginator: The paginator instance that was detected.
"""
paginator = self.pagination_factory.create_paginator(response)
paginator, score = self.pagination_factory.create_paginator(response)
if paginator is None:
raise ValueError(f"No suitable paginator found for the response at {response.url}")
logger.info(f"Detected paginator: {paginator.__class__.__name__}")
raise PaginatorNotFound(
f"No suitable paginator found for the response at {response.url}"
)
if score == 1.0:
logger.info(f"Detected paginator: {paginator}")
elif score == 0.0:
logger.warning(
f"Fallback paginator used: {paginator}. Please provide right paginator manually."
)
else:
logger.warning(
"Please verify the paginator settings. We strongly suggest to use explicit"
" instance of the paginator as some settings may not be guessed correctly."
)
return paginator
161 changes: 110 additions & 51 deletions dlt/sources/helpers/rest_client/detector.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import re
from typing import List, Dict, Any, Tuple, Union, Optional, Callable, Iterable
from urllib.parse import urlparse

from requests import Response

from .paginators import (
BasePaginator,
HeaderLinkPaginator,
JSONResponsePaginator,
JSONResponseCursorPaginator,
SinglePagePaginator,
PageNumberPaginator,
)

RECORD_KEY_PATTERNS = frozenset(
Expand Down Expand Up @@ -36,123 +39,179 @@
]
)

NEXT_PAGE_KEY_PATTERNS = frozenset(["next", "nextpage", "nexturl"])
NEXT_PAGE_KEY_PATTERNS = frozenset(["next"])
NEXT_PAGE_DICT_KEY_PATTERNS = frozenset(["href", "url"])
TOTAL_PAGES_KEYS = frozenset(["^total_pages$", "^pages$", "^totalpages$"])


def single_entity_path(path: str) -> bool:
"""Checks if path ends with path param indicating that single object is returned"""
return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}$", path) is not None
return re.search(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}/?$", path) is not None


def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool:
normalized_key = key.lower()
return any(re.match(pattern, normalized_key) for pattern in patterns)


def find_all_lists(
dict_: Dict[str, Any],
result: List[Tuple[int, str, List[Any]]] = None,
level: int = 0,
) -> List[Tuple[int, str, List[Any]]]:
path: Tuple[str, ...] = (),
result: List[Tuple[Tuple[str, ...], List[Any]]] = None,
) -> List[Tuple[Tuple[str, ...], List[Any]]]:
"""Recursively looks for lists in dict_ and returns tuples
in format (nesting level, dictionary key, list)
in format (dictionary keys, list)
"""
if level > 2:
return []
if len(path) > 2:
return None

for key, value in dict_.items():
if isinstance(value, list):
result.append((level, key, value))
result.append(((*path, key), value))
elif isinstance(value, dict):
find_all_lists(value, result, level + 1)
find_all_lists(value, path=(*path, key), result=result)

return result


def find_records(
def find_response_page_data(
response: Union[Dict[str, Any], List[Any], Any],
) -> Union[Dict[str, Any], List[Any], Any]:
) -> Tuple[Tuple[str, ...], Any]:
"""Finds a path to response data, assuming that data is a list, returns a tuple(path, data)"""
# when a list was returned (or in rare case a simple type or null)
if not isinstance(response, dict):
return response
return (("$",), response)
lists = find_all_lists(response, result=[])
if len(lists) == 0:
# could not detect anything
return response
return (("$",), response)
# we are ordered by nesting level, find the most suitable list
try:
return next(
list_info[2]
list_info
for list_info in lists
if list_info[1] in RECORD_KEY_PATTERNS and list_info[1] not in NON_RECORD_KEY_PATTERNS
if list_info[0][-1] in RECORD_KEY_PATTERNS
and list_info[0][-1] not in NON_RECORD_KEY_PATTERNS
)
except StopIteration:
# return the least nested element
return lists[0][2]


def matches_any_pattern(key: str, patterns: Iterable[str]) -> bool:
normalized_key = key.lower()
return any(pattern in normalized_key for pattern in patterns)
return lists[0]


def find_next_page_path(
dictionary: Dict[str, Any], path: Optional[List[str]] = None
) -> Optional[List[str]]:
if not isinstance(dictionary, dict):
return None

if path is None:
path = []
response: Dict[str, Any], path: Tuple[str, ...] = ()
) -> Tuple[Tuple[str, ...], Any]:
if not isinstance(response, dict):
return (None, None)

for key, value in dictionary.items():
for key, value in response.items():
if matches_any_pattern(key, NEXT_PAGE_KEY_PATTERNS):
if isinstance(value, dict):
for dict_key in value:
if matches_any_pattern(dict_key, NEXT_PAGE_DICT_KEY_PATTERNS):
return [*path, key, dict_key]
return [*path, key]
for dict_key, dict_value in value.items():
if matches_any_pattern(dict_key, NEXT_PAGE_DICT_KEY_PATTERNS) and isinstance(
dict_value, (str, int, float)
):
return ((*path, key, dict_key), dict_value)
else:
if isinstance(value, (str, int, float)):
return ((*path, key), value)

if isinstance(value, dict):
result = find_next_page_path(value, (*path, key))
if result != (None, None):
return result

return (None, None)


def find_total_pages_path(
response: Dict[str, Any], path: Tuple[str, ...] = ()
) -> Tuple[Tuple[str, ...], Any]:
if not isinstance(response, dict):
return (None, None)

for key, value in response.items():
if matches_any_pattern(key, TOTAL_PAGES_KEYS) and isinstance(value, (str, int, float)):
assert key != "pageSize"
return ((*path, key), value)

if isinstance(value, dict):
result = find_next_page_path(value, [*path, key])
if result:
result = find_total_pages_path(value, (*path, key))
if result != (None, None):
return result

return None
return (None, None)


def header_links_detector(response: Response) -> Optional[HeaderLinkPaginator]:
def header_links_detector(response: Response) -> Tuple[HeaderLinkPaginator, float]:
links_next_key = "next"

if response.links.get(links_next_key):
return HeaderLinkPaginator()
return None
return HeaderLinkPaginator(), 1.0
return None, None


def json_links_detector(response: Response) -> Optional[JSONResponsePaginator]:
def json_links_detector(response: Response) -> Tuple[JSONResponsePaginator, float]:
dictionary = response.json()
next_path_parts = find_next_page_path(dictionary)
next_path_parts, next_href = find_next_page_path(dictionary)

if not next_path_parts:
return None
return None, None

return JSONResponsePaginator(next_url_path=".".join(next_path_parts))
try:
urlparse(next_href)
if next_href.startswith("http") or next_href.startswith("/"):
return JSONResponsePaginator(next_url_path=".".join(next_path_parts)), 1.0
except Exception:
pass

return None, None


def cursor_paginator_detector(response: Response) -> Tuple[JSONResponseCursorPaginator, float]:
dictionary = response.json()
cursor_path_parts, _ = find_next_page_path(dictionary)

if not cursor_path_parts:
return None, None

def single_page_detector(response: Response) -> Optional[SinglePagePaginator]:
return JSONResponseCursorPaginator(cursor_path=".".join(cursor_path_parts)), 0.5


def pages_number_paginator_detector(response: Response) -> Tuple[PageNumberPaginator, float]:
total_pages_path, total_pages = find_total_pages_path(response.json())
if not total_pages_path:
return None, None

try:
int(total_pages)
return PageNumberPaginator(total_path=".".join(total_pages_path)), 0.5
except Exception:
pass

return None, None


def single_page_detector(response: Response) -> Tuple[SinglePagePaginator, float]:
"""This is our fallback paginator, also for results that are single entities"""
return SinglePagePaginator()
return SinglePagePaginator(), 0.0


class PaginatorFactory:
def __init__(self, detectors: List[Callable[[Response], Optional[BasePaginator]]] = None):
def __init__(self, detectors: List[Callable[[Response], Tuple[BasePaginator, float]]] = None):
if detectors is None:
detectors = [
header_links_detector,
json_links_detector,
pages_number_paginator_detector,
cursor_paginator_detector,
single_page_detector,
]
self.detectors = detectors

def create_paginator(self, response: Response) -> Optional[BasePaginator]:
def create_paginator(self, response: Response) -> Tuple[BasePaginator, float]:
for detector in self.detectors:
paginator = detector(response)
paginator, score = detector(response)
if paginator:
return paginator
return None
return paginator, score
return None, None
14 changes: 13 additions & 1 deletion dlt/sources/helpers/rest_client/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from dlt.common.exceptions import DltException


class IgnoreResponseException(DltException):
class RESTClientException(DltException):
pass


class IgnoreResponseException(RESTClientException):
pass


class PaginatorSetupError(RESTClientException, ValueError):
pass


class PaginatorNotFound(RESTClientException):
pass
Loading

0 comments on commit 7f39747

Please sign in to comment.