Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Destination DuckDB: Fix schema name handling #30600

Merged
merged 12 commits into from
Sep 20, 2023
Merged
8 changes: 6 additions & 2 deletions .devcontainer/destination-duckdb/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
// Python extensions:
"charliermarsh.ruff",
"matangover.mypy",
"ms-python.black",
"ms-python.black-formatter",
"ms-python.python",
"ms-python.vscode-pylance",

Expand All @@ -41,6 +41,7 @@
],
"settings": {
"extensions.ignoreRecommendations": true,
"git.autofetch": true,
"git.openRepositoryInParentFolders": "always",
"python.defaultInterpreterPath": ".venv/bin/python",
"python.interpreter.infoVisibility": "always",
Expand All @@ -50,7 +51,10 @@
"python.testing.pytestArgs": [
"--rootdir=/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
"."
]
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ RUN echo "Etc/UTC" > /etc/timezone
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import json
import os
import re
import uuid
from collections import defaultdict
from logging import getLogger
Expand All @@ -13,10 +14,30 @@
import duckdb
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
ConfiguredAirbyteCatalog,
DestinationSyncMode,
Status,
Type,
)

logger = getLogger("airbyte")

CONFIG_MOTHERDUCK_API_KEY = "motherduck_api_key"
CONFIG_DEFAULT_SCHEMA = "main"


def validated_sql_name(sql_name: Any) -> str:
"""Return the input if it is a valid SQL name, otherwise raise an exception."""
pattern = r"^[a-zA-Z0-9_]*$"
result = str(sql_name)
if bool(re.match(pattern, result)):
return result

raise ValueError(f"Invalid SQL name: {sql_name}")


class DestinationDuckdb(Destination):
@staticmethod
Expand All @@ -25,7 +46,9 @@ def _get_destination_path(destination_path: str) -> str:
Get a normalized version of the destination path.
Automatically append /local/ to the start of the path
"""
if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
if destination_path.startswith("md:") or destination_path.startswith(
"motherduck:"
):
return destination_path

if not destination_path.startswith("/local"):
Expand All @@ -34,7 +57,8 @@ def _get_destination_path(destination_path: str) -> str:
destination_path = os.path.normpath(destination_path)
if not destination_path.startswith("/local"):
raise ValueError(
f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix"
f"destination_path={destination_path} is not a valid path."
"A valid path shall start with /local or no / prefix"
)

return destination_path
Expand Down Expand Up @@ -63,31 +87,28 @@ def write(

path = str(config.get("destination_path"))
path = self._get_destination_path(path)
schema_name = validated_sql_name(config.get("schema", CONFIG_DEFAULT_SCHEMA))

# Get and register auth token if applicable
motherduck_api_key = str(config.get("motherduck_api_key"))
motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, ""))
if motherduck_api_key:
os.environ["motherduck_token"] = motherduck_api_key

con = duckdb.connect(database=path, read_only=False)

# create the tables if needed
# con.execute("BEGIN TRANSACTION")
con.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

for configured_stream in configured_catalog.streams:
name = configured_stream.stream.name
table_name = f"_airbyte_raw_{name}"
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
# delete the tables
logger.info(f"Dropping tables for overwrite: {table_name}")
query = """
DROP TABLE IF EXISTS {}
""".format(
table_name
)
query = f"DROP TABLE IF EXISTS {schema_name}.{table_name}"
con.execute(query)
# create the table if needed
query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
_airbyte_ab_id TEXT PRIMARY KEY,
_airbyte_emitted_at DATETIME,
_airbyte_data JSON
Expand All @@ -103,12 +124,12 @@ def write(
# flush the buffer
for stream_name in buffer.keys():
logger.info(f"flushing buffer for state: {message}")
query = """
INSERT INTO {table_name} (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
table_name = f"_airbyte_raw_{stream_name}"
query = f"""
INSERT INTO {schema_name}.{table_name}
(_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
VALUES (?,?,?)
""".format(
table_name=f"_airbyte_raw_{stream_name}"
)
"""
con.executemany(query, buffer[stream_name])

con.commit()
Expand All @@ -119,7 +140,9 @@ def write(
data = message.record.data
stream = message.record.stream
if stream not in streams:
logger.debug(f"Stream {stream} was not present in configured streams, skipping")
logger.debug(
f"Stream {stream} was not present in configured streams, skipping"
)
continue

# add to buffer
Expand All @@ -135,17 +158,18 @@ def write(

# flush any remaining messages
for stream_name in buffer.keys():
query = """
INSERT INTO {table_name}
table_name = f"_airbyte_raw_{stream_name}"
query = f"""
INSERT INTO {schema_name}.{table_name}
VALUES (?,?,?)
""".format(
table_name=f"_airbyte_raw_{stream_name}"
)
"""

con.executemany(query, buffer[stream_name])
con.commit()

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(
self, logger: AirbyteLogger, config: Mapping[str, Any]
) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
Expand All @@ -165,13 +189,15 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
logger.info(f"Using DuckDB file at {path}")
os.makedirs(os.path.dirname(path), exist_ok=True)

if "motherduck_api_key" in config:
os.environ["motherduck_token"] = config["motherduck_api_key"]
if CONFIG_MOTHERDUCK_API_KEY in config:
os.environ["motherduck_token"] = str(config[CONFIG_MOTHERDUCK_API_KEY])

con = duckdb.connect(database=path, read_only=False)
con.execute("SELECT 1;")

return AirbyteConnectionStatus(status=Status.SUCCEEDED)

except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")
return AirbyteConnectionStatus(
status=Status.FAILED, message=f"An exception occurred: {repr(e)}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,45 @@
from destination_duckdb import DestinationDuckdb

CONFIG_PATH = "integration_tests/config.json"
SECRETS_CONFIG_PATH = (
"secrets/config.json" # Should contain a valid MotherDuck API token
)


def pytest_generate_tests(metafunc):
if "config" not in metafunc.fixturenames:
return

configs: list[str] = ["local_file_config"]
if Path(CONFIG_PATH).is_file():
if Path(SECRETS_CONFIG_PATH).is_file():
configs.append("motherduck_config")
else:
print(
f"Skipping MotherDuck tests because config file not found at: {CONFIG_PATH}"
f"Skipping MotherDuck tests because config file not found at: {SECRETS_CONFIG_PATH}"
)

# for test_name in ["test_check_succeeds", "test_write"]:
metafunc.parametrize("config", configs, indirect=True)


@pytest.fixture(scope="module")
def test_schema_name() -> str:
letters = string.ascii_lowercase
rand_string = "".join(random.choice(letters) for _ in range(6))
return f"test_schema_{rand_string}"


@pytest.fixture
def config(request) -> Dict[str, str]:
# create a file "myfile" in "mydir" in temp directory
def config(request, test_schema_name: str) -> Dict[str, str]:
if request.param == "local_file_config":
tmp_dir = tempfile.TemporaryDirectory()
test = os.path.join(str(tmp_dir.name), "test.duckdb")
yield {"destination_path": test}
yield {"destination_path": test, "schema": test_schema_name}

elif request.param == "motherduck_config":
yield json.loads(Path(CONFIG_PATH).read_text())
config_dict = json.loads(Path(SECRETS_CONFIG_PATH).read_text())
config_dict["schema"] = test_schema_name
yield config_dict

else:
raise ValueError(f"Unknown config type: {request.param}")
Expand Down Expand Up @@ -165,6 +176,7 @@ def test_write(
airbyte_message2: AirbyteMessage,
airbyte_message3: AirbyteMessage,
test_table_name: str,
test_schema_name: str,
):
destination = DestinationDuckdb()
generator = destination.write(
Expand All @@ -179,7 +191,8 @@ def test_write(
con = duckdb.connect(database=config.get("destination_path"), read_only=False)
with con:
cursor = con.execute(
f"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data FROM _airbyte_raw_{test_table_name} ORDER BY _airbyte_data"
"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data "
f"FROM {test_schema_name}._airbyte_raw_{test_table_name} ORDER BY _airbyte_data"
)
result = cursor.fetchall()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
dockerRepository: airbyte/destination-duckdb
githubIssueLabel: destination-duckdb
icon: duckdb.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import pytest
from destination_duckdb import DestinationDuckdb
from destination_duckdb.destination import DestinationDuckdb, validated_sql_name


def test_read_invalid_path():
Expand All @@ -12,3 +12,25 @@ def test_read_invalid_path():
_ = DestinationDuckdb._get_destination_path(invalid_input)

assert True


@pytest.mark.parametrize(
"input, expected",
[
("test", "test"),
("test_123", "test_123"),
("test;123", None),
("test123;", None),
("test-123", None),
("test 123", None),
("test.123", None),
("test,123", None),
("test!123", None),
],
)
def test_validated_sql_name(input, expected):
if expected is None:
with pytest.raises(ValueError):
validated_sql_name(input)
else:
assert validated_sql_name(input) == expected
5 changes: 3 additions & 2 deletions docs/integrations/destinations/duckdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ To specify a MotherDuck-hosted database as your destination, simply provide your

:::caution

We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key`` setting instead.
We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key` setting instead.

:::

Expand Down Expand Up @@ -106,5 +106,6 @@ Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------- |
| 0.2.0 | 2022-10-14 | [](https://github.com/airbytehq/airbyte/pull/) | Add support for MotherDuck |
| 0.2.1 | 2022-10-20 | [#30600](https://github.com/airbytehq/airbyte/pull/30600) | Fix: schema name mapping |
| 0.2.0 | 2022-10-19 | [#29428](https://github.com/airbytehq/airbyte/pull/29428) | Add support for MotherDuck. Upgrade DuckDB version to `v0.8``. |
| 0.1.0 | 2022-10-14 | [17494](https://github.com/airbytehq/airbyte/pull/17494) | New DuckDB destination |
Loading