Refine clickhouse destination and basic adapter #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Mar 16, 2024
1 parent 14d39e4 commit dbad9b1
Showing 4 changed files with 166 additions and 17 deletions.
111 changes: 99 additions & 12 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,15 +1,37 @@
from typing import ClassVar, Optional
from copy import deepcopy
from typing import ClassVar, Optional, Dict, List, Sequence

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import SupportsStagingDestination
from dlt.common.schema import Schema, TColumnSchema
from dlt.common.schema.typing import TColumnType, TTableFormat
from dlt.common.schema.typing import TTableFormat, TTableSchema, TColumnHint, TColumnType
from dlt.destinations.impl.clickhouse import capabilities
from dlt.destinations.impl.clickhouse.configuration import ClickhouseClientConfiguration
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.impl.clickhouse.clickhouse_adapter import (
TTableEngineType,
TABLE_ENGINE_TYPE_HINT,
)
from dlt.destinations.impl.clickhouse.configuration import (
ClickhouseClientConfiguration,
ClickhouseCredentials,
)
from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient
from dlt.destinations.job_client_impl import (
SqlJobClientWithStaging,
CopyRemoteFileLoadJob,
SqlJobClientBase,
)
from dlt.destinations.sql_jobs import SqlMergeJob
from dlt.destinations.type_mapping import TypeMapper
from dlt.destinations.typing import TNativeConn


HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = {
"primary_key": "PRIMARY KEY",
}

TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
"merge_tree": "MergeTree",
"replicated_merge_tree": "ReplicatedMergeTree",
}


class ClickhouseTypeMapper(TypeMapper):
@@ -46,22 +68,87 @@ def to_db_time_type(self, precision: Optional[int], table_format: TTableFormat =
return "DateTime"


class ClickhouseCopyFileLoadJob(CopyRemoteFileLoadJob):
def __init__(
self,
table: TTableSchema,
file_path: str,
sql_client: ClickhouseSqlClient,
staging_credentials: Optional[ClickhouseCredentials] = None,
staging_iam_role: Optional[str] = None,
) -> None:
self._staging_iam_role = staging_iam_role
super().__init__(table, file_path, sql_client, staging_credentials)

def exception(self) -> str:
pass


class ClickhouseMergeJob(SqlMergeJob): ...


class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def __init__(
self,
schema: Schema,
config: ClickhouseClientConfiguration,
sql_client: SqlClientBase[TNativeConn],
) -> None:
super().__init__(schema, config, sql_client)
...
self.config: ClickhouseClientConfiguration = config
# TODO: Clickhouse has no notion of schemas; only dataset-prefixed table names such as "dataset1_mytable" are needed.
self.sql_client = ClickhouseSqlClient(
self.config.normalize_dataset_name(self.schema), self.config.credentials
)
super().__init__(schema, self.config, self.sql_client)
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {}
self.type_mapper = ClickhouseTypeMapper(self.capabilities)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode)
sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter)

if generate_alter:
return sql

# TODO: Remove `unique` and `primary_key` default implementations.
if primary_key_list := [
self.capabilities.escape_identifier(c["name"])
for c in new_columns
if c.get("primary_key")
]:
sql[0] += "\nPRIMARY KEY (" + ", ".join(primary_key_list) + ")"
else:
sql[0] += "\nPRIMARY KEY tuple()"

# Default to 'ReplicatedMergeTree' if the user didn't explicitly set a table engine hint.
# 'ReplicatedMergeTree' is the only supported engine for Clickhouse Cloud.
table_engine = table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR[table_engine]}"

return sql
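# Sketch (hypothetical table and columns, for illustration only): with a single
# `id` primary-key column and the default engine hint, the DDL assembled above
# would look roughly like:
#
#   CREATE TABLE "dataset1_events" ("id" Int64 NOT NULL, "name" String NOT NULL)
#   PRIMARY KEY ("id")
#   ENGINE = ReplicatedMergeTree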

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
pass
# The primary key definition is defined outside column specification.
hints_str = " ".join(
self.active_hints.get(hint, "")
for hint in self.active_hints.keys()
if c.get(hint, False) is True and hint != "primary_key"
)
return (
f"{self.capabilities.escape_identifier(c['name'])} "
f"{self.type_mapper.to_db_type(c)} "
f"{hints_str} "
f"{self._gen_not_null(c.get('nullable', True))}"
)
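# Sketch (hypothetical column, for illustration only): a column schema like
# {"name": "age", "data_type": "bigint", "nullable": True} would render as
# roughly `"age" Int64 NULL`.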

# Clickhouse fields are not nullable by default; the `NULL` modifier below is shorthand for wrapping the type in `Nullable(...)`.
@staticmethod
def _gen_not_null(v: bool) -> str:
return "NULL" if v else "NOT NULL"

def _from_db_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
self, ch_t: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
pass
return self.type_mapper.from_db_type(ch_t, precision, scale)
59 changes: 59 additions & 0 deletions dlt/destinations/impl/clickhouse/clickhouse_adapter.py
@@ -0,0 +1,59 @@
from typing import Any, Literal, Optional, Set, get_args, Dict

from dlt.destinations.utils import ensure_resource
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate


TTableEngineType = Literal["merge_tree", "replicated_merge_tree"]

"""
The table engine (type of table) determines:
- How and where data is stored, where to write it to, and where to read it from.
- Which queries are supported, and how.
- Concurrent data access.
- Use of indexes, if present.
- Whether multithread request execution is possible.
- Data replication parameters.
See https://clickhouse.com/docs/en/engines/table-engines.
"""
TABLE_ENGINE_TYPES: Set[TTableEngineType] = set(get_args(TTableEngineType))
TABLE_ENGINE_TYPE_HINT: Literal["x-table-engine-type"] = "x-table-engine-type"

def clickhouse_adapter(data: Any, table_engine_type: Optional[TTableEngineType] = None) -> DltResource:
"""Prepares data for the Clickhouse destination by specifying which table engine type
that should be used.
Args:
data (Any): The data to be transformed. It can be raw data or an instance
of DltResource. If raw data, the function wraps it into a DltResource
object.
table_engine_type (TTableEngineType, optional): The table engine type to use when
creating the Clickhouse table.
Returns:
DltResource: A resource with applied Clickhouse-specific hints.
Raises:
ValueError: If input for `table_engine_type` is invalid.
Examples:
>>> data = [{"name": "Alice", "description": "Software Developer"}]
>>> clickhouse_adapter(data, table_engine_type="merge_tree")
[DltResource with hints applied]
"""
resource = ensure_resource(data)

additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {}
if table_engine_type is not None:
if table_engine_type not in TABLE_ENGINE_TYPES:
allowed_types = ", ".join(TABLE_ENGINE_TYPES)
raise ValueError(
f"Table engine type {table_engine_type} is invalid. Allowed table engine types are:"
f" {allowed_types}."
)
additional_table_hints[TABLE_ENGINE_TYPE_HINT] = table_engine_type
resource.apply_hints(additional_table_hints=additional_table_hints)
return resource
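
For example, a resource can opt into a specific engine before a pipeline run. A minimal sketch, assuming a configured clickhouse destination; the pipeline and dataset names are made up:

import dlt
from dlt.destinations.impl.clickhouse.clickhouse_adapter import clickhouse_adapter

data = [{"name": "Alice", "description": "Software Developer"}]
resource = clickhouse_adapter(data, table_engine_type="merge_tree")

# Hypothetical pipeline; "clickhouse" is assumed to be a configured destination.
pipeline = dlt.pipeline(
    pipeline_name="demo", destination="clickhouse", dataset_name="demo_data"
)
pipeline.run(resource)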
3 changes: 2 additions & 1 deletion dlt/destinations/impl/clickhouse/configuration.py
@@ -64,7 +64,8 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
# columns within the same granule.
# See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
create_indexes: bool = False
"""Whether `primary_key` and `unique` column hints are applied."""
"""Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint,
and primary keys don't guarantee uniqueness."""

__config_gen_annotations__: ClassVar[List[str]] = ["create_indexes"]
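
A sketch of turning this on, assuming dlt's usual environment-variable config resolution (the variable name follows the DESTINATION__<NAME>__<FIELD> convention):

import os

# Resolved into ClickhouseClientConfiguration.create_indexes by dlt's config system.
os.environ["DESTINATION__CLICKHOUSE__CREATE_INDEXES"] = "true"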

10 changes: 6 additions & 4 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -93,11 +93,13 @@ def execute_sql(

@contextmanager
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
with self._conn.cursor() as curr:
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[ClickhouseDBApiCursorImpl]:
cur: clickhouse_driver.dbapi.connection.Cursor
with self._conn.cursor() as cur:
try:
curr.execute(query, args or (kwargs or None))
yield ClickhouseDBApiCursorImpl(curr) # type: ignore[abstract]
# TODO: Clickhouse driver only accepts pyformat `...WHERE name=%(name)s` parameter marker arguments.
cur.execute(query, args or (kwargs or None))
yield ClickhouseDBApiCursorImpl(cur) # type: ignore[abstract]
except clickhouse_driver.dbapi.Error:
self.close_connection()
self.open_connection()
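
Given the pyformat-only parameter markers noted in the TODO above, a query through this client would be parameterized like so. A sketch: the table name is made up and `client` is assumed to be an initialized ClickhouseClient:

with client.sql_client.execute_query(
    "SELECT name FROM demo_data__users WHERE name = %(name)s", name="Alice"
) as cursor:
    rows = cursor.fetchall()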
