From e1ab71d3cd98096b3e1bf8a4cdc200b01a73106f Mon Sep 17 00:00:00 2001 From: Marcel Coetzee Date: Thu, 20 Jun 2024 22:12:11 +0200 Subject: [PATCH] Fix local clickhouse deployment timestamp parsing issue with simple configuration setting Signed-off-by: Marcel Coetzee --- dlt/destinations/impl/clickhouse/clickhouse.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 4c5c4feafc..4e5c76da87 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -8,15 +8,12 @@ import clickhouse_connect from clickhouse_connect.driver.tools import insert_file -import dlt from dlt import config from dlt.common.configuration.specs import ( CredentialsConfiguration, AzureCredentialsWithoutDefaults, - GcpCredentials, AwsCredentialsWithoutDefaults, ) -from dlt.destinations.exceptions import DestinationTransientException from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import ( SupportsStagingDestination, @@ -117,7 +114,8 @@ def from_db_type( if db_type == "DateTime('UTC')": db_type = "DateTime" if datetime_match := re.match( - r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type + r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", + db_type, ): if datetime_match["precision"]: precision = int(datetime_match["precision"]) @@ -135,7 +133,7 @@ def from_db_type( db_type = "Decimal" if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision: - return dict(data_type="wei") + return cast(TColumnType, dict(data_type="wei")) return super().from_db_type(db_type, precision, scale) @@ -165,7 +163,7 @@ def __init__( compression = "auto" - # Don't use dbapi driver for local files. + # Don't use the DBAPI driver for local files. if not bucket_path: # Local filesystem. 
if ext == "jsonl": @@ -186,8 +184,8 @@ def __init__( fmt=clickhouse_format, settings={ "allow_experimental_lightweight_delete": 1, - # "allow_experimental_object_type": 1, "enable_http_compression": 1, + "date_time_input_format": "best_effort", }, compression=compression, ) @@ -345,7 +343,10 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> ) def _get_table_update_sql( - self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool + self, + table_name: str, + new_columns: Sequence[TColumnSchema], + generate_alter: bool, ) -> List[str]: table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode) sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter)