Skip to content

Commit

Permalink
Fix: MotherDuck destination was not passing MotherDuck API key to SQL…
Browse files Browse the repository at this point in the history
… processor (#47380)
  • Loading branch information
guenp authored Oct 25, 2024
1 parent 4c9ed48 commit 89a1283
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_sql_alchemy_url(self) -> SecretString:

return SecretString(
f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
f"&custom_user_agent=={self.custom_user_agent}"
f"&custom_user_agent={self.custom_user_agent}"
# Not sure why this doesn't work. We have to override later in the flow.
# f"&schema={self.schema_name}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import Any, Dict, Iterable, List, Mapping
from urllib.parse import urlparse

import duckdb
from airbyte_cdk import AirbyteStream, ConfiguredAirbyteStream, SyncMode
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from airbyte_cdk.sql._processors.duckdb import DuckDBConfig, DuckDBSqlProcessor
Expand Down Expand Up @@ -125,13 +125,7 @@ def write(
path = str(config.get("destination_path"))
path = self._get_destination_path(path)
schema_name = validated_sql_name(config.get("schema", CONFIG_DEFAULT_SCHEMA))

# Get and register auth token if applicable
motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, ""))
duckdb_config = {}
if motherduck_api_key:
duckdb_config["motherduck_token"] = motherduck_api_key
duckdb_config["custom_user_agent"] = "airbyte"

for configured_stream in configured_catalog.streams:
stream_name = configured_stream.stream.name
Expand All @@ -140,6 +134,7 @@ def write(
configured_catalog=configured_catalog,
schema_name=schema_name,
db_path=path,
motherduck_token=motherduck_api_key,
)
processor._ensure_schema_exists()

Expand Down Expand Up @@ -171,7 +166,7 @@ def write(
for message in input_messages:
if message.type == Type.STATE:
# flush the buffer
self._flush_buffer(buffer, configured_catalog, path, schema_name)
self._flush_buffer(buffer, configured_catalog, path, schema_name, motherduck_api_key)
buffer = defaultdict(lambda: defaultdict(list))

yield message
Expand All @@ -196,14 +191,15 @@ def write(
logger.info(f"Message type {message.type} not supported, skipping")

# flush any remaining messages
self._flush_buffer(buffer, configured_catalog, path, schema_name)
self._flush_buffer(buffer, configured_catalog, path, schema_name, motherduck_api_key)

def _flush_buffer(
self,
buffer: Dict[str, Dict[str, List[Any]]],
configured_catalog: ConfiguredAirbyteCatalog,
db_path: str,
schema_name: str,
motherduck_api_key: str,
) -> None:
"""
Flush the buffer to the destination
Expand All @@ -212,9 +208,7 @@ def _flush_buffer(
stream_name = configured_stream.stream.name
if stream_name in buffer:
processor = self._get_sql_processor(
configured_catalog=configured_catalog,
schema_name=schema_name,
db_path=db_path,
configured_catalog=configured_catalog, schema_name=schema_name, db_path=db_path, motherduck_token=motherduck_api_key
)
processor.write_stream_data_from_buffer(buffer, stream_name, configured_stream.destination_sync_mode)

Expand All @@ -238,11 +232,8 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
logger.info(f"Using DuckDB file at {path}")
os.makedirs(os.path.dirname(path), exist_ok=True)

duckdb_config = {}
if CONFIG_MOTHERDUCK_API_KEY in config:
duckdb_config["motherduck_token"] = str(config[CONFIG_MOTHERDUCK_API_KEY])
duckdb_config["custom_user_agent"] = "airbyte"
# Next, we want to specify 'saas_mode' for during check,
if self._is_motherduck(path):
# Enable 'saas_mode' during the connection check
# to reduce memory usage from unnecessary extensions
if "?" in path:
# There are already some query params; append to them.
Expand All @@ -251,13 +242,20 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
# No query params yet; add one.
path += "?saas_mode=true"

con = duckdb.connect(
database=path,
read_only=False,
config=duckdb_config,
# Create a dummy catalog to check if the SQL processor works
check_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="check", json_schema={"type": "object"}, supported_sync_modes=[SyncMode.incremental]),
sync_mode=SyncMode.incremental,
destination_sync_mode=SyncMode.incremental,
)
con.execute("SELECT 1;")

check_catalog = ConfiguredAirbyteCatalog(streams=[check_stream])
processor = self._get_sql_processor(
configured_catalog=check_catalog,
schema_name="test",
db_path=path,
motherduck_token=str(config.get(CONFIG_MOTHERDUCK_API_KEY, "")),
)
processor._execute_sql("SELECT 1;")
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 042ee9b5-eb98-4e99-a4e5-3f0d573bee66
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
dockerRepository: airbyte/destination-motherduck
githubIssueLabel: destination-motherduck
icon: duckdb.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "destination-motherduck"
version = "0.1.2"
version = "0.1.3"
description = "Destination implementation for MotherDuck."
authors = ["Guen Prawiroatmodjo, Simon Späti, Airbyte"]
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/motherduck.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ This connector is primarily designed to work with MotherDuck and local DuckDB fi

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :-------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.1.3 | 2024-10-25 | [47380](https://github.com/airbytehq/airbyte/pull/47380) | Fix bug causing MotherDuck API key to not be correctly passed to the engine. |
| 0.1.2 | 2024-10-23 | [47315](https://github.com/airbytehq/airbyte/pull/47315) | Use `saas_mode` during connection check to reduce RAM usage. |
| 0.1.1 | 2024-10-23 | [47312](https://github.com/airbytehq/airbyte/pull/47312) | Fix: generate new unique destination ID |
| 0.1.0 | 2024-10-23 | [46904](https://github.com/airbytehq/airbyte/pull/46904) | New MotherDuck destination |
Expand Down

0 comments on commit 89a1283

Please sign in to comment.