Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supports adding DltResource in RESTAPIConfig dict #1865

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,18 @@ def rest_api_resources(config: RESTAPIConfig) -> List[DltResource]:
def create_resources(
client_config: ClientConfig,
dependency_graph: graphlib.TopologicalSorter,
endpoint_resource_map: Dict[str, EndpointResource],
endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]],
resolved_param_map: Dict[str, Optional[ResolvedParam]],
) -> Dict[str, DltResource]:
resources = {}

for resource_name in dependency_graph.static_order():
resource_name = cast(str, resource_name)
endpoint_resource = endpoint_resource_map[resource_name]
if isinstance(endpoint_resource, DltResource):
resources[resource_name] = endpoint_resource
continue

endpoint_config = cast(Endpoint, endpoint_resource["endpoint"])
request_params = endpoint_config.get("params", {})
request_json = endpoint_config.get("json", None)
Expand Down
66 changes: 42 additions & 24 deletions dlt/sources/rest_api/config_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
OAuth2ClientCredentials,
)

from dlt.extract.resource import DltResource

from .typing import (
EndpointResourceBase,
AuthConfig,
Expand Down Expand Up @@ -269,35 +271,20 @@ def make_parent_key_name(resource_name: str, field_name: str) -> str:

def build_resource_dependency_graph(
resource_defaults: EndpointResourceBase,
resource_list: List[Union[str, EndpointResource]],
) -> Tuple[Any, Dict[str, EndpointResource], Dict[str, Optional[ResolvedParam]]]:
resource_list: List[Union[str, EndpointResource, DltResource]],
) -> Tuple[
Any, Dict[str, Union[EndpointResource, DltResource]], Dict[str, Optional[ResolvedParam]]
]:
dependency_graph = graphlib.TopologicalSorter()
endpoint_resource_map: Dict[str, EndpointResource] = {}
resolved_param_map: Dict[str, ResolvedParam] = {}

# expand all resources and index them
for resource_kwargs in resource_list:
if isinstance(resource_kwargs, dict):
# clone resource here, otherwise it needs to be cloned in several other places
# note that this clones only dict structure, keeping all instances without deepcopy
resource_kwargs = update_dict_nested({}, resource_kwargs) # type: ignore

endpoint_resource = _make_endpoint_resource(resource_kwargs, resource_defaults)
assert isinstance(endpoint_resource["endpoint"], dict)
_setup_single_entity_endpoint(endpoint_resource["endpoint"])
_bind_path_params(endpoint_resource)

resource_name = endpoint_resource["name"]
assert isinstance(
resource_name, str
), f"Resource name must be a string, got {type(resource_name)}"

if resource_name in endpoint_resource_map:
raise ValueError(f"Resource {resource_name} has already been defined")
endpoint_resource_map[resource_name] = endpoint_resource
endpoint_resource_map = expand_and_index_resources(resource_list, resource_defaults)

# create dependency graph
for resource_name, endpoint_resource in endpoint_resource_map.items():
if isinstance(endpoint_resource, DltResource):
dependency_graph.add(resource_name)
resolved_param_map[resource_name] = None
break
assert isinstance(endpoint_resource["endpoint"], dict)
# connect transformers to resources via resolved params
resolved_params = _find_resolved_params(endpoint_resource["endpoint"])
Expand All @@ -322,6 +309,37 @@ def build_resource_dependency_graph(
return dependency_graph, endpoint_resource_map, resolved_param_map


def expand_and_index_resources(
resource_list: List[Union[str, EndpointResource, DltResource]],
resource_defaults: EndpointResourceBase,
) -> Dict[str, Union[EndpointResource, DltResource]]:
endpoint_resource_map: Dict[str, Union[EndpointResource, DltResource]] = {}
for resource in resource_list:
if isinstance(resource, DltResource):
endpoint_resource_map[resource.name] = resource
break
elif isinstance(resource, dict):
# clone resource here, otherwise it needs to be cloned in several other places
# note that this clones only dict structure, keeping all instances without deepcopy
resource = update_dict_nested({}, resource) # type: ignore

endpoint_resource = _make_endpoint_resource(resource, resource_defaults)
assert isinstance(endpoint_resource["endpoint"], dict)
_setup_single_entity_endpoint(endpoint_resource["endpoint"])
_bind_path_params(endpoint_resource)

resource_name = endpoint_resource["name"]
assert isinstance(
resource_name, str
), f"Resource name must be a string, got {type(resource_name)}"

if resource_name in endpoint_resource_map:
raise ValueError(f"Resource {resource_name} has already been defined")
endpoint_resource_map[resource_name] = endpoint_resource

return endpoint_resource_map


def _make_endpoint_resource(
resource: Union[str, EndpointResource], default_config: EndpointResourceBase
) -> EndpointResource:
Expand Down
3 changes: 2 additions & 1 deletion dlt/sources/rest_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from dlt.extract.items import TTableHintTemplate
from dlt.extract.incremental.typing import LastValueFunc
from dlt.extract.resource import DltResource

from requests import Session

Expand Down Expand Up @@ -276,7 +277,7 @@ class EndpointResource(EndpointResourceBase, total=False):

class RESTAPIConfigBase(TypedDict):
client: ClientConfig
resources: List[Union[str, EndpointResource]]
resources: List[Union[str, EndpointResource, DltResource]]


class RESTAPIConfig(RESTAPIConfigBase, total=False):
Expand Down
45 changes: 45 additions & 0 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,51 @@ The `field` value can be specified as a [JSONPath](https://github.com/h2non/json

Under the hood, dlt handles this by using a [transformer resource](../../../general-usage/resource.md#process-resources-with-dlttransformer).

#### Define a resource which is not a REST endpoint

Sometimes, we want to request endpoints with specific values that are not returned by another endpoint.
Thus, you can also include arbitrary dlt resources in your `RESTAPIConfig` instead of defining a resource for every path!

In the following example, we want to load the issues belonging to three repositories.
Instead of defining now three different issues resources, one for each of the paths `dlt-hub/dlt/issues/`, `dlt-hub/verified-sources/issues/`, `dlt-hub/dlthub-education/issues/`, we have a resource `repositories` which yields a list of repository names which will be fetched by the dependent resource `issues`.

```py
@dlt.resource()
def repositories() -> Generator[List[Dict[str, Any]]]:
"""A seed list of repositories to fetch"""
yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]

config: RESTAPIConfig = {
"client": {"base_url": "https://github.com/api/v2"},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
}
burnash marked this conversation as resolved.
Show resolved Hide resolved
```

Be careful that the parent resource needs to return `Generator[List[Dict[str, Any]]]`. Thus, the following will NOT work:

```py
@dlt.resource()
burnash marked this conversation as resolved.
Show resolved Hide resolved
def repositories() -> Generator[Dict[str, Any]]:
"""Not working seed list of repositories to fetch"""
yield from [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]
```


#### Include fields from the parent resource

You can include data from the parent resource in the child resource by using the `include_from_parent` field in the resource configuration. For example:
Expand Down
51 changes: 40 additions & 11 deletions tests/sources/rest_api/configurations/source_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ class CustomOAuthAuth(OAuth2AuthBase):
pass


@dlt.resource(name="repositories", selected=False)
def repositories():
"""A seed list of repositories to fetch"""
yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]


VALID_CONFIGS: List[RESTAPIConfig] = [
{
"client": {"base_url": "https://api.example.com"},
Expand Down Expand Up @@ -310,19 +316,42 @@ class CustomOAuthAuth(OAuth2AuthBase):
],
},
{
"client": {
"base_url": "https://example.com",
"session": requests.Session(),
},
"resources": ["users"],
"client": {"base_url": "https://github.com/api/v2"},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
},
{
"client": {
"base_url": "https://example.com",
# This is a subclass of requests.Session and is thus also allowed
"session": dlt.sources.helpers.requests.Session(),
},
"resources": ["users"],
"client": {"base_url": "https://github.com/api/v2"},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
},
]

Expand Down
68 changes: 68 additions & 0 deletions tests/sources/rest_api/configurations/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,71 @@ def test_resource_defaults_no_params() -> None:
"per_page": 50,
"sort": "updated",
}


def test_accepts_DltResource_in_resources() -> None:
@dlt.resource(selected=False)
def repositories():
"""A seed list of repositories to fetch"""
yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]

config: RESTAPIConfig = {
"client": {"base_url": "https://github.com/api/v2"},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
}

source = rest_api_source(config)
assert list(source.resources.keys()) == ["repositories", "issues"]
assert list(source.selected_resources.keys()) == ["issues"]


def test_resource_defaults_dont_apply_to_DltResource() -> None:
@dlt.resource()
def repositories():
"""A seed list of repositories to fetch"""
yield [{"name": "dlt"}, {"name": "verified-sources"}, {"name": "dlthub-education"}]

config: RESTAPIConfig = {
"client": {"base_url": "https://github.com/api/v2"},
"resource_defaults": {
"write_disposition": "replace",
},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "dlt-hub/{repository}/issues/",
"params": {
"repository": {
"type": "resolve",
"resource": "repositories",
"field": "name",
},
},
},
},
repositories(),
],
}

source = rest_api_source(config)
assert source.resources["issues"].write_disposition == "replace"
assert source.resources["repositories"].write_disposition != "replace", (
"DltResource defined outside of the RESTAPIConfig object is influenced by the content of"
" the RESTAPIConfig"
)
38 changes: 38 additions & 0 deletions tests/sources/rest_api/integration/test_offline.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,41 @@ def send_spy(*args, **kwargs):

mocked_send.assert_called_once()
assert mocked_send.call_args[0][0].url == "https://api.example.com/posts"


def test_DltResource_gets_called(mock_api_server, mocker) -> None:
@dlt.resource()
def post_list():
yield [{"id": "0"}, {"id": "1"}, {"id": "2"}]

config: RESTAPIConfig = {
"client": {"base_url": "http://api.example.com/"},
"resource_defaults": {
"write_disposition": "replace",
},
"resources": [
{
"name": "posts",
"endpoint": {
"path": "posts/{post_id}/comments",
"params": {
"post_id": {
"type": "resolve",
"resource": "post_list",
"field": "id",
},
},
},
},
post_list(),
],
}

RESTClient = dlt.sources.helpers.rest_client.RESTClient
with mock.patch.object(RESTClient, "paginate") as mock_paginate:
source = rest_api_source(config)
_ = list(source)
assert mock_paginate.call_count == 3
for i in range(3):
_, kwargs = mock_paginate.call_args_list[i]
assert kwargs["path"] == f"posts/{i}/comments"
Loading