diff --git a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/duckdb.py b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/duckdb.py index f7c4cef779fa..a37cb35ebbcf 100644 --- a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/duckdb.py +++ b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/duckdb.py @@ -7,23 +7,25 @@ import warnings from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Literal, Sequence +from urllib.parse import parse_qsl, urlparse import pyarrow as pa from airbyte_cdk import DestinationSyncMode from airbyte_cdk.sql import exceptions as exc -from airbyte_cdk.sql.constants import AB_EXTRACTED_AT_COLUMN +from airbyte_cdk.sql.constants import AB_EXTRACTED_AT_COLUMN, DEBUG_MODE from airbyte_cdk.sql.secrets import SecretString from airbyte_cdk.sql.shared.sql_processor import SqlConfig, SqlProcessorBase, SQLRuntimeError from duckdb_engine import DuckDBEngineWarning from overrides import overrides from pydantic import Field -from sqlalchemy import Executable, TextClause, text +from sqlalchemy import Executable, TextClause, create_engine, text from sqlalchemy.exc import ProgrammingError, SQLAlchemyError if TYPE_CHECKING: from sqlalchemy.engine import Connection, Engine BUFFER_TABLE_NAME = "_airbyte_temp_buffer_data" +MOTHERDUCK_SCHEME = "md" logger = logging.getLogger(__name__) @@ -52,7 +54,16 @@ def get_sql_alchemy_url(self) -> SecretString: message="duckdb-engine doesn't yet support reflection on indices", category=DuckDBEngineWarning, ) - return SecretString(f"duckdb:///{self.db_path!s}") + parsed_db_path = urlparse(self.db_path) + if parsed_db_path.scheme == MOTHERDUCK_SCHEME: + path = f"{MOTHERDUCK_SCHEME}:{parsed_db_path.path}" + else: + path = parsed_db_path.path + return SecretString(f"duckdb:///{path!s}") + + def get_duckdb_config(self) -> Dict[str, Any]: + """Get config dictionary to pass to duckdb""" + return dict(parse_qsl(urlparse(self.db_path).query)) @overrides def get_database_name(self) -> str: @@ -75,21 +86,30 @@ def _is_file_based_db(self) -> bool: return ( ("/" in db_path_str or "\\" in db_path_str) and db_path_str != ":memory:" - and "md:" not in db_path_str + and f"{MOTHERDUCK_SCHEME}:" not in db_path_str and "motherduck:" not in db_path_str ) @overrides def get_sql_engine(self) -> Engine: - """Return the SQL Alchemy engine. + """ + Return a new SQL engine to use. - This method is overridden to ensure that the database parent directory is created if it - doesn't exist. + This method is overridden to: + - ensure that the database parent directory is created if it doesn't exist. + - pass the DuckDB query parameters (such as motherduck_token) via the config """ if self._is_file_based_db(): Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) - return super().get_sql_engine() + return create_engine( + url=self.get_sql_alchemy_url(), + echo=DEBUG_MODE, + execution_options={ + "schema_translate_map": {None: self.schema_name}, + }, + future=True, + ) class DuckDBSqlProcessor(SqlProcessorBase): diff --git a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/motherduck.py b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/motherduck.py index 2e4316c12944..d05a75a56a7c 100644 --- a/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/motherduck.py +++ b/airbyte-integrations/connectors/destination-motherduck/destination_motherduck/processors/motherduck.py @@ -18,11 +18,13 @@ import warnings +from airbyte_cdk.sql.constants import DEBUG_MODE from airbyte_cdk.sql.secrets import SecretString from destination_motherduck.processors.duckdb import DuckDBConfig, DuckDBSqlProcessor from duckdb_engine import DuckDBEngineWarning from overrides import overrides from pydantic import Field +from sqlalchemy import Engine, create_engine # Suppress warnings from DuckDB about reflection on indices. # https://github.com/Mause/duckdb_engine/issues/905 @@ -52,18 +54,37 @@ def get_sql_alchemy_url(self) -> SecretString: category=DuckDBEngineWarning, ) - return SecretString( - f"duckdb:///md:{self.database}?motherduck_token={self.api_key}" - f"&custom_user_agent={self.custom_user_agent}" - # Not sure why this doesn't work. We have to override later in the flow. - # f"&schema={self.schema_name}" - ) + # We defer adding schema name and API token until `create_engine()` call. + return SecretString(f"duckdb:///md:{self.database}?custom_user_agent={self.custom_user_agent}") @overrides def get_database_name(self) -> str: """Return the name of the database.""" return self.database + @overrides + def get_sql_engine(self) -> Engine: + """ + Return a new SQL engine to use. + + This method is overridden to: + - ensure that the database parent directory is created if it doesn't exist. + - pass the DuckDB query parameters (such as motherduck_token) via the config + """ + return create_engine( + url=self.get_sql_alchemy_url(), + echo=DEBUG_MODE, + execution_options={ + "schema_translate_map": {None: self.schema_name}, + }, + future=True, + connect_args={ + "config": { + "motherduck_token": self.api_key, + }, + }, + ) + class MotherDuckSqlProcessor(DuckDBSqlProcessor): """A cache implementation for MotherDuck.""" diff --git a/airbyte-integrations/connectors/destination-motherduck/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-motherduck/integration_tests/integration_test.py index 8be066129616..076bc7014685 100644 --- a/airbyte-integrations/connectors/destination-motherduck/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-motherduck/integration_tests/integration_test.py @@ -26,6 +26,7 @@ SyncMode, Type, ) +from airbyte_cdk.sql.secrets import SecretString from destination_motherduck import DestinationMotherDuck from destination_motherduck.destination import CONFIG_MOTHERDUCK_API_KEY from faker import Faker @@ -69,6 +70,9 @@ def config(request, test_schema_name: str) -> Generator[Any, Any, Any]: elif request.param == "motherduck_config": config_dict = json.loads(Path(SECRETS_CONFIG_PATH).read_text()) config_dict["schema"] = test_schema_name + if CONFIG_MOTHERDUCK_API_KEY in config_dict: + # Prevent accidentally printing API Key if `config_dict` is printed. + config_dict[CONFIG_MOTHERDUCK_API_KEY] = SecretString(config_dict[CONFIG_MOTHERDUCK_API_KEY]) yield config_dict else: @@ -296,7 +300,7 @@ def test_check_succeeds( ): destination = DestinationMotherDuck() status = destination.check(logger=MagicMock(), config=config) - assert status.status == Status.SUCCEEDED + assert status.status == Status.SUCCEEDED, status.message def _state(data: Dict[str, Any]) -> AirbyteMessage: diff --git a/airbyte-integrations/connectors/destination-motherduck/metadata.yaml b/airbyte-integrations/connectors/destination-motherduck/metadata.yaml index e010968bb5c9..24b7c85ce99a 100644 --- a/airbyte-integrations/connectors/destination-motherduck/metadata.yaml +++ b/airbyte-integrations/connectors/destination-motherduck/metadata.yaml @@ -4,7 +4,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 042ee9b5-eb98-4e99-a4e5-3f0d573bee66 - dockerImageTag: 0.1.15 + dockerImageTag: 0.1.16 dockerRepository: airbyte/destination-motherduck githubIssueLabel: destination-motherduck icon: duckdb.svg diff --git a/airbyte-integrations/connectors/destination-motherduck/pyproject.toml b/airbyte-integrations/connectors/destination-motherduck/pyproject.toml index 2ec1cd27f302..8911f35d365f 100644 --- a/airbyte-integrations/connectors/destination-motherduck/pyproject.toml +++ b/airbyte-integrations/connectors/destination-motherduck/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "airbyte-destination-motherduck" -version = "0.1.15" +version = "0.1.16" description = "Destination implementation for MotherDuck." authors = ["Guen Prawiroatmodjo, Simon Späti, Airbyte"] license = "MIT" diff --git a/docs/integrations/destinations/motherduck.md b/docs/integrations/destinations/motherduck.md index 042525a2ca68..4c635c0b8bb0 100644 --- a/docs/integrations/destinations/motherduck.md +++ b/docs/integrations/destinations/motherduck.md @@ -74,6 +74,7 @@ This connector is primarily designed to work with MotherDuck and local DuckDB fi | Version | Date | Pull Request | Subject | |:--------| :--------- | :-------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0.1.16 | 2024-12-06 | [48562](https://github.com/airbytehq/airbyte/pull/48562) | Improved handling of config parameters during SQL engine creation. | | 0.1.15 | 2024-11-07 | [48405](https://github.com/airbytehq/airbyte/pull/48405) | Updated docs and hovertext for schema, api key, and database name. | | 0.1.14 | 2024-10-30 | [48006](https://github.com/airbytehq/airbyte/pull/48006) | Fix bug in _flush_buffer, explicitly register dataframe before inserting | | 0.1.13 | 2024-10-30 | [47969](https://github.com/airbytehq/airbyte/pull/47969) | Preserve Platform-generated id in state messages. |