Fix 'from_db_type' #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Apr 4, 2024
1 parent eca4d2d commit 092a524
Showing 8 changed files with 77 additions and 44 deletions.
2 changes: 1 addition & 1 deletion dlt/destinations/impl/clickhouse/__init__.py
@@ -9,7 +9,7 @@ def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
# Clickhouse only supports loading from staged files on s3 for now.
caps.preferred_loader_file_format = "jsonl"
caps.supported_loader_file_formats = ["parquet", "jsonl", "insert_values"]
caps.supported_loader_file_formats = ["parquet", "jsonl"]
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
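
With `insert_values` dropped, every ClickHouse load now has to go through one of the file-based formats. A minimal pipeline sketch under that assumption (the pipeline name, sample rows, and staging setup are placeholders for illustration, not part of this commit):

import dlt

# Staged loading from s3, per the capability comment above; credentials for
# the filesystem staging destination are assumed to be configured separately.
pipeline = dlt.pipeline(
    pipeline_name="clickhouse_demo",
    destination="clickhouse",
    staging="filesystem",
)

# "jsonl" is the preferred loader format; "parquet" is the other supported one.
pipeline.run(
    [{"id": 1, "payload": {"a": 1}}],
    table_name="demo",
    loader_file_format="jsonl",
)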

42 changes: 33 additions & 9 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,4 +1,5 @@
import os
import re
from copy import deepcopy
from typing import ClassVar, Optional, Dict, List, Sequence, cast, Tuple
from urllib.parse import urlparse
@@ -72,13 +73,12 @@

class ClickhouseTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
"complex": "String",
"complex": "JSON",
"text": "String",
"double": "Float64",
"bool": "Boolean",
"date": "Date",
"timestamp": "DateTime('UTC')",
"time": "Time('UTC')",
"bigint": "Int64",
"binary": "String",
"wei": "Decimal",
@@ -87,21 +87,19 @@ class ClickhouseTypeMapper(TypeMapper):
sct_to_dbt = {
"decimal": "Decimal(%i,%i)",
"wei": "Decimal(%i,%i)",
"timestamp": "DateTime(%i, 'UTC')",
"time": "Time(%i ,'UTC')",
"timestamp": "DateTime(%i,'UTC')",
}

dbt_to_sct = {
"String": "text",
"Float64": "double",
"Boolean": "bool",
"Bool": "bool",
"Date": "date",
"DateTime": "timestamp",
"DateTime('UTC')": "timestamp",
"DateTime64": "timestamp",
"Time": "timestamp",
"Time('UTC')": "timestamp",
"Int64": "bigint",
"JSON": "complex",
"Object('json')": "complex",
"Decimal": "decimal",
}

@@ -111,8 +109,33 @@ def to_db_time_type(self, precision: Optional[int], table_format: TTableFormat =
def from_db_type(
self, db_type: str, precision: Optional[int] = None, scale: Optional[int] = None
) -> TColumnType:
# Remove "Nullable" wrapper.
db_type = re.sub(r"^Nullable\((?P<type>.+)\)$", r"\g<type>", db_type)

# Remove timezone details.
if db_type == "DateTime('UTC')":
db_type = "DateTime"
if datetime_match := re.match(
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type
):
if datetime_match["precision"]:
precision = int(datetime_match["precision"])
else:
precision = None
db_type = "DateTime64"

# Extract the precision and scale parameters, and remove them from the type string.
if decimal_match := re.match(
r"Decimal\((?P<precision>\d+)\s*(?:,\s*(?P<scale>\d+))?\)", db_type
):
precision, scale = decimal_match.groups() # type: ignore[assignment]
precision = int(precision)
scale = int(scale) if scale else 0
db_type = "Decimal"

if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")

return super().from_db_type(db_type, precision, scale)
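
To make the new normalization concrete, here is a standalone sketch of the same parsing rules (`parse_clickhouse_type` is a hypothetical helper that only mirrors the regexes above; the real method additionally defers to `super().from_db_type` for the reverse mapping):

import re
from typing import Optional, Tuple

def parse_clickhouse_type(db_type: str) -> Tuple[str, Optional[int], Optional[int]]:
    precision: Optional[int] = None
    scale: Optional[int] = None
    # Strip the "Nullable" wrapper: "Nullable(Int64)" -> "Int64".
    db_type = re.sub(r"^Nullable\((?P<type>.+)\)$", r"\g<type>", db_type)
    # Drop timezone details from plain DateTime.
    if db_type == "DateTime('UTC')":
        db_type = "DateTime"
    # "DateTime64(3, 'UTC')" -> base type "DateTime64" with precision 3.
    if m := re.match(
        r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type
    ):
        precision = int(m["precision"]) if m["precision"] else None
        db_type = "DateTime64"
    # "Decimal(76,0)" -> base type "Decimal" with precision 76, scale 0.
    if m := re.match(r"Decimal\((?P<precision>\d+)\s*(?:,\s*(?P<scale>\d+))?\)", db_type):
        precision = int(m["precision"])
        scale = int(m["scale"]) if m["scale"] else 0
        db_type = "Decimal"
    return db_type, precision, scale

assert parse_clickhouse_type("Nullable(DateTime64(3, 'UTC'))") == ("DateTime64", 3, None)
assert parse_clickhouse_type("Decimal(76,0)") == ("Decimal", 76, 0)
assert parse_clickhouse_type("Nullable(String)") == ("String", None, None)

When the result is Decimal(76,0) — which, judging by the Decimal(76,0) assertions in the table-builder tests below, is this destination's wei precision — the real method short-circuits to the wei data type before consulting the reverse mapping.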


@@ -396,9 +419,10 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None
)

# Alter table statements only accept `Nullable` modifiers.
# JSON type isn't nullable in Clickhouse.
type_with_nullability_modifier = (
f"Nullable({self.type_mapper.to_db_type(c)})"
if c.get("nullable", True)
if c.get("nullable", True) and c.get("data_type") != "complex"
else self.type_mapper.to_db_type(c)
)
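
A self-contained sketch of the rule this hunk enforces (the helper name is made up for the example, and `to_db_type` is replaced by a plain string argument):

def column_def(name: str, db_type: str, nullable: bool, data_type: str) -> str:
    # JSON ("complex") columns are never wrapped: the type isn't nullable in Clickhouse.
    wrap = nullable and data_type != "complex"
    return f"`{name}` " + (f"Nullable({db_type})" if wrap else db_type)

assert column_def("payload", "JSON", True, "complex") == "`payload` JSON"
assert column_def("note", "String", True, "text") == "`note` Nullable(String)"
assert column_def("id", "Int64", False, "bigint") == "`id` Int64"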

12 changes: 7 additions & 5 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -56,11 +56,6 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
self._conn = clickhouse_driver.dbapi.connect(
dsn=self.credentials.to_native_representation()
)
with self._conn.cursor() as cur:
# Toggle experimental settings.
# These are necessary for nested datatypes and other operations to work.
cur.execute("set allow_experimental_object_type = 1")
cur.execute("set allow_experimental_lightweight_delete = 1")
return self._conn

@raise_open_connection_error
@@ -133,6 +128,13 @@ def execute_query(
query, db_args = _convert_to_old_pyformat(query, args, OperationalError)
db_args.update(kwargs)

# Prefix each query with the experimental settings.
# These are necessary for nested data types and certain other operations to work.
query = (
"set allow_experimental_lightweight_delete = 1;"
"set allow_experimental_object_type = 1;"
f"{query}"
)
with self._conn.cursor() as cursor:
for query_line in query.split(";"):
if query_line := query_line.strip():
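
The prefixed string is then executed statement by statement by the loop above; a condensed sketch of that behaviour:

query = (
    "set allow_experimental_lightweight_delete = 1;"
    "set allow_experimental_object_type = 1;"
    "SELECT 1"
)
for query_line in query.split(";"):
    if query_line := query_line.strip():
        print(query_line)  # stands in for cursor.execute(query_line, db_args)
# Prints the two settings statements first, then "SELECT 1".

Note the naive split assumes no literal `;` occurs inside a statement; a string literal containing a semicolon would be split incorrectly.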
4 changes: 2 additions & 2 deletions tests/load/clickhouse/test_clickhouse_configuration.py
@@ -17,7 +17,7 @@
from tests.common.configuration.utils import environment


def test_connection_string_with_all_params() -> None:
def test_clickhouse_connection_string_with_all_params() -> None:
url = "clickhouse://user1:pass1@host1:9000/testdb?secure=0&connect_timeout=230&send_receive_timeout=1000"

creds = ClickhouseCredentials() # type: ignore
@@ -50,7 +50,7 @@ def test_clickhouse_configuration() -> None:


@pytest.mark.usefixtures("environment")
def test_gcp_hmac_getter_accessor(environment: Any) -> None:
def test_clickhouse_gcp_hmac_getter_accessor(environment: Any) -> None:
environment["DESTINATION__FILESYSTEM__CREDENTIALS__GCP_ACCESS_KEY_ID"] = "25g08jaDJacj42"
environment["DESTINATION__FILESYSTEM__CREDENTIALS__GCP_SECRET_ACCESS_KEY"] = "ascvntp45uasdf"

20 changes: 10 additions & 10 deletions tests/load/clickhouse/test_clickhouse_table_builder.py
@@ -48,7 +48,7 @@ def test_clickhouse_configuration() -> None:
assert ClickhouseClientConfiguration(credentials=c).fingerprint() == digest128("host1")


def test_create_table(clickhouse_client: ClickhouseClient) -> None:
def test_clickhouse_create_table(clickhouse_client: ClickhouseClient) -> None:
statements = clickhouse_client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)
assert len(statements) == 1
sql = statements[0]
@@ -66,7 +66,7 @@ def test_create_table(clickhouse_client: ClickhouseClient) -> None:
assert "`col6` Decimal(38,9)" in sql
assert "`col7` String" in sql
assert "`col8` Decimal(76,0)" in sql
assert "`col9` String" in sql
assert "`col9` JSON" in sql
assert "`col10` Date" in sql
assert "`col11` DateTime" in sql
assert "`col1_null` Nullable(Int64)" in sql
@@ -77,18 +77,18 @@ def test_create_table(clickhouse_client: ClickhouseClient) -> None:
assert "`col6_null` Nullable(Decimal(38,9))" in sql
assert "`col7_null` Nullable(String)" in sql
assert "`col8_null` Nullable(Decimal(76,0))" in sql
assert "`col9_null` Nullable(String)" in sql
assert "`col9_null` JSON" in sql # JSON isn't nullable in clickhouse
assert "`col10_null` Nullable(Date)" in sql
assert "`col11_null` Nullable(DateTime)" in sql
assert "`col1_precision` Int64" in sql
assert "`col4_precision` DateTime(3, 'UTC')" in sql
assert "`col4_precision` DateTime(3,'UTC')" in sql
assert "`col5_precision` String" in sql
assert "`col6_precision` Decimal(6,2)" in sql
assert "`col7_precision` String" in sql
assert "`col11_precision` DateTime" in sql


def test_alter_table(clickhouse_client: ClickhouseClient) -> None:
def test_clickhouse_alter_table(clickhouse_client: ClickhouseClient) -> None:
statements = clickhouse_client._get_table_update_sql("event_test_table", TABLE_UPDATE, True)
assert len(statements) == 1
sql = statements[0]
@@ -108,7 +108,7 @@ def test_alter_table(clickhouse_client: ClickhouseClient) -> None:
assert "`col6` Decimal(38,9)" in sql
assert "`col7` String" in sql
assert "`col8` Decimal(76,0)" in sql
assert "`col9` String" in sql
assert "`col9` JSON" in sql
assert "`col10` Date" in sql
assert "`col11` DateTime" in sql
assert "`col1_null` Nullable(Int64)" in sql
@@ -119,11 +119,11 @@ def test_alter_table(clickhouse_client: ClickhouseClient) -> None:
assert "`col6_null` Nullable(Decimal(38,9))" in sql
assert "`col7_null` Nullable(String)" in sql
assert "`col8_null` Nullable(Decimal(76,0))" in sql
assert "`col9_null` Nullable(String)" in sql
assert "`col9_null` JSON" in sql
assert "`col10_null` Nullable(Date)" in sql
assert "`col11_null` Nullable(DateTime)" in sql
assert "`col1_precision` Int64" in sql
assert "`col4_precision` DateTime(3, 'UTC')" in sql
assert "`col4_precision` DateTime(3,'UTC')" in sql
assert "`col5_precision` String" in sql
assert "`col6_precision` Decimal(6,2)" in sql
assert "`col7_precision` String" in sql
@@ -138,7 +138,7 @@ def test_alter_table(clickhouse_client: ClickhouseClient) -> None:


@pytest.mark.usefixtures("empty_schema")
def test_create_table_with_primary_keys(clickhouse_client: ClickhouseClient) -> None:
def test_clickhouse_create_table_with_primary_keys(clickhouse_client: ClickhouseClient) -> None:
mod_update = deepcopy(TABLE_UPDATE)

mod_update[1]["primary_key"] = True
@@ -154,7 +154,7 @@ def test_create_table_with_primary_keys(clickhouse_client: ClickhouseClient) -> None:
"Only `primary_key` hint has been implemented so far, which isn't specified inline with the"
" column definition."
)
def test_create_table_with_hints(client: ClickhouseClient) -> None:
def test_clickhouse_create_table_with_hints(client: ClickhouseClient) -> None:
mod_update = deepcopy(TABLE_UPDATE)

mod_update[0]["primary_key"] = True
30 changes: 15 additions & 15 deletions tests/load/clickhouse/test_utls.py
@@ -6,19 +6,19 @@
)


def test_convert_s3_url_to_http() -> None:
def test_clickhouse_convert_s3_url_to_http() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3.amazonaws.com/path/to/file.txt"
assert convert_storage_to_http_scheme(s3_url) == expected_http_url


def test_convert_s3_url_to_https() -> None:
def test_clickhouse_convert_s3_url_to_https() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.s3.amazonaws.com/path/to/file.txt"
assert convert_storage_to_http_scheme(s3_url, use_https=True) == expected_https_url


def test_convert_gs_url_to_http() -> None:
def test_clickhouse_convert_gs_url_to_http() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_to_http_scheme(gs_url) == expected_http_url
@@ -27,7 +27,7 @@ def test_convert_gs_url_to_http() -> None:
assert convert_storage_to_http_scheme(gcs_url) == expected_http_url


def test_convert_gs_url_to_https() -> None:
def test_clickhouse_convert_gs_url_to_https() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.storage.googleapis.com/path/to/file.txt"
assert convert_storage_to_http_scheme(gs_url, use_https=True) == expected_https_url
@@ -36,13 +36,13 @@ def test_convert_gs_url_to_https() -> None:
assert convert_storage_to_http_scheme(gcs_url, use_https=True) == expected_https_url


def test_convert_s3_url_to_http_with_region() -> None:
def test_clickhouse_convert_s3_url_to_http_with_region() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3-us-west-2.amazonaws.com/path/to/file.txt"
assert convert_storage_to_http_scheme(s3_url, region="us-west-2") == expected_http_url


def test_convert_s3_url_to_https_with_region() -> None:
def test_clickhouse_convert_s3_url_to_https_with_region() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.s3-us-east-1.amazonaws.com/path/to/file.txt"
assert (
@@ -51,7 +51,7 @@ def test_convert_s3_url_to_https_with_region() -> None:
)


def test_convert_s3_url_to_http_with_endpoint() -> None:
def test_clickhouse_convert_s3_url_to_http_with_endpoint() -> None:
s3_url: str = "s3://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.s3.custom-endpoint.com/path/to/file.txt"
assert (
@@ -69,7 +69,7 @@ def test_convert_s3_url_to_https_with_endpoint() -> None:
)


def test_convert_gs_url_to_http_with_endpoint() -> None:
def test_clickhouse_convert_gs_url_to_http_with_endpoint() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_http_url: str = "http://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
@@ -82,7 +82,7 @@ def test_convert_gs_url_to_http_with_endpoint() -> None:
)


def test_convert_gs_url_to_https_with_endpoint() -> None:
def test_clickhouse_convert_gs_url_to_https_with_endpoint() -> None:
gs_url: str = "gs://my-bucket/path/to/file.txt"
expected_https_url: str = "https://my-bucket.custom-endpoint.com/path/to/file.txt"
assert (
@@ -97,7 +97,7 @@ def test_convert_gs_url_to_https_with_endpoint() -> None:
)


def test_render_with_credentials_jsonl() -> None:
def test_clickhouse_render_with_credentials_jsonl() -> None:
url = "https://example.com/data.jsonl"
access_key_id = "test_access_key"
secret_access_key = "test_secret_key"
@@ -111,7 +111,7 @@ def test_render_with_credentials_jsonl() -> None:
)


def test_render_with_credentials_parquet() -> None:
def test_clickhouse_render_with_credentials_parquet() -> None:
url = "https://example.com/data.parquet"
access_key_id = "test_access_key"
secret_access_key = "test_secret_key"
@@ -125,14 +125,14 @@ def test_render_with_credentials_parquet() -> None:
)


def test_render_without_credentials() -> None:
def test_clickhouse_render_without_credentials() -> None:
url = "https://example.com/data.jsonl"
file_format = "jsonl"
expected_output = """s3('https://example.com/data.jsonl',NOSIGN,'JSONEachRow')"""
assert render_object_storage_table_function(url, file_format=file_format) == expected_output # type: ignore[arg-type]


def test_render_invalid_file_format() -> None:
def test_clickhouse_render_invalid_file_format() -> None:
url = "https://example.com/data.unknown"
access_key_id = "test_access_key"
secret_access_key = "test_secret_key"
@@ -142,13 +142,13 @@ def test_render_invalid_file_format() -> None:
assert "Clickhouse s3/gcs staging only supports 'parquet' and 'jsonl'." == str(excinfo.value)


def test_invalid_url_format() -> None:
def test_clickhouse_invalid_url_format() -> None:
with pytest.raises(Exception) as exc_info:
convert_storage_to_http_scheme("invalid-url")
assert str(exc_info.value) == "Error converting storage URL to HTTP protocol: 'invalid-url'"


def test_render_missing_url() -> None:
def test_clickhouse_render_missing_url() -> None:
with pytest.raises(TypeError) as excinfo:
render_object_storage_table_function() # type: ignore
assert "missing 1 required positional argument: 'url'" in str(excinfo.value)
7 changes: 7 additions & 0 deletions tests/load/test_job_client.py
@@ -392,6 +392,13 @@ def test_get_storage_table_with_all_types(client: SqlJobClientBase) -> None:
continue
if client.config.destination_type == "databricks" and c["data_type"] in ("complex", "time"):
continue
# Clickhouse has no native data type for binary or time types.
# TODO: JSON type is available, but not nullable in Clickhouse.
if client.config.destination_type == "clickhouse":
if c["data_type"] in ("binary", "time"):
continue
elif c["data_type"] == "complex" and c["nullable"]:
continue
assert c["data_type"] == expected_c["data_type"]


4 changes: 2 additions & 2 deletions tests/load/utils.py
@@ -172,7 +172,7 @@ def destinations_configs(
destination_configs += [
DestinationTestConfiguration(destination=destination)
for destination in SQL_DESTINATIONS
if destination not in ("athena", "mssql", "synapse", "databricks")
if destination not in ("athena", "mssql", "synapse", "databricks", "clickhouse")
]
destination_configs += [
DestinationTestConfiguration(destination="duckdb", file_format="parquet")
@@ -200,7 +200,7 @@
destination_configs += [
DestinationTestConfiguration(
destination="clickhouse",
file_format="parquet",
file_format="jsonl",
bucket_url=AWS_BUCKET,
extra_info="s3-authorization",
)
