diff --git a/.devcontainer/destination-duckdb/devcontainer.json b/.devcontainer/destination-duckdb/devcontainer.json index 252a58a1cf38..587b51126ff6 100644 --- a/.devcontainer/destination-duckdb/devcontainer.json +++ b/.devcontainer/destination-duckdb/devcontainer.json @@ -26,7 +26,7 @@ // Python extensions: "charliermarsh.ruff", "matangover.mypy", - "ms-python.black", + "ms-python.black-formatter", "ms-python.python", "ms-python.vscode-pylance", @@ -41,6 +41,7 @@ ], "settings": { "extensions.ignoreRecommendations": true, + "git.autofetch": true, "git.openRepositoryInParentFolders": "always", "python.defaultInterpreterPath": ".venv/bin/python", "python.interpreter.infoVisibility": "always", @@ -50,7 +51,10 @@ "python.testing.pytestArgs": [ "--rootdir=/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb", "." - ] + ], + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + } } } }, diff --git a/airbyte-integrations/connectors/destination-duckdb/Dockerfile b/airbyte-integrations/connectors/destination-duckdb/Dockerfile index e1438e9c946a..47010db44b8b 100644 --- a/airbyte-integrations/connectors/destination-duckdb/Dockerfile +++ b/airbyte-integrations/connectors/destination-duckdb/Dockerfile @@ -30,5 +30,5 @@ RUN echo "Etc/UTC" > /etc/timezone ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.0 +LABEL io.airbyte.version=0.2.1 LABEL io.airbyte.name=airbyte/destination-duckdb diff --git a/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py index a87de508c5df..5b6c892dd7a2 100644 --- a/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py +++ b/airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py @@ -5,6 +5,7 @@ import datetime import json import os +import re import uuid from collections import defaultdict from logging import getLogger @@ -13,10 +14,30 @@ import duckdb from airbyte_cdk import AirbyteLogger from airbyte_cdk.destinations import Destination -from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type +from airbyte_cdk.models import ( + AirbyteConnectionStatus, + AirbyteMessage, + ConfiguredAirbyteCatalog, + DestinationSyncMode, + Status, + Type, +) logger = getLogger("airbyte") +CONFIG_MOTHERDUCK_API_KEY = "motherduck_api_key" +CONFIG_DEFAULT_SCHEMA = "main" + + +def validated_sql_name(sql_name: Any) -> str: + """Return the input if it is a valid SQL name, otherwise raise an exception.""" + pattern = r"^[a-zA-Z0-9_]*$" + result = str(sql_name) + if bool(re.match(pattern, result)): + return result + + raise ValueError(f"Invalid SQL name: {sql_name}") + class DestinationDuckdb(Destination): @staticmethod @@ -25,7 +46,9 @@ def _get_destination_path(destination_path: str) -> str: Get a normalized version of the destination path. Automatically append /local/ to the start of the path """ - if destination_path.startswith("md:") or destination_path.startswith("motherduck:"): + if destination_path.startswith("md:") or destination_path.startswith( + "motherduck:" + ): return destination_path if not destination_path.startswith("/local"): @@ -34,7 +57,8 @@ def _get_destination_path(destination_path: str) -> str: destination_path = os.path.normpath(destination_path) if not destination_path.startswith("/local"): raise ValueError( - f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix" + f"destination_path={destination_path} is not a valid path." + "A valid path shall start with /local or no / prefix" ) return destination_path @@ -63,31 +87,28 @@ def write( path = str(config.get("destination_path")) path = self._get_destination_path(path) + schema_name = validated_sql_name(config.get("schema", CONFIG_DEFAULT_SCHEMA)) # Get and register auth token if applicable - motherduck_api_key = str(config.get("motherduck_api_key")) + motherduck_api_key = str(config.get(CONFIG_MOTHERDUCK_API_KEY, "")) if motherduck_api_key: os.environ["motherduck_token"] = motherduck_api_key con = duckdb.connect(database=path, read_only=False) - # create the tables if needed - # con.execute("BEGIN TRANSACTION") + con.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}") + for configured_stream in configured_catalog.streams: name = configured_stream.stream.name table_name = f"_airbyte_raw_{name}" if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: # delete the tables logger.info(f"Dropping tables for overwrite: {table_name}") - query = """ - DROP TABLE IF EXISTS {} - """.format( - table_name - ) + query = f"DROP TABLE IF EXISTS {schema_name}.{table_name}" con.execute(query) # create the table if needed query = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( + CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ( _airbyte_ab_id TEXT PRIMARY KEY, _airbyte_emitted_at DATETIME, _airbyte_data JSON @@ -103,12 +124,12 @@ def write( # flush the buffer for stream_name in buffer.keys(): logger.info(f"flushing buffer for state: {message}") - query = """ - INSERT INTO {table_name} (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data) + table_name = f"_airbyte_raw_{stream_name}" + query = f""" + INSERT INTO {schema_name}.{table_name} + (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data) VALUES (?,?,?) - """.format( - table_name=f"_airbyte_raw_{stream_name}" - ) + """ con.executemany(query, buffer[stream_name]) con.commit() @@ -119,7 +140,9 @@ def write( data = message.record.data stream = message.record.stream if stream not in streams: - logger.debug(f"Stream {stream} was not present in configured streams, skipping") + logger.debug( + f"Stream {stream} was not present in configured streams, skipping" + ) continue # add to buffer @@ -135,17 +158,18 @@ def write( # flush any remaining messages for stream_name in buffer.keys(): - query = """ - INSERT INTO {table_name} + table_name = f"_airbyte_raw_{stream_name}" + query = f""" + INSERT INTO {schema_name}.{table_name} VALUES (?,?,?) - """.format( - table_name=f"_airbyte_raw_{stream_name}" - ) + """ con.executemany(query, buffer[stream_name]) con.commit() - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check( + self, logger: AirbyteLogger, config: Mapping[str, Any] + ) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the destination with the needed permissions e.g: if a provided API token or password can be used to connect and write to the destination. @@ -165,8 +189,8 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn logger.info(f"Using DuckDB file at {path}") os.makedirs(os.path.dirname(path), exist_ok=True) - if "motherduck_api_key" in config: - os.environ["motherduck_token"] = config["motherduck_api_key"] + if CONFIG_MOTHERDUCK_API_KEY in config: + os.environ["motherduck_token"] = str(config[CONFIG_MOTHERDUCK_API_KEY]) con = duckdb.connect(database=path, read_only=False) con.execute("SELECT 1;") @@ -174,4 +198,6 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn return AirbyteConnectionStatus(status=Status.SUCCEEDED) except Exception as e: - return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}") + return AirbyteConnectionStatus( + status=Status.FAILED, message=f"An exception occurred: {repr(e)}" + ) diff --git a/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py index 81f4305e5759..6ad34be92b12 100644 --- a/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-duckdb/integration_tests/integration_test.py @@ -31,6 +31,9 @@ from destination_duckdb import DestinationDuckdb CONFIG_PATH = "integration_tests/config.json" +SECRETS_CONFIG_PATH = ( + "secrets/config.json" # Should contain a valid MotherDuck API token +) def pytest_generate_tests(metafunc): @@ -38,27 +41,35 @@ def pytest_generate_tests(metafunc): return configs: list[str] = ["local_file_config"] - if Path(CONFIG_PATH).is_file(): + if Path(SECRETS_CONFIG_PATH).is_file(): configs.append("motherduck_config") else: print( - f"Skipping MotherDuck tests because config file not found at: {CONFIG_PATH}" + f"Skipping MotherDuck tests because config file not found at: {SECRETS_CONFIG_PATH}" ) # for test_name in ["test_check_succeeds", "test_write"]: metafunc.parametrize("config", configs, indirect=True) +@pytest.fixture(scope="module") +def test_schema_name() -> str: + letters = string.ascii_lowercase + rand_string = "".join(random.choice(letters) for _ in range(6)) + return f"test_schema_{rand_string}" + + @pytest.fixture -def config(request) -> Dict[str, str]: - # create a file "myfile" in "mydir" in temp directory +def config(request, test_schema_name: str) -> Dict[str, str]: if request.param == "local_file_config": tmp_dir = tempfile.TemporaryDirectory() test = os.path.join(str(tmp_dir.name), "test.duckdb") - yield {"destination_path": test} + yield {"destination_path": test, "schema": test_schema_name} elif request.param == "motherduck_config": - yield json.loads(Path(CONFIG_PATH).read_text()) + config_dict = json.loads(Path(SECRETS_CONFIG_PATH).read_text()) + config_dict["schema"] = test_schema_name + yield config_dict else: raise ValueError(f"Unknown config type: {request.param}") @@ -165,6 +176,7 @@ def test_write( airbyte_message2: AirbyteMessage, airbyte_message3: AirbyteMessage, test_table_name: str, + test_schema_name: str, ): destination = DestinationDuckdb() generator = destination.write( @@ -179,7 +191,8 @@ def test_write( con = duckdb.connect(database=config.get("destination_path"), read_only=False) with con: cursor = con.execute( - f"SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data FROM _airbyte_raw_{test_table_name} ORDER BY _airbyte_data" + "SELECT _airbyte_ab_id, _airbyte_emitted_at, _airbyte_data " + f"FROM {test_schema_name}._airbyte_raw_{test_table_name} ORDER BY _airbyte_data" ) result = cursor.fetchall() diff --git a/airbyte-integrations/connectors/destination-duckdb/metadata.yaml b/airbyte-integrations/connectors/destination-duckdb/metadata.yaml index b0314656d69d..36bb69d1bf8c 100644 --- a/airbyte-integrations/connectors/destination-duckdb/metadata.yaml +++ b/airbyte-integrations/connectors/destination-duckdb/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610 - dockerImageTag: 0.2.0 + dockerImageTag: 0.2.1 dockerRepository: airbyte/destination-duckdb githubIssueLabel: destination-duckdb icon: duckdb.svg diff --git a/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py index 4edc22c51af2..f7dcb2c361cc 100644 --- a/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/destination-duckdb/unit_tests/unit_test.py @@ -3,7 +3,7 @@ # import pytest -from destination_duckdb import DestinationDuckdb +from destination_duckdb.destination import DestinationDuckdb, validated_sql_name def test_read_invalid_path(): @@ -12,3 +12,25 @@ def test_read_invalid_path(): _ = DestinationDuckdb._get_destination_path(invalid_input) assert True + + +@pytest.mark.parametrize( + "input, expected", + [ + ("test", "test"), + ("test_123", "test_123"), + ("test;123", None), + ("test123;", None), + ("test-123", None), + ("test 123", None), + ("test.123", None), + ("test,123", None), + ("test!123", None), + ], +) +def test_validated_sql_name(input, expected): + if expected is None: + with pytest.raises(ValueError): + validated_sql_name(input) + else: + assert validated_sql_name(input) == expected diff --git a/docs/integrations/destinations/duckdb.md b/docs/integrations/destinations/duckdb.md index aa01e1e406ff..679618b844c1 100644 --- a/docs/integrations/destinations/duckdb.md +++ b/docs/integrations/destinations/duckdb.md @@ -26,7 +26,7 @@ To specify a MotherDuck-hosted database as your destination, simply provide your :::caution -We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key`` setting instead. +We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key` setting instead. ::: @@ -106,5 +106,6 @@ Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :--------------------- | -| 0.2.0 | 2022-10-14 | [](https://github.com/airbytehq/airbyte/pull/) | Add support for MotherDuck | +| 0.2.1 | 2022-10-20 | [#30600](https://github.com/airbytehq/airbyte/pull/30600) | Fix: schema name mapping | +| 0.2.0 | 2022-10-19 | [#29428](https://github.com/airbytehq/airbyte/pull/29428) | Add support for MotherDuck. Upgrade DuckDB version to `v0.8``. | | 0.1.0 | 2022-10-14 | [17494](https://github.com/airbytehq/airbyte/pull/17494) | New DuckDB destination |