Skip to content

Commit

Permalink
puts data selector detector first
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 13, 2024
1 parent 25a13bf commit 48cb2f9
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 33 deletions.
78 changes: 45 additions & 33 deletions dlt/sources/helpers/rest_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,57 +212,63 @@ def raise_for_status(response: Response, *args: Any, **kwargs: Any) -> None:
except IgnoreResponseException:
break

if paginator is None:
paginator = self.detect_paginator(response)

if not data_selector:
data_selector = self.detect_data_selector(response)
data = self.extract_response(response, data_selector)

if paginator is None:
paginator = self.detect_paginator(response, data)
paginator.update_state(response)
paginator.update_request(request)

# yield data with context
yield PageData(data, request=request, response=response, paginator=paginator, auth=auth)

if not paginator.has_next_page:
logger.info(f"Paginator {str(paginator)} does not have more pages")
break

def extract_response(self, response: Response, data_selector: jsonpath.TJsonPath) -> List[Any]:
if data_selector:
# we should compile data_selector
data: Any = jsonpath.find_values(data_selector, response.json())
# extract if single item selected
data = data[0] if isinstance(data, list) and len(data) == 1 else data
if isinstance(data, list):
length_info = f" with length {len(data)}"
else:
length_info = ""
logger.info(
f"Extracted data of type {type(data).__name__} from path"
f" {data_selector}{length_info}"
)
# we should compile data_selector
data: Any = jsonpath.find_values(data_selector, response.json())
# extract if single item selected
data = data[0] if isinstance(data, list) and len(data) == 1 else data
if isinstance(data, list):
length_info = f" with length {len(data)}"
else:
path, data = find_response_page_data(response.json())
# if list is detected, it is probably a paged data
if isinstance(data, list):
# store it to reuse on next response
self.data_selector = ".".join(path)
logger.info(
f"Detected page data at path: {self.data_selector} type: list length:"
f" {len(data)}"
)
else:
logger.info(
f"Detected single page data at path: {path} type: {type(data).__name__}"
)
length_info = ""
logger.info(
f"Extracted data of type {type(data).__name__} from path {data_selector}{length_info}"
)
# wrap single pages into lists
if not isinstance(data, list):
data = [data]
return cast(List[Any], data)

def detect_paginator(self, response: Response) -> BasePaginator:
def detect_data_selector(self, response: Response) -> str:
"""Detects a path to page data in `response`. If there's no
paging detected, returns "$" which will select full response
Returns:
str: a json path to the page data.
"""
path, data = find_response_page_data(response.json())
data_selector = ".".join(path)
# if list is detected, it is probably a paged data
if isinstance(data, list):
logger.info(
f"Detected page data at path: '{data_selector}' type: list length: {len(data)}"
)
else:
logger.info(f"Detected single page data at path: '{path}' type: {type(data).__name__}")
return data_selector

def detect_paginator(self, response: Response, data: Any) -> BasePaginator:
"""Detects a paginator for the response and returns it.
Args:
response (Response): The response to detect the paginator for.
data_selector (data_selector): Path to paginated data or $ if paginated data not detected
Returns:
BasePaginator: The paginator instance that was detected.
Expand All @@ -275,9 +281,15 @@ def detect_paginator(self, response: Response) -> BasePaginator:
if score == 1.0:
logger.info(f"Detected paginator: {paginator}")
elif score == 0.0:
logger.warning(
f"Fallback paginator used: {paginator}. Please provide right paginator manually."
)
if isinstance(data, list):
logger.warning(
f"Fallback paginator used: {paginator}. Please provide right paginator"
" manually."
)
else:
logger.info(
f"Fallback paginator used: {paginator} to handle not paginated response."
)
else:
logger.warning(
"Please verify the paginator settings. We strongly suggest to use explicit"
Expand Down
6 changes: 6 additions & 0 deletions dlt/sources/helpers/rest_client/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ def single_page_detector(response: Response) -> Tuple[SinglePagePaginator, float

class PaginatorFactory:
def __init__(self, detectors: List[Callable[[Response], Tuple[BasePaginator, float]]] = None):
"""`detectors` are functions taking Response as input and returning paginator instance and
detection score. Score value:
1.0 - perfect detection
0.0 - fallback detection
in between - partial detection, several paginator parameters are defaults
"""
if detectors is None:
detectors = [
header_links_detector,
Expand Down
3 changes: 3 additions & 0 deletions dlt/sources/helpers/rest_client/paginators.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def update_request(self, request: Request) -> None:
"""
...

def __str__(self) -> str:
return f"{type(self).__name__} at {id(self):x}"


class SinglePagePaginator(BasePaginator):
"""A paginator for single-page API responses."""
Expand Down

0 comments on commit 48cb2f9

Please sign in to comment.