diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index e2c1f827bc..cf1f1bc857 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,6 +1,7 @@ import os import re from copy import deepcopy +from textwrap import dedent from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple from urllib.parse import urlparse @@ -201,22 +202,23 @@ def __init__( compression = "none" if config.get("data_writer.disable_compression") else "gz" if bucket_scheme in ("s3", "gs", "gcs"): - # get auth and bucket url - bucket_http_url = convert_storage_to_http_scheme(bucket_url) - access_key_id: str = None - secret_access_key: str = None if isinstance(staging_credentials, AwsCredentialsWithoutDefaults): + bucket_http_url = convert_storage_to_http_scheme( + bucket_url, endpoint=staging_credentials.endpoint_url + ) access_key_id = staging_credentials.aws_access_key_id secret_access_key = staging_credentials.aws_secret_access_key - elif isinstance(staging_credentials, GcpCredentials): - access_key_id = client.credentials.gcp_access_key_id - secret_access_key = client.credentials.gcp_secret_access_key - if not access_key_id or not secret_access_key: - raise DestinationTransientException( - "You have tried loading from gcs with clickhouse. Please provide valid" - " 'gcp_access_key_id' and 'gcp_secret_access_key' to connect to gcs as" - " outlined in the dlthub docs." - ) + else: + raise LoadJobTerminalException( + file_path, + dedent( + """ + Google Cloud Storage buckets must be configured using the S3 compatible access pattern. + Please provide the necessary S3 credentials (access key ID and secret access key), to access the GCS bucket through the S3 API. + Refer to https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage. + """, + ).strip(), + ) auth = "NOSIGN" if access_key_id and secret_access_key: diff --git a/dlt/destinations/impl/clickhouse/utils.py b/dlt/destinations/impl/clickhouse/utils.py index b0b06909f9..0e2fa3db00 100644 --- a/dlt/destinations/impl/clickhouse/utils.py +++ b/dlt/destinations/impl/clickhouse/utils.py @@ -25,11 +25,10 @@ def convert_storage_to_http_scheme( protocol = "https" if use_https else "http" if endpoint: - domain = endpoint + domain = endpoint.replace("https://", "").replace("http://", "") elif region and parsed_url.scheme == "s3": domain = f"s3-{region}.amazonaws.com" else: - # TODO: Incorporate dlt.config endpoint. storage_domains = { "s3": "s3.amazonaws.com", "gs": "storage.googleapis.com", diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index 58551751c5..b1dde5a328 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -115,12 +115,14 @@ destination. The `clickhouse` destination has a few specific deviations from the default sql destinations: -1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need this feature, get in touch with our Slack community, and we will consider adding it. +1. `Clickhouse` has an experimental `object` datatype, but we have found it to be a bit unpredictable, so the dlt clickhouse destination will load the complex datatype to a `text` column. If you need + this feature, get in touch with our Slack community, and we will consider adding it. 2. `Clickhouse` does not support the `time` datatype. Time will be loaded to a `text` column. 3. `Clickhouse` does not support the `binary` datatype. Binary will be loaded to a `text` column. When loading from `jsonl`, this will be a base64 string, when loading from parquet this will be the `binary` object converted to `text`. 4. `Clickhouse` accepts adding columns to a populated table that are not null. -5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you cannot afford to have rounding errors. Loading the value 12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example. +5. `Clickhouse` can produce rounding errors under certain conditions when using the float / double datatype. Make sure to use decimal if you cannot afford to have rounding errors. Loading the value + 12.7001 to a double column with the loader file format jsonl set will predictbly produce a rounding error for example. ## Supported column hints @@ -173,51 +175,42 @@ pipeline = dlt.pipeline( ) ``` -### Using Google Cloud Storage as a Staging Area +### Using Google Cloud or S3-Compatible Storage as a Staging Area -dlt supports using Google Cloud Storage (GCS) as a staging area when loading data into ClickHouse. This is handled automatically by -ClickHouse's [GCS table function](https://clickhouse.com/docs/en/sql-reference/table-functions/gcs) which dlt uses under the hood. +dlt supports using S3-compatible storage services, including Google Cloud Storage (GCS), as a staging area when loading data into ClickHouse. +This is handled automatically by +ClickHouse's [GCS table function](https://clickhouse.com/docs/en/sql-reference/table-functions/gcs), which dlt uses under the hood. -The clickhouse GCS table function only supports authentication using Hash-based Message Authentication Code (HMAC) keys. To enable this, GCS provides an S3 compatibility mode that emulates -the Amazon S3 -API. ClickHouse takes advantage of this to allow accessing GCS buckets via its S3 integration. +The ClickHouse GCS table function only supports authentication using Hash-based Message Authentication Code (HMAC) keys, which is compatible with the Amazon S3 API. +To enable this, GCS provides an S3 +compatibility mode that emulates the S3 API, allowing ClickHouse to access GCS buckets via its S3 integration. + +For detailed instructions on setting up S3-compatible storage with dlt, including AWS S3, MinIO, and Cloudflare R2, refer to +the [dlt documentation on filesystem destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/filesystem#using-s3-compatible-storage). To set up GCS staging with HMAC authentication in dlt: 1. Create HMAC keys for your GCS service account by following the [Google Cloud guide](https://cloud.google.com/storage/docs/authentication/managing-hmackeys#create). -2. Configure the HMAC keys as well as the `client_email`, `project_id` and `private_key` for your service account in your dlt project's ClickHouse destination settings in `config.toml`: +2. Configure the HMAC keys (`aws_access_key_id` and `aws_secret_access_key`) in your dlt project's ClickHouse destination settings in `config.toml`, similar to how you would configure AWS S3 + credentials: ```toml [destination.filesystem] -bucket_url = "gs://dlt-ci" +bucket_url = "s3://my_awesome_bucket" [destination.filesystem.credentials] -project_id = "a-cool-project" -client_email = "my-service-account@a-cool-project.iam.gserviceaccount.com" -private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n" - -[destination.clickhouse.credentials] -database = "dlt" -username = "dlt" -password = "Dlt*12345789234567" -host = "localhost" -port = 9440 -secure = 1 -gcp_access_key_id = "JFJ$$*f2058024835jFffsadf" -gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi" +aws_access_key_id = "JFJ$$*f2058024835jFffsadf" +aws_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi" +project_id = "my-awesome-project" +endpoint_url = "https://storage.googleapis.com" ``` -Note: In addition to the HMAC keys (`gcp_access_key_id` and `gcp_secret_access_key`), you now need to provide the `client_email`, `project_id` and `private_key` for your service account -under `[destination.filesystem.credentials]`. -This is because the GCS staging support is now implemented as a temporary workaround and is still unoptimized. - -dlt will pass these credentials to ClickHouse which will handle the authentication and GCS access. - -There is active work in progress to simplify and improve the GCS staging setup for the ClickHouse dlt destination in the future. Proper GCS staging support is being tracked in these GitHub issues: - -- [Make filesystem destination work with gcs in s3 compatibility mode](https://github.com/dlt-hub/dlt/issues/1272) -- [GCS staging area support](https://github.com/dlt-hub/dlt/issues/1181) +:::caution +When configuring the `bucket_url` for S3-compatible storage services like Google Cloud Storage (GCS) with ClickHouse in dlt, ensure that the URL is prepended with `s3://` instead of `gs://`. This is +because the ClickHouse GCS table function requires the use of HMAC credentials, which are compatible with the S3 API. Prepending with `s3://` allows the HMAC credentials to integrate properly with +dlt's staging mechanisms for ClickHouse. +::: ### dbt support diff --git a/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py new file mode 100644 index 0000000000..481cd420c6 --- /dev/null +++ b/tests/load/clickhouse/test_clickhouse_gcs_s3_compatibility.py @@ -0,0 +1,28 @@ +from typing import Generator, Dict + +import pytest + +import dlt +from dlt.destinations import filesystem +from tests.load.utils import GCS_BUCKET +from tests.pipeline.utils import assert_load_info + + +@pytest.mark.essential +def test_clickhouse_gcs_s3_compatibility() -> None: + @dlt.resource + def dummy_data() -> Generator[Dict[str, int], None, None]: + yield {"field1": 1, "field2": 2} + + gcp_bucket = filesystem( + GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp" + ) + + pipe = dlt.pipeline( + pipeline_name="gcs_s3_compatibility", + destination="clickhouse", + staging=gcp_bucket, + full_refresh=True, + ) + pack = pipe.run([dummy_data]) + assert_load_info(pack) diff --git a/tests/load/utils.py b/tests/load/utils.py index e6b860c723..445f8d815b 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -345,13 +345,6 @@ def destinations_configs( extra_info="az-authorization", disable_compression=True, ), - DestinationTestConfiguration( - destination="clickhouse", - staging="filesystem", - file_format="parquet", - bucket_url=GCS_BUCKET, - extra_info="gcs-authorization", - ), DestinationTestConfiguration( destination="clickhouse", staging="filesystem", @@ -373,13 +366,6 @@ def destinations_configs( bucket_url=AZ_BUCKET, extra_info="az-authorization", ), - DestinationTestConfiguration( - destination="clickhouse", - staging="filesystem", - file_format="jsonl", - bucket_url=GCS_BUCKET, - extra_info="gcs-authorization", - ), DestinationTestConfiguration( destination="clickhouse", staging="filesystem",