Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrized destinations #746

Merged
merged 30 commits into from
Nov 18, 2023
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3cdab60
Move destination modules to subfolder
steinitzu Nov 11, 2023
3b53861
Mockup destination factory
steinitzu Nov 9, 2023
4b449e3
Destination factory replacing reference and dest __init__
steinitzu Nov 14, 2023
f195fa7
Update factories
steinitzu Nov 11, 2023
340f637
Defer duckdb credentials resolving in pipeline context
steinitzu Nov 12, 2023
4ae4ceb
Simplify destination config resolution
steinitzu Nov 14, 2023
bccfa9d
capabilities are callable
steinitzu Nov 14, 2023
3bde2f4
bigquery, athena factories
steinitzu Nov 14, 2023
0d59d51
Add rest of factories
steinitzu Nov 14, 2023
3ee262d
Cleanup
steinitzu Nov 14, 2023
9243b2f
Destination type vars
steinitzu Nov 14, 2023
9ad561c
Cleanup
steinitzu Nov 14, 2023
007d7a3
Fix test
steinitzu Nov 14, 2023
62b5a57
Create initial config from non-defaults only
steinitzu Nov 14, 2023
d77b54a
Update naming convention path
steinitzu Nov 14, 2023
0d4ad49
Fix config in bigquery location test
steinitzu Nov 14, 2023
f657071
Only keep non-default config args in factory
steinitzu Nov 14, 2023
56e922a
Resolve duckdb credentials in pipeline context
steinitzu Nov 15, 2023
08a8fdc
Cleanup
steinitzu Nov 15, 2023
2567ca0
Union credentials arguments
steinitzu Nov 15, 2023
3537c03
Common tests without dest dependencies
steinitzu Nov 15, 2023
25a937a
Forward all athena arguments
steinitzu Nov 16, 2023
97f1afc
Delete commented code
steinitzu Nov 16, 2023
fe24e14
Reference docstrings
steinitzu Nov 16, 2023
481a7cb
Add deprecation warning for credentials argument
steinitzu Nov 16, 2023
d91402c
Init docstrings for destination factories
steinitzu Nov 16, 2023
fc92929
Fix tests
steinitzu Nov 17, 2023
13ec6fb
Destination name in output
steinitzu Nov 17, 2023
1f16c8f
Correct exception in unknown destination test
steinitzu Nov 17, 2023
2541d49
Merge branch 'devel' into sthor/parametrized-destination
rudolfix Nov 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Resolve duckdb credentials in pipeline context
steinitzu committed Nov 15, 2023
commit 56e922a02ec3662b213461e63a52c2fb91579f99
32 changes: 20 additions & 12 deletions dlt/destinations/impl/duckdb/configuration.py
Original file line number Diff line number Diff line change
@@ -96,25 +96,35 @@ class DuckDbCredentials(DuckDbBaseCredentials):

__config_gen_annotations__: ClassVar[List[str]] = []

def _database_path(self) -> str:
def is_partial(self) -> bool:
partial = super().is_partial()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should not call super(). config resolver will do that for you

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? How does that work? :/

if partial:
return True
# Wait until pipeline context is set up before resolving
return self.database == ":pipeline:"

def on_resolved(self) -> None:
# do not set any paths for external database
if self.database == ":external:":
return
# try the pipeline context
is_default_path = False
if self.database == ":pipeline:":
return self._path_in_pipeline(DEFAULT_DUCK_DB_NAME)
self.database = self._path_in_pipeline(DEFAULT_DUCK_DB_NAME)
else:
# maybe get database
maybe_database, maybe_is_default_path = self._path_from_pipeline(DEFAULT_DUCK_DB_NAME)
# if pipeline context was not present or database was not set
if not self.database or not maybe_is_default_path:
# create database locally
is_default_path = maybe_is_default_path
path = maybe_database
else:
path = self.database
self.database = maybe_database

path = os.path.abspath(path)
# always make database an abs path
self.database = os.path.abspath(self.database)
# do not save the default path into pipeline's local state
if not is_default_path:
self._path_to_pipeline(path)
return path
self._path_to_pipeline(self.database)

def _path_in_pipeline(self, rel_path: str) -> str:
from dlt.common.configuration.container import Container
@@ -123,9 +133,7 @@ def _path_in_pipeline(self, rel_path: str) -> str:
context = Container()[PipelineContext]
if context.is_active():
# pipeline is active, get the working directory
abs_path = os.path.abspath(os.path.join(context.pipeline().working_dir, rel_path))
context.pipeline().set_local_state_val(LOCAL_STATE_KEY, abs_path)
return abs_path
return os.path.join(context.pipeline().working_dir, rel_path)
raise RuntimeError("Attempting to use special duckdb database :pipeline: outside of pipeline context.")

def _path_to_pipeline(self, abspath: str) -> None:
@@ -173,7 +181,7 @@ def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]:
return default_path, True

def _conn_str(self) -> str:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems just for motherduck compatiblity it's needed, uses the same method.

return self._database_path()
return self.database


@configspec
2 changes: 1 addition & 1 deletion dlt/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ def pipeline(
if not pipelines_dir:
pipelines_dir = get_dlt_pipelines_dir()

destination = Destination.from_reference(destination or kwargs["destination_name"], credentials=credentials)
destination = Destination.from_reference(destination or kwargs["destination_name"])
staging = Destination.from_reference(staging or kwargs.get("staging_name", None)) if staging is not None else None

progress = collector_from_name(progress)
20 changes: 10 additions & 10 deletions tests/load/duckdb/test_duckdb_client.py
Original file line number Diff line number Diff line change
@@ -47,13 +47,13 @@ def test_duckdb_open_conn_default() -> None:
def test_duckdb_database_path() -> None:
# resolve without any path provided
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset"))
assert c.credentials._database_path().lower() == os.path.abspath("quack.duckdb").lower()
assert c.credentials._conn_str().lower() == os.path.abspath("quack.duckdb").lower()
# resolve without any path but with pipeline context
p = dlt.pipeline(pipeline_name="quack_pipeline")
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset"))
# still cwd
db_path = os.path.abspath(os.path.join(".", "quack_pipeline.duckdb"))
assert c.credentials._database_path().lower() == db_path.lower()
assert c.credentials._conn_str().lower() == db_path.lower()
# we do not keep default duckdb path in the local state
with pytest.raises(KeyError):
p.get_local_state_val("duckdb_database")
@@ -70,7 +70,7 @@ def test_duckdb_database_path() -> None:
# test special :pipeline: path to create in pipeline folder
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=":pipeline:"))
db_path = os.path.abspath(os.path.join(p.working_dir, DEFAULT_DUCK_DB_NAME))
assert c.credentials._database_path().lower() == db_path.lower()
assert c.credentials._conn_str().lower() == db_path.lower()
# connect
conn = c.credentials.borrow_conn(read_only=False)
c.credentials.return_conn(conn)
@@ -81,7 +81,7 @@ def test_duckdb_database_path() -> None:
# provide relative path
db_path = "_storage/test_quack.duckdb"
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials="duckdb:///_storage/test_quack.duckdb"))
assert c.credentials._database_path().lower() == os.path.abspath(db_path).lower()
assert c.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
conn = c.credentials.borrow_conn(read_only=False)
c.credentials.return_conn(conn)
assert os.path.isfile(db_path)
@@ -91,7 +91,7 @@ def test_duckdb_database_path() -> None:
db_path = os.path.abspath("_storage/abs_test_quack.duckdb")
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=f"duckdb:///{db_path}"))
assert os.path.isabs(c.credentials.database)
assert c.credentials._database_path().lower() == db_path.lower()
assert c.credentials._conn_str().lower() == db_path.lower()
conn = c.credentials.borrow_conn(read_only=False)
c.credentials.return_conn(conn)
assert os.path.isfile(db_path)
@@ -100,7 +100,7 @@ def test_duckdb_database_path() -> None:
# set just path as credentials
db_path = "_storage/path_test_quack.duckdb"
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=db_path))
assert c.credentials._database_path().lower() == os.path.abspath(db_path).lower()
assert c.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
conn = c.credentials.borrow_conn(read_only=False)
c.credentials.return_conn(conn)
assert os.path.isfile(db_path)
@@ -109,7 +109,7 @@ def test_duckdb_database_path() -> None:
db_path = os.path.abspath("_storage/abs_path_test_quack.duckdb")
c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=db_path))
assert os.path.isabs(c.credentials.database)
assert c.credentials._database_path().lower() == db_path.lower()
assert c.credentials._conn_str().lower() == db_path.lower()
conn = c.credentials.borrow_conn(read_only=False)
c.credentials.return_conn(conn)
assert os.path.isfile(db_path)
@@ -129,7 +129,7 @@ def test_keeps_initial_db_path() -> None:
print(p.pipelines_dir)
with p.sql_client() as conn:
# still cwd
assert conn.credentials._database_path().lower() == os.path.abspath(db_path).lower()
assert conn.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
# but it is kept in the local state
assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()

@@ -139,7 +139,7 @@ def test_keeps_initial_db_path() -> None:
with p.sql_client() as conn:
# still cwd
assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()
assert conn.credentials._database_path().lower() == os.path.abspath(db_path).lower()
assert conn.credentials._conn_str().lower() == os.path.abspath(db_path).lower()

# now create a new pipeline
dlt.pipeline(pipeline_name="not_quack", destination="dummy")
@@ -148,7 +148,7 @@ def test_keeps_initial_db_path() -> None:
assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()
# new pipeline context took over
# TODO: restore pipeline context on each call
assert conn.credentials._database_path().lower() != os.path.abspath(db_path).lower()
assert conn.credentials._conn_str().lower() != os.path.abspath(db_path).lower()


def test_duckdb_database_delete() -> None:
4 changes: 2 additions & 2 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
from dlt.common.configuration.specs.gcp_credentials import GcpOAuthCredentials
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.capabilities import TLoaderFileFormat
from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule
from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, InvalidDestinationReference
from dlt.common.pipeline import PipelineContext
from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector
from dlt.common.schema.utils import new_column, new_table
@@ -165,7 +165,7 @@ def test_pipeline_context() -> None:


def test_import_unknown_destination() -> None:
with pytest.raises(UnknownDestinationModule):
with pytest.raises(InvalidDestinationReference):
dlt.pipeline(destination="!")