diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index b6fa061d58..b92ed6301c 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -211,7 +211,7 @@ def rest_api_resources(config: RESTAPIConfig) -> List[DltResource]: def create_resources( client_config: ClientConfig, dependency_graph: graphlib.TopologicalSorter, - endpoint_resource_map: Dict[str, EndpointResource], + endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]], resolved_param_map: Dict[str, Optional[ResolvedParam]], ) -> Dict[str, DltResource]: resources = {} @@ -219,6 +219,10 @@ def create_resources( for resource_name in dependency_graph.static_order(): resource_name = cast(str, resource_name) endpoint_resource = endpoint_resource_map[resource_name] + if isinstance(endpoint_resource, DltResource): + resources[resource_name] = endpoint_resource + continue + endpoint_config = cast(Endpoint, endpoint_resource["endpoint"]) request_params = endpoint_config.get("params", {}) request_json = endpoint_config.get("json", None) diff --git a/dlt/sources/rest_api/config_setup.py b/dlt/sources/rest_api/config_setup.py index 7659a01070..ffc100cac1 100644 --- a/dlt/sources/rest_api/config_setup.py +++ b/dlt/sources/rest_api/config_setup.py @@ -52,6 +52,8 @@ OAuth2ClientCredentials, ) +from dlt.extract.resource import DltResource + from .typing import ( EndpointResourceBase, AuthConfig, @@ -269,35 +271,20 @@ def make_parent_key_name(resource_name: str, field_name: str) -> str: def build_resource_dependency_graph( resource_defaults: EndpointResourceBase, - resource_list: List[Union[str, EndpointResource]], -) -> Tuple[Any, Dict[str, EndpointResource], Dict[str, Optional[ResolvedParam]]]: + resource_list: List[Union[str, EndpointResource, DltResource]], +) -> Tuple[ + Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[ResolvedParam]] +]: dependency_graph = graphlib.TopologicalSorter() - endpoint_resource_map: Dict[str, 
EndpointResource] = {} resolved_param_map: Dict[str, ResolvedParam] = {} - - # expand all resources and index them - for resource_kwargs in resource_list: - if isinstance(resource_kwargs, dict): - # clone resource here, otherwise it needs to be cloned in several other places - # note that this clones only dict structure, keeping all instances without deepcopy - resource_kwargs = update_dict_nested({}, resource_kwargs) # type: ignore - - endpoint_resource = _make_endpoint_resource(resource_kwargs, resource_defaults) - assert isinstance(endpoint_resource["endpoint"], dict) - _setup_single_entity_endpoint(endpoint_resource["endpoint"]) - _bind_path_params(endpoint_resource) - - resource_name = endpoint_resource["name"] - assert isinstance( - resource_name, str - ), f"Resource name must be a string, got {type(resource_name)}" - - if resource_name in endpoint_resource_map: - raise ValueError(f"Resource {resource_name} has already been defined") - endpoint_resource_map[resource_name] = endpoint_resource + endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults) # create dependency graph for resource_name, endpoint_resource in endpoint_resource_map.items(): + if isinstance(endpoint_resource, DltResource): + dependency_graph.add(resource_name) + resolved_param_map[resource_name] = None + continue assert isinstance(endpoint_resource["endpoint"], dict) # connect transformers to resources via resolved params resolved_params = _find_resolved_params(endpoint_resource["endpoint"]) @@ -322,6 +309,37 @@ def build_resource_dependency_graph( return dependency_graph, endpoint_resource_map, resolved_param_map +def expand_and_index_resources( + resource_list: List[Union[str, EndpointResource, DltResource]], + resource_defaults: EndpointResourceBase, +) -> Dict[str, Union[EndpointResource, DltResource]]: + endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]] = {} + for resource in resource_list: + if isinstance(resource, DltResource): +
endpoint_resource_map[resource.name] = resource + continue + elif isinstance(resource, dict): + # clone resource here, otherwise it needs to be cloned in several other places + # note that this clones only dict structure, keeping all instances without deepcopy + resource = update_dict_nested({}, resource) # type: ignore + + endpoint_resource = _make_endpoint_resource(resource, resource_defaults) + assert isinstance(endpoint_resource["endpoint"], dict) + _setup_single_entity_endpoint(endpoint_resource["endpoint"]) + _bind_path_params(endpoint_resource) + + resource_name = endpoint_resource["name"] + assert isinstance( + resource_name, str + ), f"Resource name must be a string, got {type(resource_name)}" + + if resource_name in endpoint_resource_map: + raise ValueError(f"Resource {resource_name} has already been defined") + endpoint_resource_map[resource_name] = endpoint_resource + + return endpoint_resource_map + + def _make_endpoint_resource( resource: Union[str, EndpointResource], default_config: EndpointResourceBase ) -> EndpointResource: diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index e816b70779..81c53887f1 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -34,6 +34,7 @@ from dlt.extract.items import TTableHintTemplate from dlt.extract.incremental.typing import LastValueFunc +from dlt.extract.resource import DltResource from requests import Session @@ -276,7 +277,7 @@ class EndpointResource(EndpointResourceBase, total=False): class RESTAPIConfigBase(TypedDict): client: ClientConfig - resources: List[Union[str, EndpointResource]] + resources: List[Union[str, EndpointResource, DltResource]] class RESTAPIConfig(RESTAPIConfigBase, total=False): diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md index e301128dc1..944b508a12 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md +++
b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md @@ -638,6 +638,54 @@ The `field` value can be specified as a [JSONPath](https://github.com/h2non/json Under the hood, dlt handles this by using a [transformer resource](../../../general-usage/resource.md#process-resources-with-dlttransformer). +#### Define a resource which is not a REST endpoint + +Sometimes, we want to request endpoints with specific values that are not returned by another endpoint. +For such cases, you can include arbitrary dlt resources in your `RESTAPIConfig` instead of defining a separate resource for every path. + +In the following example, we want to load the issues belonging to three repositories. +Instead of defining three separate `issues` resources, one for each of the paths `dlt-hub/dlt/issues/`, `dlt-hub/verified-sources/issues/`, `dlt-hub/dlthub-education/issues/`, we define a resource `repositories` that yields a list of repository names; the dependent resource `issues` then fetches the issues for each of them. + +```py +from dlt.sources.rest_api import RESTAPIConfig + +@dlt.resource() +def repositories() -> Generator[List[Dict[str, Any]]]: + """A seed list of repositories to fetch""" + yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}] + + +config: RESTAPIConfig = { + "client": {"base_url": "https://github.com/api/v2"}, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + repositories(), + ], +} +``` + +Note that the parent resource must yield lists of dictionaries, i.e. its return type must be `Generator[List[Dict[str, Any]]]`.
Thus, the following will NOT work: + +```py +@dlt.resource +def repositories() -> Generator[Dict[str, Any]]: + """Not working seed list of repositories to fetch""" + yield from [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}] +``` + + #### Include fields from the parent resource You can include data from the parent resource in the child resource by using the `include_from_parent` field in the resource configuration. For example: diff --git a/tests/sources/rest_api/configurations/source_configs.py b/tests/sources/rest_api/configurations/source_configs.py index b9fdc3f2d0..8e26a4183b 100644 --- a/tests/sources/rest_api/configurations/source_configs.py +++ b/tests/sources/rest_api/configurations/source_configs.py @@ -113,6 +113,12 @@ class CustomOAuthAuth(OAuth2AuthBase): pass +@dlt.resource(name="repositories", selected=False) +def repositories(): + """A seed list of repositories to fetch""" + yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}] + + VALID_CONFIGS: List[RESTAPIConfig] = [ { "client": {"base_url": "https://api.example.com"}, @@ -310,19 +316,42 @@ class CustomOAuthAuth(OAuth2AuthBase): ], }, { - "client": { - "base_url": "https://example.com", - "session": requests.Session(), - }, - "resources": ["users"], + "client": {"base_url": "https://github.com/api/v2"}, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + repositories(), + ], }, { - "client": { - "base_url": "https://example.com", - # This is a subclass of requests.Session and is thus also allowed - "session": dlt.sources.helpers.requests.Session(), - }, - "resources": ["users"], + "client": {"base_url": "https://github.com/api/v2"}, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + 
"type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + repositories(), + ], }, ] diff --git a/tests/sources/rest_api/configurations/test_configuration.py b/tests/sources/rest_api/configurations/test_configuration.py index 0167ea1eb8..6adbfc5175 100644 --- a/tests/sources/rest_api/configurations/test_configuration.py +++ b/tests/sources/rest_api/configurations/test_configuration.py @@ -401,3 +401,71 @@ def test_resource_defaults_no_params() -> None: "per_page": 50, "sort": "updated", } + + +def test_accepts_DltResource_in_resources() -> None: + @dlt.resource(selected=False) + def repositories(): + """A seed list of repositories to fetch""" + yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}] + + config: RESTAPIConfig = { + "client": {"base_url": "https://github.com/api/v2"}, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + repositories(), + ], + } + + source = rest_api_source(config) + assert list(source.resources.keys()) == ["repositories", "issues"] + assert list(source.selected_resources.keys()) == ["issues"] + + +def test_resource_defaults_dont_apply_to_DltResource() -> None: + @dlt.resource() + def repositories(): + """A seed list of repositories to fetch""" + yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}] + + config: RESTAPIConfig = { + "client": {"base_url": "https://github.com/api/v2"}, + "resource_defaults": { + "write_disposition": "replace", + }, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "dlt-hub/{repository}/issues/", + "params": { + "repository": { + "type": "resolve", + "resource": "repositories", + "field": "name", + }, + }, + }, + }, + repositories(), + ], + } + + source = rest_api_source(config) + assert source.resources["issues"].write_disposition == 
"replace" + assert source.resources["repositories"].write_disposition != "replace", ( + "DltResource defined outside of the RESTAPIConfig object is influenced by the content of" + " the RESTAPIConfig" + ) diff --git a/tests/sources/rest_api/integration/test_offline.py b/tests/sources/rest_api/integration/test_offline.py index 7fe77de029..cf82764e55 100644 --- a/tests/sources/rest_api/integration/test_offline.py +++ b/tests/sources/rest_api/integration/test_offline.py @@ -358,3 +358,41 @@ def send_spy(*args, **kwargs): mocked_send.assert_called_once() assert mocked_send.call_args[0][0].url == "https://api.example.com/posts" + + +def test_DltResource_gets_called(mock_api_server, mocker) -> None: + @dlt.resource() + def post_list(): + yield [{"id": "0"}, {"id": "1"}, {"id": "2"}] + + config: RESTAPIConfig = { + "client": {"base_url": "http://api.example.com/"}, + "resource_defaults": { + "write_disposition": "replace", + }, + "resources": [ + { + "name": "posts", + "endpoint": { + "path": "posts/{post_id}/comments", + "params": { + "post_id": { + "type": "resolve", + "resource": "post_list", + "field": "id", + }, + }, + }, + }, + post_list(), + ], + } + + RESTClient = dlt.sources.helpers.rest_client.RESTClient + with mock.patch.object(RESTClient, "paginate") as mock_paginate: + source = rest_api_source(config) + _ = list(source) + assert mock_paginate.call_count == 3 + for i in range(3): + _, kwargs = mock_paginate.call_args_list[i] + assert kwargs["path"] == f"posts/{i}/comments"