Skip to content

Commit

Permalink
🐛 Destination DuckDB: Fix schema name handling (#30600)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Sep 20, 2023
1 parent 1a7f4df commit f2a1841
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 41 deletions.
8 changes: 6 additions & 2 deletions .devcontainer/destination-duckdb/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// Python extensions:
"charliermarsh.ruff",
"matangover.mypy",
"ms-python.black",
"ms-python.black-formatter",
"ms-python.python",
"ms-python.vscode-pylance",

Expand All @@ -41,6 +41,7 @@
],
"settings": {
"extensions.ignoreRecommendations": true,
"git.autofetch": true,
"git.openRepositoryInParentFolders": "always",
"python.defaultInterpreterPath": ".venv/bin/python",
"python.interpreter.infoVisibility": "always",
Expand All @@ -50,7 +51,10 @@
"python.testing.pytestArgs": [
"--rootdir=/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
"."
]
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ RUN echo "Etc/UTC" > /etc/timezone
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import json
import os
import re
import uuid
from collections import defaultdict
from logging import getLogger
Expand All @@ -13,10 +14,30 @@
import duckdb
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
ConfiguredAirbyteCatalog,
DestinationSyncMode,
Status,
Type,
)

logger = getLogger("airbyte")

CONFIG_MOTHERDUCK_API_KEY = "motherduck_api_key"
CONFIG_DEFAULT_SCHEMA = "main"


def validated_sql_name(sql_name: Any) -> str:
    """Return the input as a string if it is a valid SQL name, otherwise raise.

    A name is accepted only when it is non-empty and consists solely of ASCII
    letters, digits and underscores. Anything else (whitespace, punctuation,
    statement separators, a trailing newline, the empty string) is refused.

    Args:
        sql_name: Candidate name; it is converted with ``str()`` before the check.

    Returns:
        The string form of ``sql_name``.

    Raises:
        ValueError: If the string form is empty or contains a disallowed character.
    """
    result = str(sql_name)
    # ``fullmatch`` instead of ``match`` + "$": "$" also matches just before a
    # trailing newline, which would let "name\n" through. "+" instead of "*":
    # an empty name is never a usable identifier.
    if re.fullmatch(r"[a-zA-Z0-9_]+", result):
        return result

    raise ValueError(f"Invalid SQL name: {sql_name}")


class DestinationDuckdb(Destination):
@staticmethod
Expand All @@ -25,7 +46,9 @@ def _get_destination_path(destination_path: str) -> str:
Get a normalized version of the destination path.
Automatically append /local/ to the start of the path
"""
if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
if destination_path.startswith("md:") or destination_path.startswith(
"motherduck:"
):
return destination_path

if not destination_path.startswith("/local"):
Expand All @@ -34,7 +57,8 @@ def _get_destination_path(destination_path: str) -> str:
destination_path = os.path.normpath(destination_path)
if not destination_path.startswith("/local"):
raise ValueError(
f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix"
f"destination_path={destination_path} is not a valid path."
"A valid path shall start with /local or no / prefix"
)

return destination_path
Expand Down Expand Up @@ -63,31 +87,28 @@ def write(

path = str(config.get("destination_path"))
path = self._get_destination_path(path)
schema_name = validated_sql_name(config.get("schema", CONFIG_DEFAULT_SCHEMA))

# Get and register auth token if applicable
motherduck_api_key = str(config.get("motherduck_api_key"))
motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, ""))
if motherduck_api_key:
os.environ["motherduck_token"] = motherduck_api_key

con = duckdb.connect(database=path, read_only=False)

# create the tables if needed
# con.execute("BEGIN TRANSACTION")
con.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

for configured_stream in configured_catalog.streams:
name = configured_stream.stream.name
table_name = f"_airbyte_raw_{name}"
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
# delete the tables
logger.info(f"Dropping tables for overwrite: {table_name}")
query = """
DROP TABLE IF EXISTS {}
""".format(
table_name
)
query = f"DROP TABLE IF EXISTS {schema_name}.{table_name}"
con.execute(query)
# create the table if needed
query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
_airbyte_ab_id TEXT PRIMARY KEY,
_airbyte_emitted_at DATETIME,
_airbyte_data JSON
Expand All @@ -103,12 +124,12 @@ def write(
# flush the buffer
for stream_name in buffer.keys():
logger.info(f"flushing buffer for state: {message}")
query = """
INSERT INTO {table_name} (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
table_name = f"_airbyte_raw_{stream_name}"
query = f"""
INSERT INTO {schema_name}.{table_name}
(_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
VALUES (?,?,?)
""".format(
table_name=f"_airbyte_raw_{stream_name}"
)
"""
con.executemany(query, buffer[stream_name])

con.commit()
Expand All @@ -119,7 +140,9 @@ def write(
data = message.record.data
stream = message.record.stream
if stream not in streams:
logger.debug(f"Stream {stream} was not present in configured streams, skipping")
logger.debug(
f"Stream {stream} was not present in configured streams, skipping"
)
continue

# add to buffer
Expand All @@ -135,17 +158,18 @@ def write(

# flush any remaining messages
for stream_name in buffer.keys():
query = """
INSERT INTO {table_name}
table_name = f"_airbyte_raw_{stream_name}"
query = f"""
INSERT INTO {schema_name}.{table_name}
VALUES (?,?,?)
""".format(
table_name=f"_airbyte_raw_{stream_name}"
)
"""

con.executemany(query, buffer[stream_name])
con.commit()

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(
self, logger: AirbyteLogger, config: Mapping[str, Any]
) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
Expand All @@ -165,13 +189,15 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
logger.info(f"Using DuckDB file at {path}")
os.makedirs(os.path.dirname(path), exist_ok=True)

if "motherduck_api_key" in config:
os.environ["motherduck_token"] = config["motherduck_api_key"]
if CONFIG_MOTHERDUCK_API_KEY in config:
os.environ["motherduck_token"] = str(config[CONFIG_MOTHERDUCK_API_KEY])

con = duckdb.connect(database=path, read_only=False)
con.execute("SELECT 1;")

return AirbyteConnectionStatus(status=Status.SUCCEEDED)

except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")
return AirbyteConnectionStatus(
status=Status.FAILED, message=f"An exception occurred: {repr(e)}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,45 @@
from destination_duckdb import DestinationDuckdb

CONFIG_PATH = "integration_tests/config.json"
SECRETS_CONFIG_PATH = (
"secrets/config.json" # Should contain a valid MotherDuck API token
)


def pytest_generate_tests(metafunc):
if "config" not in metafunc.fixturenames:
return

configs: list[str] = ["local_file_config"]
if Path(CONFIG_PATH).is_file():
if Path(SECRETS_CONFIG_PATH).is_file():
configs.append("motherduck_config")
else:
print(
f"Skipping MotherDuck tests because config file not found at: {CONFIG_PATH}"
f"Skipping MotherDuck tests because config file not found at: {SECRETS_CONFIG_PATH}"
)

# for test_name in ["test_check_succeeds", "test_write"]:
metafunc.parametrize("config", configs, indirect=True)


@pytest.fixture(scope="module")
def test_schema_name() -> str:
    """Build a schema name of the form ``test_schema_`` plus six random lowercase letters."""
    suffix_chars = []
    for _ in range(6):
        suffix_chars.append(random.choice(string.ascii_lowercase))
    suffix = "".join(suffix_chars)
    return "test_schema_" + suffix


@pytest.fixture
def config(request) -> Dict[str, str]:
# create a file "myfile" in "mydir" in temp directory
def config(request, test_schema_name: str) -> Dict[str, str]:
if request.param == "local_file_config":
tmp_dir = tempfile.TemporaryDirectory()
test = os.path.join(str(tmp_dir.name), "test.duckdb")
yield {"destination_path": test}
yield {"destination_path": test, "schema": test_schema_name}

elif request.param == "motherduck_config":
yield json.loads(Path(CONFIG_PATH).read_text())
config_dict = json.loads(Path(SECRETS_CONFIG_PATH).read_text())
config_dict["schema"] = test_schema_name
yield config_dict

else:
raise ValueError(f"Unknown config type: {request.param}")
Expand Down Expand Up @@ -165,6 +176,7 @@ def test_write(
airbyte_message2: AirbyteMessage,
airbyte_message3: AirbyteMessage,
test_table_name: str,
test_schema_name: str,
):
destination = DestinationDuckdb()
generator = destination.write(
Expand All @@ -179,7 +191,8 @@ def test_write(
con = duckdb.connect(database=config.get("destination_path"), read_only=False)
with con:
cursor = con.execute(
f"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data FROM _airbyte_raw_{test_table_name} ORDER BY _airbyte_data"
"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data "
f"FROM {test_schema_name}._airbyte_raw_{test_table_name} ORDER BY _airbyte_data"
)
result = cursor.fetchall()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
dockerRepository: airbyte/destination-duckdb
githubIssueLabel: destination-duckdb
icon: duckdb.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import pytest
from destination_duckdb import DestinationDuckdb
from destination_duckdb.destination import DestinationDuckdb, validated_sql_name


def test_read_invalid_path():
Expand All @@ -12,3 +12,25 @@ def test_read_invalid_path():
_ = DestinationDuckdb._get_destination_path(invalid_input)

assert True


@pytest.mark.parametrize(
    "sql_name, expected",
    [
        ("test", "test"),
        ("test_123", "test_123"),
        ("test;123", None),
        ("test123;", None),
        ("test-123", None),
        ("test 123", None),
        ("test.123", None),
        ("test,123", None),
        ("test!123", None),
    ],
)
def test_validated_sql_name(sql_name, expected):
    """Check that valid names are returned unchanged and invalid ones raise.

    An ``expected`` value of ``None`` marks a case where ``validated_sql_name``
    must raise ``ValueError``; otherwise the return value must equal ``expected``.
    """
    # The parameter is called ``sql_name`` rather than ``input`` so the
    # builtin ``input`` is not shadowed inside the test body.
    if expected is None:
        with pytest.raises(ValueError):
            validated_sql_name(sql_name)
    else:
        assert validated_sql_name(sql_name) == expected
5 changes: 3 additions & 2 deletions docs/integrations/destinations/duckdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ To specify a MotherDuck-hosted database as your destination, simply provide your

:::caution

We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key`` setting instead.
We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key` setting instead.

:::

Expand Down Expand Up @@ -106,5 +106,6 @@ Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------- |
| 0.2.0 | 2022-10-14 | [](https://github.com/airbytehq/airbyte/pull/) | Add support for MotherDuck |
| 0.2.1 | 2023-09-20 | [#30600](https://github.com/airbytehq/airbyte/pull/30600) | Fix: schema name mapping |
| 0.2.0 | 2022-10-19 | [#29428](https://github.com/airbytehq/airbyte/pull/29428) | Add support for MotherDuck. Upgrade DuckDB version to `v0.8`. |
| 0.1.0 | 2022-10-14 | [17494](https://github.com/airbytehq/airbyte/pull/17494) | New DuckDB destination |

0 comments on commit f2a1841

Please sign in to comment.