From 30f04164082c5fbdefe19bb81feed828cc53366f Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Tue, 7 May 2024 11:57:49 +0200 Subject: [PATCH] Accept :memory: mode for credentials parameter in duckdb factory (#1297) * Accept :memory: mode for credentials parameter in duckdb factory * Check instance of native_value and add check in test for in-memory mode * Add a separate test for in-memory mode when using duckdb * Fix failing test, bind dataset_name to configuration * Adjust test, take ownership when :memory: has been passed * Revert changes * Adjust documentation of duckdb * Add a new exception for duckdb * Adjust error message * Remove backticks * Fix typo * Update docs * Update duckdb docs, merge examples * Remove the mention of :memory: from docstrings * Adjust the message in exception * Catch :memory: in DuckDbCredentials.on_resolve * Update tests * Rename exception * Update docs * Use Destination.from_reference in code snippet * Add one more test for Destination.from_reference and update docs * Format code and ignore mypy error * Use standard way to initialize destination via factory * Preserve environment * Cleanup duckdb docs code snippets * Adjust memo about :pipeline: connection string * Show tables from in-memory schema * Explicitly mention python script instead of pipeline * Fix typo * Reword the description of :pipeline: value * Update docs/website/docs/dlt-ecosystem/destinations/duckdb.md Co-authored-by: VioletM <sansiositres@gmail.com> * Re-arrange text blocks * Add example on how to use :pipeline: connection string * Fix typo * Adjust warning message --------- Co-authored-by: VioletM <sansiositres@gmail.com> --- dlt/destinations/impl/duckdb/configuration.py | 7 ++- dlt/destinations/impl/duckdb/exceptions.py | 11 ++++ dlt/destinations/impl/duckdb/factory.py | 2 +- .../docs/dlt-ecosystem/destinations/duckdb.md | 54 ++++++++++++++++--- tests/load/duckdb/test_duckdb_client.py | 44 ++++++++++++++- 5 files changed, 108 insertions(+), 10 deletions(-) create mode 100644 dlt/destinations/impl/duckdb/exceptions.py diff --git a/dlt/destinations/impl/duckdb/configuration.py b/dlt/destinations/impl/duckdb/configuration.py index 06ae8482ef..bc04552078 100644 --- a/dlt/destinations/impl/duckdb/configuration.py +++ b/dlt/destinations/impl/duckdb/configuration.py @@ -1,15 +1,17 @@ import os import dataclasses import threading -from pathvalidate import is_valid_filepath + from typing import Any, ClassVar, Dict, Final, List, Optional, Tuple, Type, Union +from pathvalidate import is_valid_filepath from dlt.common import logger from dlt.common.configuration import configspec from dlt.common.configuration.specs import ConnectionStringCredentials from dlt.common.configuration.specs.exceptions import InvalidConnectionString from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration from dlt.common.typing import TSecretValue +from dlt.destinations.impl.duckdb.exceptions import InvalidInMemoryDuckdbCredentials try: from duckdb import DuckDBPyConnection @@ -117,6 +119,9 @@ def is_partial(self) -> bool: return self.database == ":pipeline:" def on_resolved(self) -> None: + if isinstance(self.database, str) and self.database == ":memory:": + raise InvalidInMemoryDuckdbCredentials() + # do not set any paths for external database if self.database == ":external:": return diff --git a/dlt/destinations/impl/duckdb/exceptions.py b/dlt/destinations/impl/duckdb/exceptions.py new file mode 100644 index 0000000000..7d2727c4c1 --- /dev/null +++ b/dlt/destinations/impl/duckdb/exceptions.py @@ -0,0 +1,11 @@ +from dlt.common.destination.exceptions import DestinationTerminalException + + +class InvalidInMemoryDuckdbCredentials(DestinationTerminalException): + def __init__(self) -> None: + super().__init__( + "To use in-memory instance of duckdb, " + "please instantiate it first and then pass to destination factory\n" + '\nconn = duckdb.connect(":memory:")\n' + 'dlt.pipeline(pipeline_name="...", destination=dlt.destinations.duckdb(conn)' + ) diff --git a/dlt/destinations/impl/duckdb/factory.py b/dlt/destinations/impl/duckdb/factory.py index 6a0152df26..55fcd3b339 100644 --- a/dlt/destinations/impl/duckdb/factory.py +++ b/dlt/destinations/impl/duckdb/factory.py @@ -37,7 +37,7 @@ def __init__( Args: credentials: Credentials to connect to the duckdb database. Can be an instance of `DuckDbCredentials` or - a path to a database file. Use `:memory:` to create an in-memory database or :pipeline: to create a duckdb + a path to a database file. Use :pipeline: to create a duckdb in the working folder of the pipeline create_indexes: Should unique indexes be created, defaults to False **kwargs: Additional arguments passed to the destination config diff --git a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md index 546cf0756e..22c5fd1df9 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md +++ b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md @@ -102,30 +102,72 @@ p = dlt.pipeline( ) ``` -The destination accepts a `duckdb` connection instance via `credentials`, so you can also open a database connection yourself and pass it to `dlt` to use. `:memory:` databases are supported. +The destination accepts a `duckdb` connection instance via `credentials`, so you can also open a database connection yourself and pass it to `dlt` to use. + ```py import duckdb + db = duckdb.connect() p = dlt.pipeline( - pipeline_name='chess', + pipeline_name="chess", destination=dlt.destinations.duckdb(db), - dataset_name='chess_data', + dataset_name="chess_data", full_refresh=False, ) + +# Or if you would like to use in-memory duckdb instance +db = duckdb.connect(":memory:") +p = pipeline_one = dlt.pipeline( + pipeline_name="in_memory_pipeline", + destination=dlt.destinations.duckdb(db), + dataset_name="chess_data", +) + +print(db.sql("DESCRIBE;")) + +# Example output +# ┌──────────┬───────────────┬─────────────────────┬──────────────────────┬───────────────────────┬───────────┐ +# │ database │ schema │ name │ column_names │ column_types │ temporary │ +# │ varchar │ varchar │ varchar │ varchar[] │ varchar[] │ boolean │ +# ├──────────┼───────────────┼─────────────────────┼──────────────────────┼───────────────────────┼───────────┤ +# │ memory │ chess_data │ _dlt_loads │ [load_id, schema_n… │ [VARCHAR, VARCHAR, … │ false │ +# │ memory │ chess_data │ _dlt_pipeline_state │ [version, engine_v… │ [BIGINT, BIGINT, VA… │ false │ +# │ memory │ chess_data │ _dlt_version │ [version, engine_v… │ [BIGINT, BIGINT, TI… │ false │ +# │ memory │ chess_data │ my_table │ [a, _dlt_load_id, … │ [BIGINT, VARCHAR, V… │ false │ +# └──────────┴───────────────┴─────────────────────┴──────────────────────┴───────────────────────┴───────────┘ ``` +:::note +Be careful! The in-memory instance of the database will be destroyed, once your Python script exits. +::: + This destination accepts database connection strings in the format used by [duckdb-engine](https://github.com/Mause/duckdb_engine#configuration). You can configure a DuckDB destination with [secret / config values](../../general-usage/credentials) (e.g., using a `secrets.toml` file) ```toml destination.duckdb.credentials="duckdb:///_storage/test_quack.duckdb" ``` + The **duckdb://** URL above creates a **relative** path to `_storage/test_quack.duckdb`. To define an **absolute** path, you need to specify four slashes, i.e., `duckdb:////_storage/test_quack.duckdb`. -A few special connection strings are supported: -* **:pipeline:** creates the database in the working directory of the pipeline with the name `quack.duckdb`. -* **:memory:** creates an in-memory database. This may be useful for testing. +Dlt supports a unique connection string that triggers specific behavior for duckdb destination: +* **:pipeline:** creates the database in the working directory of the pipeline, naming it `quack.duckdb`. + +Please see the code snippets below showing how to use it +1. Via `config.toml` +```toml +destination.duckdb.credentials=":pipeline:" +``` + +2. In Python code +```py +p = pipeline_one = dlt.pipeline( + pipeline_name="my_pipeline", + destination="duckdb", + credentials=":pipeline:", +) +``` ### Additional configuration Unique indexes may be created during loading if the following config value is set: diff --git a/tests/load/duckdb/test_duckdb_client.py b/tests/load/duckdb/test_duckdb_client.py index 896cba7d5a..8f6bf195e2 100644 --- a/tests/load/duckdb/test_duckdb_client.py +++ b/tests/load/duckdb/test_duckdb_client.py @@ -6,14 +6,15 @@ from dlt.common.configuration.resolve import resolve_configuration from dlt.common.configuration.utils import get_resolved_traces +from dlt.common.destination.reference import Destination from dlt.destinations.impl.duckdb.configuration import ( - DUCK_DB_NAME, DuckDbClientConfiguration, - DuckDbCredentials, DEFAULT_DUCK_DB_NAME, ) from dlt.destinations import duckdb +from dlt.destinations.impl.duckdb.exceptions import InvalidInMemoryDuckdbCredentials +from dlt.pipeline.exceptions import PipelineStepFailed from tests.load.pipeline.utils import drop_pipeline from tests.pipeline.utils import assert_table from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, TEST_STORAGE_ROOT @@ -56,6 +57,44 @@ def test_duckdb_open_conn_default() -> None: delete_quack_db() +def test_duckdb_in_memory_mode_via_factory(preserve_environ): + delete_quack_db() + try: + import duckdb + + # Check if passing external duckdb connection works fine + db = duckdb.connect(":memory:") + dlt.pipeline(pipeline_name="booboo", destination=dlt.destinations.duckdb(db)) + + # Check if passing :memory: to factory fails + with pytest.raises(PipelineStepFailed) as exc: + p = dlt.pipeline(pipeline_name="booboo", destination="duckdb", credentials=":memory:") + p.run([1, 2, 3]) + + assert isinstance(exc.value.exception, InvalidInMemoryDuckdbCredentials) + + os.environ["DESTINATION__DUCKDB__CREDENTIALS"] = ":memory:" + with pytest.raises(PipelineStepFailed): + p = dlt.pipeline( + pipeline_name="booboo", + destination="duckdb", + ) + p.run([1, 2, 3]) + + assert isinstance(exc.value.exception, InvalidInMemoryDuckdbCredentials) + + with pytest.raises(PipelineStepFailed) as exc: + p = dlt.pipeline( + pipeline_name="booboo", + destination=Destination.from_reference("duckdb", credentials=":memory:"), # type: ignore[arg-type] + ) + p.run([1, 2, 3], table_name="numbers") + + assert isinstance(exc.value.exception, InvalidInMemoryDuckdbCredentials) + finally: + delete_quack_db() + + def test_duckdb_database_path() -> None: # resolve without any path provided c = resolve_configuration( @@ -257,6 +296,7 @@ def test_external_duckdb_database() -> None: assert c.credentials._conn_owner is False assert hasattr(c.credentials, "_conn") conn.close() + assert not os.path.exists(":memory:") def test_default_duckdb_dataset_name() -> None: