Skip to content

Commit

Permalink
Fix local clickhouse deployment timestamp parsing issue with simple c…
Browse files Browse the repository at this point in the history
…onfiguration setting~

Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Jun 20, 2024
1 parent 4912173 commit e1ab71d
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

import dlt
from dlt import config
from dlt.common.configuration.specs import (
CredentialsConfiguration,
AzureCredentialsWithoutDefaults,
GcpCredentials,
AwsCredentialsWithoutDefaults,
)
from dlt.destinations.exceptions import DestinationTransientException
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import (
SupportsStagingDestination,
Expand Down Expand Up @@ -117,7 +114,8 @@ def from_db_type(
if db_type == "DateTime('UTC')":
db_type = "DateTime"
if datetime_match := re.match(
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?", db_type
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?",
db_type,
):
if datetime_match["precision"]:
precision = int(datetime_match["precision"])
Expand All @@ -135,7 +133,7 @@ def from_db_type(
db_type = "Decimal"

if db_type == "Decimal" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
return cast(TColumnType, dict(data_type="wei"))

return super().from_db_type(db_type, precision, scale)

Expand Down Expand Up @@ -165,7 +163,7 @@ def __init__(

compression = "auto"

# Don't use dbapi driver for local files.
# Don't use the DBAPI driver for local files.
if not bucket_path:
# Local filesystem.
if ext == "jsonl":
Expand All @@ -186,8 +184,8 @@ def __init__(
fmt=clickhouse_format,
settings={
"allow_experimental_lightweight_delete": 1,
# "allow_experimental_object_type": 1,
"enable_http_compression": 1,
"date_time_input_format": "best_effort",
},
compression=compression,
)
Expand Down Expand Up @@ -345,7 +343,10 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
self,
table_name: str,
new_columns: Sequence[TColumnSchema],
generate_alter: bool,
) -> List[str]:
table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode)
sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter)
Expand Down

0 comments on commit e1ab71d

Please sign in to comment.