Remove unused tests #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Apr 4, 2024
1 parent 092a524 commit 79d9b80
Showing 3 changed files with 25 additions and 86 deletions.
35 changes: 20 additions & 15 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -166,6 +166,10 @@ def __init__(
if file_extension not in ["parquet", "jsonl"]:
raise ValueError("Clickhouse staging only supports 'parquet' and 'jsonl' file formats.")

print("File Path:", file_path)
print("Table Name:", table_name)
print("Bucket Path:", bucket_path)

if not bucket_path:
# Local filesystem.
raise NotImplementedError("Only object storage is supported.")
@@ -197,26 +201,27 @@ def __init__(
)

elif bucket_scheme in ("az", "abfs"):
if isinstance(staging_credentials, AzureCredentialsWithoutDefaults):
# Authenticated access.
account_name = staging_credentials.azure_storage_account_name
storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
account_key = staging_credentials.azure_storage_account_key
container_name = bucket_url.netloc
blobpath = bucket_url.path

clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]

table_function = (
f"azureBlobStorage('{storage_account_url}','{container_name}','{ blobpath }','{ account_name }','{ account_key }','{ clickhouse_format}')"
)

else:
if not isinstance(
staging_credentials, AzureCredentialsWithoutDefaults
):
# Unsigned access.
raise NotImplementedError(
"Unsigned Azure Blob Storage access from Clickhouse isn't supported as yet."
)

# Authenticated access.
account_name = staging_credentials.azure_storage_account_name
storage_account_url = f"https://{staging_credentials.azure_storage_account_name}.blob.core.windows.net"
account_key = staging_credentials.azure_storage_account_key
container_name = bucket_url.netloc
blobpath = bucket_url.path

clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]

table_function = (
f"azureBlobStorage('{storage_account_url}','{container_name}','{ blobpath }','{ account_name }','{ account_key }','{ clickhouse_format}')"
)

with client.begin_transaction():
client.execute_sql(
f"""INSERT INTO {qualified_table_name} SELECT * FROM {table_function}"""
71 changes: 0 additions & 71 deletions tests/load/pipeline/test_clickhouse.py
@@ -81,74 +81,3 @@ def items2() -> Iterator[TDataItem]:
finally:
with pipeline.sql_client() as client:
client.drop_dataset()


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, all_staging_configs=True, subset=["clickhouse"]),
ids=lambda x: x.name,
)
def test_clickhouse_destination_merge(destination_config: DestinationTestConfiguration) -> None:
pipeline = destination_config.setup_pipeline(f"clickhouse_{uniq_id()}", full_refresh=True)

try:

@dlt.resource(name="items")
def items() -> Iterator[TDataItem]:
yield {
"id": 1,
"name": "item",
"sub_items": [
{"id": 101, "name": "sub item 101"},
{"id": 101, "name": "sub item 102"},
],
}

pipeline.run(
items,
loader_file_format=destination_config.file_format,
staging=destination_config.staging,
)

table_counts = load_table_counts(
pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values()]
)
assert table_counts["items"] == 1
assert table_counts["items__sub_items"] == 2
assert table_counts["_dlt_loads"] == 1

# Load again with schema evolution.
@dlt.resource(name="items", write_disposition="merge", primary_key="id")
def items2() -> Iterator[TDataItem]:
yield {
"id": 1,
"name": "item",
"new_field": "hello",
"sub_items": [
{
"id": 101,
"name": "sub item 101",
"other_new_field": "hello 101",
},
{
"id": 101,
"name": "sub item 102",
"other_new_field": "hello 102",
},
],
}

pipeline.run(items2)
table_counts = load_table_counts(
pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values()]
)
assert table_counts["items"] == 1
assert table_counts["items__sub_items"] == 2
assert table_counts["_dlt_loads"] == 2

except Exception as e:
raise e

finally:
with pipeline.sql_client() as client:
client.drop_dataset()
5 changes: 5 additions & 0 deletions tests/load/utils.py
@@ -203,6 +203,7 @@ def destinations_configs(
file_format="jsonl",
bucket_url=AWS_BUCKET,
extra_info="s3-authorization",
disable_compression=True,
)
]
destination_configs += [
@@ -306,27 +307,31 @@ def destinations_configs(
file_format="parquet",
bucket_url=AZ_BUCKET,
extra_info="az-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="parquet",
bucket_url=GCS_BUCKET,
extra_info="gcs-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="parquet",
bucket_url=AWS_BUCKET,
extra_info="s3-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
staging="filesystem",
file_format="parquet",
bucket_url=AZ_BUCKET,
extra_info="az-authorization",
disable_compression=True,
),
DestinationTestConfiguration(
destination="clickhouse",
