diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index 1007d8ea45..4d1734189b 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -166,6 +166,10 @@ def __init__(
         if file_extension not in ["parquet", "jsonl"]:
             raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")

+        print("File Path:", file_path)
+        print("Table Name:", table_name)
+        print("Bucket Path:", bucket_path)
+
         if not bucket_path:
             # Local filesystem.
             raise NotImplementedError("Only object storage is supported.")
@@ -197,26 +201,27 @@ def __init__(
             )

         elif bucket_scheme in ("az", "abfs"):
-            if isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
-                # Authenticated access.
-                account_name = staging_credentials.azure_storage_account_name
-                storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
-                account_key = staging_credentials.azure_storage_account_key
-                container_name = bucket_url.netloc
-                blobpath = bucket_url.path
-
-                clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
-
-                table_function = (
-                    f"azureBlobStorage('{storage_account_url}','{container_name}','{ blobpath }','{ account_name }','{ account_key }','{ clickhouse_format}')"
-                )
-
-            else:
+            if not isinstance(
+                staging_credentials, AzureCredentialsWithoutDefaults
+            ):
                 # Unsigned access.
                 raise NotImplementedError(
                     "Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet."
                 )
+            # Authenticated access.
+            account_name = staging_credentials.azure_storage_account_name
+            storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
+            account_key = staging_credentials.azure_storage_account_key
+            container_name = bucket_url.netloc
+            blobpath = bucket_url.path
+
+            clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
+
+            table_function = (
+                f"azureBlobStorage('{storage_account_url}','{container_name}','{ blobpath }','{ account_name }','{ account_key }','{ clickhouse_format}')"
+            )
+
         with client.begin_transaction():
             client.execute_sql(
                 f"""INSERT INTO {qualified_table_name} SELECT * FROM {table_function}"""
diff --git a/tests/load/pipeline/test_clickhouse.py b/tests/load/pipeline/test_clickhouse.py
index a06277ef80..9d6c6ed8d7 100644
--- a/tests/load/pipeline/test_clickhouse.py
+++ b/tests/load/pipeline/test_clickhouse.py
@@ -81,74 +81,3 @@ def items2() -> Iterator[TDataItem]:
     finally:
         with pipeline.sql_client() as client:
             client.drop_dataset()
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, all_staging_configs=True, subset=["clickhouse"]),
-    ids=lambda x: x.name,
-)
-def test_clickhouse_destination_merge(destination_config: DestinationTestConfiguration) -> None:
-    pipeline = destination_config.setup_pipeline(f"clickhouse_{uniq_id()}", full_refresh=True)
-
-    try:
-
-        @dlt.resource(name="items")
-        def items() -> Iterator[TDataItem]:
-            yield {
-                "id": 1,
-                "name": "item",
-                "sub_items": [
-                    {"id": 101, "name": "sub item 101"},
-                    {"id": 101, "name": "sub item 102"},
-                ],
-            }
-
-        pipeline.run(
-            items,
-            loader_file_format=destination_config.file_format,
-            staging=destination_config.staging,
-        )
-
-        table_counts = load_table_counts(
-            pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values()]
-        )
-        assert table_counts["items"] == 1
-        assert table_counts["items__sub_items"] == 2
-        assert table_counts["_dlt_loads"] == 1
-
-        # Load again with schema evolution.
-        @dlt.resource(name="items", write_disposition="merge", primary_key="id")
-        def items2() -> Iterator[TDataItem]:
-            yield {
-                "id": 1,
-                "name": "item",
-                "new_field": "hello",
-                "sub_items": [
-                    {
-                        "id": 101,
-                        "name": "sub item 101",
-                        "other_new_field": "hello 101",
-                    },
-                    {
-                        "id": 101,
-                        "name": "sub item 102",
-                        "other_new_field": "hello 102",
-                    },
-                ],
-            }
-
-        pipeline.run(items2)
-        table_counts = load_table_counts(
-            pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values()]
-        )
-        assert table_counts["items"] == 1
-        assert table_counts["items__sub_items"] == 2
-        assert table_counts["_dlt_loads"] == 2
-
-    except Exception as e:
-        raise e
-
-    finally:
-        with pipeline.sql_client() as client:
-            client.drop_dataset()
diff --git a/tests/load/utils.py b/tests/load/utils.py
index b3ae9cfe7c..078c26bf71 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -203,6 +203,7 @@ def destinations_configs(
             file_format="jsonl",
             bucket_url=AWS_BUCKET,
             extra_info="s3-authorization",
+            disable_compression=True,
         )
     ]
     destination_configs += [
@@ -306,6 +307,7 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
+            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -313,6 +315,7 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=GCS_BUCKET,
             extra_info="gcs-authorization",
+            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -320,6 +323,7 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=AWS_BUCKET,
             extra_info="s3-authorization",
+            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",
@@ -327,6 +331,7 @@ def destinations_configs(
             file_format="parquet",
             bucket_url=AZ_BUCKET,
             extra_info="az-authorization",
+            disable_compression=True,
         ),
         DestinationTestConfiguration(
             destination="clickhouse",