Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrized destinations #746

Merged
merged 30 commits into from
Nov 18, 2023
Merged

Parametrized destinations #746

merged 30 commits into from
Nov 18, 2023

Conversation

steinitzu
Copy link
Collaborator

Description

Draft implementation of parametrized destinations.

I added a base DestinationFactory with same interface as the destination modules just to hold the config arguments.

Usage is e.g.

import dlt
from dlt.destinations import snowflake, filesystem


pipeline = dlt.pipeline(
    "callable_dest_staging",
    destination=snowflake(),
    staging=filesystem("s3://my-bucket")
)

The biggest change is that I moved all destinations to dlt.destinations.impl. Just because I think it would be more confusing to have a module and a callable with the same name.
I.e. dlt.destinations.postgres(...) is the factory, dlt.destinations.impl.postgres is the package with all the destination code.

Related Issues

@steinitzu steinitzu requested a review from rudolfix November 9, 2023 18:27
Copy link

netlify bot commented Nov 9, 2023

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit 1f16c8f
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/6557c59bd300e00008b25c4e

@steinitzu
Copy link
Collaborator Author

@rudolfix would be great to know what you think of the approach before I finalize it.

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steinitzu I really like it!. using partial to get initial config is smart. I think factory impls can be simplified even more.

btw. we'll have another ticket to improve destinations even further ie. to let people assign names to them so we have dynamic configs in case of having several environments ie.
progress-dev
will be taken from [destination.progress-dev] just a headsup

but we'll merge that first


destination = _filesystem

@with_config(spec=FilesystemDestinationClientConfiguration, sections=(known_sections.DESTINATION, 'filesystem'), accept_partial=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using partial is smart!

) -> None:
cfg: SnowflakeClientConfiguration = kwargs['_dlt_config']
self.credentials = cfg.credentials
self.config_params = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all configurations are also dictionaries. so you could get dict, assign credentials (part of the base) and pass rest of the dict as config_params

also possible at run time IMO - when actual client/capabilities are created

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steinitzu on the second look: I think we should also instantiate destination factory in pipeline, when we pass the destination as string and completely get rid of ability to pass a module.

I bet we can get rid of most of the code in __init__.py of each of destination implementations: for sure the _configure and client() methods. maybe all of them?

@steinitzu
Copy link
Collaborator Author

Not quite done, but some more progress.

  1. DestinationReference is removed entirely. client/capabilities/spec defined on the factory and always use it
  2. Also renamed DestinationFactory to Destination, think that makes sense as now it's "the" base destination
  3. I create capabilities at import time ( https://github.com/dlt-hub/dlt/blob/sthor%2Fparametrized-destination/dlt/destinations/impl/postgres/factory.py#L15 ). I couldn't see a reason to have them behind a callable (except for dummy where they can be configured). Any problem with this?

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls see how I'd simplify configuring the destination even more

dlt/common/destination/__init__.py Show resolved Hide resolved
dlt/common/destination/reference.py Outdated Show resolved Hide resolved
dlt/destinations/impl/duckdb/configuration.py Show resolved Hide resolved
dlt/destinations/impl/duckdb/factory.py Outdated Show resolved Hide resolved
@with_config(spec=DuckDbClientConfiguration, sections=(known_sections.DESTINATION, 'duckdb'), accept_partial=True)
def __init__(
self,
credentials: DuckDbCredentials = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be better to get connection string or path and create DuckDbCredentials from it. it's constructor should accept

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah probably all destinations we should accept either TCredentials | str or any native value types.

@@ -95,28 +96,25 @@ class DuckDbCredentials(DuckDbBaseCredentials):

__config_gen_annotations__: ClassVar[List[str]] = []

def on_resolved(self) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the problem here? on_resolved is not called in new model because we allow for partial? but at some point we must make sure that configuration is complete. pls see my other comments

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was having an issue where on_resolved was called outside of pipeline context, so when using :pipeline: it gets reset to None.
The top level client config is partial but the credentials are not partial, maybe that's why?
I think it was just right when I instantiated the factory from here: https://github.com/dlt-hub/dlt/blob/sthor%2Fparametrized-destination/dlt/pipeline/__init__.py#L119

It took me some time to understand what was happening.
But I just moved the same logic to the _database_path method which is called on the fly when conn string is generated, at that point pipeline should always be set up already.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok my fix was a problem for dbt runner so I went back to the original on_resolved method, but added this https://github.com/dlt-hub/dlt/blob/sthor%2Fparametrized-destination/dlt/destinations/impl/duckdb/configuration.py#L99-L104 so it doesn't trigger right when the instance is created


return DuckDbClient

@with_config(spec=DuckDbClientConfiguration, sections=(known_sections.DESTINATION, 'duckdb'), accept_partial=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we can simplify this even more.

  1. I'd not use @with_config here at. I'd just let people pass initial parameters and do not pull config at all.
  2. I'd use with_config not as decorator but within a function that would return a configuration. there's no partial.
    so let's imagine we have this in Destination:
def configuration(initial: TConfig) -> TConfig:
   return with_config(lambda x: x, spec=DuckDbClientConfiguration, sections=(known_sections.DESTINATION, self.destination_name), initial_config=initial)

we'd need to extend with_config to accept inital config in the decorator BUT if you look in the code there's already mechanims to handle that:

# if one of arguments is spec the use it as initial value
                if spec_arg:
                    config = bound_args.arguments.get(spec_arg.name, None)                    

if we do that, we do not use partial, we always resolve configuration etc.
where initial is coming from? we form it in __init__ from the aguments.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah I think that makes sense. So we'd just be leaving resolving config until the last moment when it's needed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, why use with_config and not resolve_config directly for this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use resolve_config! looks even better

@steinitzu steinitzu force-pushed the sthor/parametrized-destination branch 2 times, most recently from b8f9d97 to 69b3bc5 Compare November 14, 2023 21:09
@steinitzu steinitzu force-pushed the sthor/parametrized-destination branch from 69b3bc5 to d77b54a Compare November 14, 2023 21:31
@steinitzu steinitzu force-pushed the sthor/parametrized-destination branch from 85c2b86 to f657071 Compare November 15, 2023 01:05
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really close! please remember to deprecate credentials argument usage in Pipeline run and load method

@@ -0,0 +1,69 @@
# import typing as t
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed :)

def __init__(
self,
query_result_bucket: t.Optional[str] = None,
credentials: t.Optional[AwsCredentials] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for each init in each destination let's handle credentials as:
Union[SomeCredentials, Dict[str, Any], Any] then look how CredentialsConfiguration implements

def __init__(self, init_value: Any = None) -> None:

so you can just pass the credentials the constructor of SomeCredentials (if they are not already credentials). basically doing what pipeline is not doing with "credentials" argument

client_spec = destination.spec()
        # initialize explicit credentials
        if not as_staging:
            # explicit credentials passed to dlt.pipeline should not be applied to staging
            credentials = credentials or self.credentials
        if credentials is not None and not isinstance(credentials, CredentialsConfiguration):
            # use passed credentials as initial value. initial value may resolve credentials
            credentials = initialize_credentials(
                client_spec.get_resolvable_fields()["credentials"],
                credentials
            )
            ```

@@ -95,6 +96,13 @@ class DuckDbCredentials(DuckDbBaseCredentials):

__config_gen_annotations__: ClassVar[List[str]] = []

def is_partial(self) -> bool:
partial = super().is_partial()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should not call super(). config resolver will do that for you

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? How does that work? :/

@@ -173,6 +180,9 @@ def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]:

return default_path, True

def _conn_str(self) -> str:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems just for motherduck compatiblity it's needed, uses the same method.

dlt/destinations/impl/athena/factory.py Outdated Show resolved Hide resolved
# @with_config(spec=DuckDbClientConfiguration, sections=(known_sections.DESTINATION, 'duckdb'), accept_partial=True)
def __init__(
self,
credentials: t.Union[DuckDbCredentials, str, "DuckDBPyConnection"] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool!


def __init__(
self,
credentials: t.Union[MsSqlCredentials, str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for accepting str :)

@@ -0,0 +1,67 @@
# import typing as t

# from dlt.common.destination import Destination, DestinationCapabilitiesContext
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

def __init__(
self,
credentials: t.Optional[WeaviateCredentials] = None,
**kwargs: t.Any,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could add vectorizer and module_config?



def test_duckdb_database_delete() -> None:
db_path = "_storage/path_test_quack.duckdb"
p = dlt.pipeline(pipeline_name="quack_pipeline", credentials=db_path, destination="duckdb")
p = dlt.pipeline(pipeline_name="quack_pipeline", destination=duckdb(credentials=DuckDbCredentials(db_path)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should still work. we just need to create the instance ourselves if stirng is passed. I explained that above

@steinitzu steinitzu force-pushed the sthor/parametrized-destination branch from fba1d21 to 3537c03 Compare November 16, 2023 00:15
@steinitzu steinitzu marked this pull request as ready for review November 17, 2023 19:57
rudolfix
rudolfix previously approved these changes Nov 18, 2023
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! thanks for the docstrings!

@rudolfix rudolfix merged commit 105795c into devel Nov 18, 2023
26 of 32 checks passed
@rudolfix rudolfix deleted the sthor/parametrized-destination branch November 18, 2023 17:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

allow to parametrize destinations by converting them into callables
2 participants