From cb80c6be5e4fc651bce0fb0aef430d998c281f4e Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 20 May 2024 12:34:41 +0200 Subject: [PATCH 01/11] Add HMAC credentials and update Clickhouse configuration Signed-off-by: Marcel Coetzee --- dlt/common/configuration/specs/__init__.py | 4 +- .../configuration/specs/hmac_credentials.py | 41 +++++++++++++++++++ .../impl/clickhouse/clickhouse.py | 17 ++++---- .../impl/clickhouse/configuration.py | 4 -- 4 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 dlt/common/configuration/specs/hmac_credentials.py diff --git a/dlt/common/configuration/specs/__init__.py b/dlt/common/configuration/specs/__init__.py index 9acf14bde3..88d7a534b9 100644 --- a/dlt/common/configuration/specs/__init__.py +++ b/dlt/common/configuration/specs/__init__.py @@ -21,6 +21,7 @@ from .api_credentials import OAuth2Credentials from .aws_credentials import AwsCredentials, AwsCredentialsWithoutDefaults from .azure_credentials import AzureCredentials, AzureCredentialsWithoutDefaults +from .hmac_credentials import HMACCredentials # backward compatibility for service account credentials @@ -53,4 +54,5 @@ "AzureCredentialsWithoutDefaults", "GcpClientCredentials", "GcpClientCredentialsWithDefault", -] + "HMACCredentials", +] \ No newline at end of file diff --git a/dlt/common/configuration/specs/hmac_credentials.py b/dlt/common/configuration/specs/hmac_credentials.py new file mode 100644 index 0000000000..29f987db42 --- /dev/null +++ b/dlt/common/configuration/specs/hmac_credentials.py @@ -0,0 +1,41 @@ +from typing import Optional, Dict + +from dlt.common.configuration.specs import ( + CredentialsConfiguration, + configspec, +) +from dlt.common.typing import TSecretStrValue, DictStrAny + + +@configspec +class HMACCredentials(CredentialsConfiguration): + aws_access_key_id: str = None + aws_secret_access_key: TSecretStrValue = None + aws_session_token: Optional[TSecretStrValue] = None + profile_name: Optional[str] = None + region_name: Optional[str] = None + endpoint_url: Optional[str] = None + + def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: + """Dict of keyword arguments that can be passed to s3fs""" + credentials: DictStrAny = dict( + key=self.aws_access_key_id, + secret=self.aws_secret_access_key, + token=self.aws_session_token, + profile=self.profile_name, + endpoint_url=self.endpoint_url, + ) + if self.region_name: + credentials["client_kwargs"] = {"region_name": self.region_name} + return credentials + + def to_native_representation(self) -> Dict[str, Optional[str]]: + """Return a dict that can be passed as kwargs to boto3 session""" + return dict(self) + + def to_session_credentials(self) -> Dict[str, str]: + return dict( + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + aws_session_token=self.aws_session_token, + ) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index e2c1f827bc..2c0e6a6e77 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -7,15 +7,14 @@ import clickhouse_connect from clickhouse_connect.driver.tools import insert_file -import dlt from dlt import config from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, GcpCredentials, AwsCredentialsWithoutDefaults, + HMACCredentials, ) -from dlt.destinations.exceptions import DestinationTransientException from dlt.common.destination import 
DestinationCapabilitiesContext from dlt.common.destination.reference import ( SupportsStagingDestination, @@ -34,7 +33,7 @@ TColumnSchemaBase, ) from dlt.common.storages import FileStorage -from dlt.destinations.exceptions import LoadJobTerminalException +from dlt.destinations.exceptions import DestinationTransientException, LoadJobTerminalException from dlt.destinations.impl.clickhouse import capabilities from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( TTableEngineType, @@ -209,13 +208,15 @@ def __init__( access_key_id = staging_credentials.aws_access_key_id secret_access_key = staging_credentials.aws_secret_access_key elif isinstance(staging_credentials, GcpCredentials): - access_key_id = client.credentials.gcp_access_key_id - secret_access_key = client.credentials.gcp_secret_access_key + # Defer to the provided HMAC Credentials. + access_key_id = client.credentials.gcp_access_key_id # type: ignore + secret_access_key = client.credentials.gcp_secret_access_key # type: ignore if not access_key_id or not secret_access_key: raise DestinationTransientException( - "You have tried loading from gcs with clickhouse. Please provide valid" - " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" - " outlined in the dlthub docs." + "You have tried loading from gcs with clickhouse. " + "Please provide valid HMAC credentials as outlined in " + "the dlthub docs:\n" + "https://dlthub.com/devel/dlt-ecosystem/destinations/clickhouse#using-google-cloud-storage-as-a-staging-area." ) auth = "NOSIGN" diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index bbff6e0a9c..8c472b8867 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -38,10 +38,6 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" dataset_sentinel_table_name: str = "dlt_sentinel_table" """Special table to mark dataset as existing""" - gcp_access_key_id: Optional[str] = None - """When loading from a gcp bucket, you need to provide gcp interoperable keys""" - gcp_secret_access_key: Optional[str] = None - """When loading from a gcp bucket, you need to provide gcp interoperable keys""" __config_gen_annotations__: ClassVar[List[str]] = [ "host", From cabe61cfd4050b924c5a1a0230cd3667d9fa562f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 20 May 2024 16:50:09 +0200 Subject: [PATCH 02/11] Revert "Add HMAC credentials and update Clickhouse configuration" This reverts commit cb80c6be5e4fc651bce0fb0aef430d998c281f4e. 
--- dlt/common/configuration/specs/__init__.py | 4 +- .../configuration/specs/hmac_credentials.py | 41 ------------------- .../impl/clickhouse/clickhouse.py | 17 ++++---- .../impl/clickhouse/configuration.py | 4 ++ 4 files changed, 13 insertions(+), 53 deletions(-) delete mode 100644 dlt/common/configuration/specs/hmac_credentials.py diff --git a/dlt/common/configuration/specs/__init__.py b/dlt/common/configuration/specs/__init__.py index 88d7a534b9..9acf14bde3 100644 --- a/dlt/common/configuration/specs/__init__.py +++ b/dlt/common/configuration/specs/__init__.py @@ -21,7 +21,6 @@ from .api_credentials import OAuth2Credentials from .aws_credentials import AwsCredentials, AwsCredentialsWithoutDefaults from .azure_credentials import AzureCredentials, AzureCredentialsWithoutDefaults -from .hmac_credentials import HMACCredentials # backward compatibility for service account credentials @@ -54,5 +53,4 @@ "AzureCredentialsWithoutDefaults", "GcpClientCredentials", "GcpClientCredentialsWithDefault", - "HMACCredentials", -] \ No newline at end of file +] diff --git a/dlt/common/configuration/specs/hmac_credentials.py b/dlt/common/configuration/specs/hmac_credentials.py deleted file mode 100644 index 29f987db42..0000000000 --- a/dlt/common/configuration/specs/hmac_credentials.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import Optional, Dict - -from dlt.common.configuration.specs import ( - CredentialsConfiguration, - configspec, -) -from dlt.common.typing import TSecretStrValue, DictStrAny - - -@configspec -class HMACCredentials(CredentialsConfiguration): - aws_access_key_id: str = None - aws_secret_access_key: TSecretStrValue = None - aws_session_token: Optional[TSecretStrValue] = None - profile_name: Optional[str] = None - region_name: Optional[str] = None - endpoint_url: Optional[str] = None - - def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: - """Dict of keyword arguments that can be passed to s3fs""" - credentials: DictStrAny = dict( - key=self.aws_access_key_id, - secret=self.aws_secret_access_key, - token=self.aws_session_token, - profile=self.profile_name, - endpoint_url=self.endpoint_url, - ) - if self.region_name: - credentials["client_kwargs"] = {"region_name": self.region_name} - return credentials - - def to_native_representation(self) -> Dict[str, Optional[str]]: - """Return a dict that can be passed as kwargs to boto3 session""" - return dict(self) - - def to_session_credentials(self) -> Dict[str, str]: - return dict( - aws_access_key_id=self.aws_access_key_id, - aws_secret_access_key=self.aws_secret_access_key, - aws_session_token=self.aws_session_token, - ) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 2c0e6a6e77..e2c1f827bc 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -7,14 +7,15 @@ import clickhouse_connect from clickhouse_connect.driver.tools import insert_file +import dlt from dlt import config from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, GcpCredentials, AwsCredentialsWithoutDefaults, - HMACCredentials, ) +from dlt.destinations.exceptions import DestinationTransientException from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import ( SupportsStagingDestination, @@ -33,7 +34,7 @@ TColumnSchemaBase, ) from dlt.common.storages import FileStorage -from dlt.destinations.exceptions import DestinationTransientException, 
LoadJobTerminalException +from dlt.destinations.exceptions import LoadJobTerminalException from dlt.destinations.impl.clickhouse import capabilities from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( TTableEngineType, @@ -208,15 +209,13 @@ def __init__( access_key_id = staging_credentials.aws_access_key_id secret_access_key = staging_credentials.aws_secret_access_key elif isinstance(staging_credentials, GcpCredentials): - # Defer to the provided HMAC Credentials. - access_key_id = client.credentials.gcp_access_key_id # type: ignore - secret_access_key = client.credentials.gcp_secret_access_key # type: ignore + access_key_id = client.credentials.gcp_access_key_id + secret_access_key = client.credentials.gcp_secret_access_key if not access_key_id or not secret_access_key: raise DestinationTransientException( - "You have tried loading from gcs with clickhouse. " - "Please provide valid HMAC credentials as outlined in " - "the dlthub docs:\n" - "https://dlthub.com/devel/dlt-ecosystem/destinations/clickhouse#using-google-cloud-storage-as-a-staging-area." + "You have tried loading from gcs with clickhouse. Please provide valid" + " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" + " outlined in the dlthub docs." ) auth = "NOSIGN" diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index 8c472b8867..bbff6e0a9c 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -38,6 +38,10 @@ class ClickHouseCredentials(ConnectionStringCredentials): """Separator for dataset table names, defaults to '___', i.e. 'database.dataset___table'.""" dataset_sentinel_table_name: str = "dlt_sentinel_table" """Special table to mark dataset as existing""" + gcp_access_key_id: Optional[str] = None + """When loading from a gcp bucket, you need to provide gcp interoperable keys""" + gcp_secret_access_key: Optional[str] = None + """When loading from a gcp bucket, you need to provide gcp interoperable keys""" __config_gen_annotations__: ClassVar[List[str]] = [ "host", From f24eb1da5fe89f774fb2717e3679d817ec2d778e Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 20 May 2024 22:14:54 +0200 Subject: [PATCH 03/11] Refactor error handling for storage authentication in Clickhouse Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index e2c1f827bc..881db6b14c 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -201,23 +201,19 @@ def __init__( compression = "none" if config.get("data_writer.disable_compression") else "gz" if bucket_scheme in ("s3", "gs", "gcs"): + if not isinstance(staging_credentials, AwsCredentialsWithoutDefaults): + raise DestinationTransientException( + "Staging data to S3 or Google Cloud Storage requires providing AWS-style HMAC access keys, even if using a " + "non-AWS storage provider. Please make sure to configure your destination with an `aws_access_key_id` and `aws_secret_access_key`. " + "See https://dlthub.com/docs/general-usage/destination#pass-explicit-parameters-and-a-name-to-a-destination " + "for documentation on how to set up your destination with the required access keys." + ) # TODO: Point exception message to relevant doc section. 
+ # get auth and bucket url bucket_http_url = convert_storage_to_http_scheme(bucket_url) - access_key_id: str = None - secret_access_key: str = None - if isinstance(staging_credentials, AwsCredentialsWithoutDefaults): - access_key_id = staging_credentials.aws_access_key_id - secret_access_key = staging_credentials.aws_secret_access_key - elif isinstance(staging_credentials, GcpCredentials): - access_key_id = client.credentials.gcp_access_key_id - secret_access_key = client.credentials.gcp_secret_access_key - if not access_key_id or not secret_access_key: - raise DestinationTransientException( - "You have tried loading from gcs with clickhouse. Please provide valid" - " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" - " outlined in the dlthub docs." - ) + access_key_id = staging_credentials.aws_access_key_id + secret_access_key = staging_credentials.aws_secret_access_key auth = "NOSIGN" if access_key_id and secret_access_key: auth = f"'{access_key_id}','{secret_access_key}'" From c30f23cb45785901b531cbb98315f6bbd907a63f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Mon, 20 May 2024 22:24:09 +0200 Subject: [PATCH 04/11] Revert "Refactor error handling for storage authentication in Clickhouse" This reverts commit f24eb1da5fe89f774fb2717e3679d817ec2d778e. --- .../impl/clickhouse/clickhouse.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 881db6b14c..e2c1f827bc 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -201,19 +201,23 @@ def __init__( compression = "none" if config.get("data_writer.disable_compression") else "gz" if bucket_scheme in ("s3", "gs", "gcs"): - if not isinstance(staging_credentials, AwsCredentialsWithoutDefaults): - raise DestinationTransientException( - "Staging data to S3 or Google Cloud Storage requires providing AWS-style HMAC access keys, even if using a " - "non-AWS storage provider. Please make sure to configure your destination with an `aws_access_key_id` and `aws_secret_access_key`. " - "See https://dlthub.com/docs/general-usage/destination#pass-explicit-parameters-and-a-name-to-a-destination " - "for documentation on how to set up your destination with the required access keys." - ) # TODO: Point exception message to relevant doc section. - # get auth and bucket url bucket_http_url = convert_storage_to_http_scheme(bucket_url) + access_key_id: str = None + secret_access_key: str = None + if isinstance(staging_credentials, AwsCredentialsWithoutDefaults): + access_key_id = staging_credentials.aws_access_key_id + secret_access_key = staging_credentials.aws_secret_access_key + elif isinstance(staging_credentials, GcpCredentials): + access_key_id = client.credentials.gcp_access_key_id + secret_access_key = client.credentials.gcp_secret_access_key + if not access_key_id or not secret_access_key: + raise DestinationTransientException( + "You have tried loading from gcs with clickhouse. Please provide valid" + " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" + " outlined in the dlthub docs." 
+ ) - access_key_id = staging_credentials.aws_access_key_id - secret_access_key = staging_credentials.aws_secret_access_key auth = "NOSIGN" if access_key_id and secret_access_key: auth = f"'{access_key_id}','{secret_access_key}'" From 217f6f7665c92494f6024b9d1e2770332d77a0b2 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 21 May 2024 20:44:29 +0200 Subject: [PATCH 05/11] Remove GCS ClickHouse buckets in CI until named destinations are supported Signed-off-by: Marcel Coetzee --- tests/load/utils.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/load/utils.py b/tests/load/utils.py index 81107e83d9..a7b1fbae41 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -344,13 +344,6 @@ def destinations_configs( extra_info="az-authorization", disable_compression=True, ), - DestinationTestConfiguration( - destination="clickhouse", - staging="filesystem", - file_format="parquet", - bucket_url=GCS_BUCKET, - extra_info="gcs-authorization", - ), DestinationTestConfiguration( destination="clickhouse", staging="filesystem", @@ -372,13 +365,6 @@ def destinations_configs( bucket_url=AZ_BUCKET, extra_info="az-authorization", ), - DestinationTestConfiguration( - destination="clickhouse", - staging="filesystem", - file_format="jsonl", - bucket_url=GCS_BUCKET, - extra_info="gcs-authorization", - ), DestinationTestConfiguration( destination="clickhouse", staging="filesystem", From c9c13945d4ea73e7de49002c1b75ccc3686518b1 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 28 May 2024 19:13:00 +0200 Subject: [PATCH 06/11] Add GCS S3 compatibility test, remove GCP credentials from Clickhouse Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 9 --------- .../test_clickhouse_gcs_s3_compatibility.py | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+), 9 deletions(-) create mode 100644 tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index e2c1f827bc..193888bdb4 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -208,15 +208,6 @@ def __init__( if isinstance(staging_credentials, AwsCredentialsWithoutDefaults): access_key_id = staging_credentials.aws_access_key_id secret_access_key = staging_credentials.aws_secret_access_key - elif isinstance(staging_credentials, GcpCredentials): - access_key_id = client.credentials.gcp_access_key_id - secret_access_key = client.credentials.gcp_secret_access_key - if not access_key_id or not secret_access_key: - raise DestinationTransientException( - "You have tried loading from gcs with clickhouse. Please provide valid" - " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" - " outlined in the dlthub docs." 
- ) auth = "NOSIGN" if access_key_id and secret_access_key: diff --git a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py new file mode 100644 index 0000000000..2cf250f66b --- /dev/null +++ b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py @@ -0,0 +1,19 @@ +from typing import Generator, Dict + +import dlt +from dlt.destinations import filesystem +from tests.load.utils import GCS_BUCKET +from tests.pipeline.utils import assert_load_info + + +def test_clickhouse_gcs_s3_compatibility() -> None: + @dlt.resource + def dummy_data() -> Generator[Dict[str, int], None, None]: + yield {"field1": 1, "field2": 2} + + + gcp_bucket = filesystem(GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp") + + pipe = dlt.pipeline(pipeline_name="gcs_s3_compatibility", destination="clickhouse", staging=gcp_bucket, full_refresh=True, ) + pack = pipe.run([dummy_data]) + assert_load_info(pack) From 7a4361892fe5545ec6bf40248c6ea15ed63b8d78 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 28 May 2024 19:14:11 +0200 Subject: [PATCH 07/11] Refactor ClickHouse test code for better readability Signed-off-by: Marcel Coetzee --- .../test_clickhouse_gcs_s3_compatibility.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py index 2cf250f66b..a7b6b2086f 100644 --- a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py +++ b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py @@ -11,9 +11,15 @@ def test_clickhouse_gcs_s3_compatibility() -> None: def dummy_data() -> Generator[Dict[str, int], None, None]: yield {"field1": 1, "field2": 2} + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) - gcp_bucket = filesystem(GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp") - - pipe = dlt.pipeline(pipeline_name="gcs_s3_compatibility", destination="clickhouse", staging=gcp_bucket, full_refresh=True, ) + pipe = dlt.pipeline( + pipeline_name="gcs_s3_compatibility", + destination="clickhouse", + staging=gcp_bucket, + full_refresh=True, + ) pack = pipe.run([dummy_data]) assert_load_info(pack) From abd87f82d3df31b2346f0e4ca2f780ea69a97596 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 28 May 2024 20:05:06 +0200 Subject: [PATCH 08/11] Refactor endpoint handling and update GCS bucket configuration Signed-off-by: Marcel Coetzee --- .../impl/clickhouse/clickhouse.py | 19 +++++++++++++++---- dlt/destinations/impl/clickhouse/utils.py | 3 +-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 193888bdb4..cf1f1bc857 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,6 +1,7 @@ import os import re from copy import deepcopy +from textwrap import dedent from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple from urllib.parse import urlparse @@ -201,13 +202,23 @@ def __init__( compression = "none" if config.get("data_writer.disable_compression") else "gz" if bucket_scheme in ("s3", "gs", "gcs"): - # get auth and bucket url - bucket_http_url = convert_storage_to_http_scheme(bucket_url) - access_key_id: str = None - secret_access_key: str = None if 
isinstance(staging_credentials, AwsCredentialsWithoutDefaults): + bucket_http_url = convert_storage_to_http_scheme( + bucket_url, endpoint=staging_credentials.endpoint_url + ) access_key_id = staging_credentials.aws_access_key_id secret_access_key = staging_credentials.aws_secret_access_key + else: + raise LoadJobTerminalException( + file_path, + dedent( + """ + Google Cloud Storage buckets must be configured using the S3 compatible access pattern. + Please provide the necessary S3 credentials (access key ID and secret access key), to access the GCS bucket through the S3 API. + Refer to https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage. + """, + ).strip(), + ) auth = "NOSIGN" if access_key_id and secret_access_key: diff --git a/dlt/destinations/impl/clickhouse/utils.py b/dlt/destinations/impl/clickhouse/utils.py index b0b06909f9..0e2fa3db00 100644 --- a/dlt/destinations/impl/clickhouse/utils.py +++ b/dlt/destinations/impl/clickhouse/utils.py @@ -25,11 +25,10 @@ def convert_storage_to_http_scheme( protocol = "https" if use_https else "http" if endpoint: - domain = endpoint + domain = endpoint.replace("https://", "").replace("http://", "") elif region and parsed_url.scheme == "s3": domain = f"s3-{region}.amazonaws.com" else: - # TODO: Incorporate dlt.config endpoint. storage_domains = { "s3": "s3.amazonaws.com", "gs": "storage.googleapis.com", From 4c3186b0f0cbf4aa4b8cd54167bb56f93a190cae Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Tue, 28 May 2024 23:19:26 +0200 Subject: [PATCH 09/11] Refactor test for clickhouse gcs_s3 compatibility Signed-off-by: Marcel Coetzee --- .../test_clickhouse_gcs_s3_compatibility.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py index a7b6b2086f..8bc5c59c14 100644 --- a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py +++ b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py @@ -1,25 +1,23 @@ from typing import Generator, Dict +import pytest + import dlt from dlt.destinations import filesystem from tests.load.utils import GCS_BUCKET from tests.pipeline.utils import assert_load_info +@pytest.mark.essential def test_clickhouse_gcs_s3_compatibility() -> None: @dlt.resource def dummy_data() -> Generator[Dict[str, int], None, None]: yield {"field1": 1, "field2": 2} - gcp_bucket = filesystem( - GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" - ) - pipe = dlt.pipeline( - pipeline_name="gcs_s3_compatibility", - destination="clickhouse", - staging=gcp_bucket, - full_refresh=True, - ) + gcp_bucket = filesystem(GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp") + + pipe = dlt.pipeline(pipeline_name="gcs_s3_compatibility", destination="clickhouse", staging=gcp_bucket, + full_refresh=True, ) pack = pipe.run([dummy_data]) assert_load_info(pack) From 743c1a209c69c0077e67900e924b8b3a3854c824 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Wed, 29 May 2024 00:01:04 +0200 Subject: [PATCH 10/11] Update ClickHouse docs and tests for S3-compatible staging Signed-off-by: Marcel Coetzee --- .../dlt-ecosystem/destinations/clickhouse.md | 59 ++++++++----------- .../test_clickhouse_gcs_s3_compatibility.py | 13 ++-- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md 
index 58551751c5..bc9cee899b 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -115,12 +115,14 @@ destination. The `clickhouse` destination has a few specific deviations from the default sql destinations: -1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need this feature, get in touch with our Slack community, and we will consider adding it. +1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need + this feature, get in touch with our Slack community, and we will consider adding it. 2. `Clickhouse` does not support the `time` datatype. Time will be loaded to a `text` column. 3. `Clickhouse` does not support the `binary` datatype. Binary will be loaded to a `text` column. When loading from `jsonl`, this will be a base64 string, when loading from parquet this will be the `binary` object converted to `text`. 4. `Clickhouse` accepts adding columns to a populated table that are not null. -5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you cannot afford to have rounding errors. Loading the value 12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example. +5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you cannot afford to have rounding errors. Loading the value + 12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example. ## Supported column hints @@ -173,51 +175,42 @@ pipeline = dlt.pipeline( ) ``` -### Using Google Cloud Storage as a Staging Area +### Using S3-Compatible Storage as a Staging Area -dlt supports using Google Cloud Storage (GCS) as a staging area when loading data into ClickHouse. This is handled automatically by -ClickHouse's [GCS table function](https://clickhouse.com/docs/en/sql-reference/table-functions/gcs) which dlt uses under the hood. +dlt supports using S3-compatible storage services, including Google Cloud Storage (GCS), as a staging area when loading data into ClickHouse. +This is handled automatically by +ClickHouse's [GCS table function](https://clickhouse.com/docs/en/sql-reference/table-functions/gcs), which dlt uses under the hood. -The clickhouse GCS table function only supports authentication using Hash-based Message Authentication Code (HMAC) keys. To enable this, GCS provides an S3 compatibility mode that emulates -the Amazon S3 -API. ClickHouse takes advantage of this to allow accessing GCS buckets via its S3 integration. +The ClickHouse GCS table function only supports authentication using Hash-based Message Authentication Code (HMAC) keys, which is compatible with the Amazon S3 API. +To enable this, GCS provides an S3 +compatibility mode that emulates the S3 API, allowing ClickHouse to access GCS buckets via its S3 integration. 
+ +For detailed instructions on setting up S3-compatible storage with dlt, including AWS S3, MinIO, and Cloudflare R2, refer to +the [dlt documentation on filesystem destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage). To set up GCS staging with HMAC authentication in dlt: 1. Create HMAC keys for your GCS service account by following the [Google Cloud guide](https://cloud.google.com/storage/docs/authentication/managing-hmackeys#create). -2. Configure the HMAC keys as well as the `client_email`, `project_id` and `private_key` for your service account in your dlt project's ClickHouse destination settings in `config.toml`: +2. Configure the HMAC keys (`aws_access_key_id` and `aws_secret_access_key`) in your dlt project's ClickHouse destination settings in `config.toml`, similar to how you would configure AWS S3 + credentials: ```toml [destination.filesystem] -bucket_url = "gs://dlt-ci" +bucket_url = "s3://my_awesome_bucket" [destination.filesystem.credentials] -project_id = "a-cool-project" -client_email = "my-service-account@a-cool-project.iam.gserviceaccount.com" -private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n" - -[destination.clickhouse.credentials] -database = "dlt" -username = "dlt" -password = "Dlt*12345789234567" -host = "localhost" -port = 9440 -secure = 1 -gcp_access_key_id = "JFJ$$*f2058024835jFffsadf" -gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi" +aws_access_key_id = "JFJ$$*f2058024835jFffsadf" +aws_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi" +project_id = "my-awesome-project" +endpoint_url = "https://storage.googleapis.com" ``` -Note: In addition to the HMAC keys (`gcp_access_key_id` and `gcp_secret_access_key`), you now need to provide the `client_email`, `project_id` and `private_key` for your service account -under `[destination.filesystem.credentials]`. -This is because the GCS staging support is now implemented as a temporary workaround and is still unoptimized. - -dlt will pass these credentials to ClickHouse which will handle the authentication and GCS access. - -There is active work in progress to simplify and improve the GCS staging setup for the ClickHouse dlt destination in the future. Proper GCS staging support is being tracked in these GitHub issues: - -- [Make filesystem destination work with gcs in s3 compatibility mode](https://github.com/dlt-hub/dlt/issues/1272) -- [GCS staging area support](https://github.com/dlt-hub/dlt/issues/1181) +:::caution +When configuring the `bucket_url` for S3-compatible storage services like Google Cloud Storage (GCS) with ClickHouse in dlt, ensure that the URL is prepended with `s3://` instead of `gs://`. This is +because the ClickHouse GCS table function requires the use of HMAC credentials, which are compatible with the S3 API. Prepending with `s3://` allows the HMAC credentials to integrate properly with +dlt's staging mechanisms for ClickHouse. 
+::: ### dbt support diff --git a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py index 8bc5c59c14..481cd420c6 100644 --- a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py +++ b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py @@ -14,10 +14,15 @@ def test_clickhouse_gcs_s3_compatibility() -> None: def dummy_data() -> Generator[Dict[str, int], None, None]: yield {"field1": 1, "field2": 2} + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) - gcp_bucket = filesystem(GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp") - - pipe = dlt.pipeline(pipeline_name="gcs_s3_compatibility", destination="clickhouse", staging=gcp_bucket, - full_refresh=True, ) + pipe = dlt.pipeline( + pipeline_name="gcs_s3_compatibility", + destination="clickhouse", + staging=gcp_bucket, + full_refresh=True, + ) pack = pipe.run([dummy_data]) assert_load_info(pack) From 9a58f8f35ab3c227066a5f5ee2f3a1fb070ea5d1 Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 30 May 2024 15:11:50 +0200 Subject: [PATCH 11/11] Update ClickHouse documentation on staging areas Signed-off-by: Marcel Coetzee --- docs/website/docs/dlt-ecosystem/destinations/clickhouse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index bc9cee899b..b1dde5a328 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -175,7 +175,7 @@ pipeline = dlt.pipeline( ) ``` -### Using S3-Compatible Storage as a Staging Area +### Using Google Cloud or S3-Compatible Storage as a Staging Area dlt supports using S3-compatible storage services, including Google Cloud Storage (GCS), as a staging area when loading data into ClickHouse. This is handled automatically by
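
Taken together, the end state of this patch series stages ClickHouse loads in a GCS bucket addressed through the S3-compatible API with HMAC keys. The sketch below is illustrative only and not part of any patch: it simply combines the configuration shown in the updated clickhouse.md with the compatibility test added in PATCH 06, assuming a GCS bucket and HMAC keys already exist; the bucket URL, credential values, and pipeline name are placeholders.

```py
# Illustrative sketch only (assumes a GCS bucket with HMAC keys is available).
# As in the updated docs, the HMAC credentials would live in config, e.g.:
#   [destination.filesystem.credentials]
#   aws_access_key_id = "..."                        # GCS HMAC access key
#   aws_secret_access_key = "..."                    # GCS HMAC secret
#   endpoint_url = "https://storage.googleapis.com"
from typing import Dict, Generator

import dlt
from dlt.destinations import filesystem


@dlt.resource
def dummy_data() -> Generator[Dict[str, int], None, None]:
    yield {"field1": 1, "field2": 2}


# Address the GCS bucket with the s3:// scheme (not gs://) so ClickHouse's S3
# integration and the HMAC credentials are used, mirroring the added test.
gcs_staging = filesystem("s3://my_awesome_bucket", destination_name="filesystem_s3_gcs_comp")

pipeline = dlt.pipeline(
    pipeline_name="gcs_s3_compatibility",
    destination="clickhouse",
    staging=gcs_staging,
    full_refresh=True,
)
load_info = pipeline.run([dummy_data])
print(load_info)
```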