POC - Enable support for clickhouse on SL #1592

Draft · wants to merge 7 commits into base: main

Changes from 3 commits
31 changes: 30 additions & 1 deletion .github/workflows/cd-sql-engine-tests.yaml
@@ -114,10 +114,39 @@ jobs:
          python-version: "3.12"
          make-target: "test-trino"

  clickhouse-tests:
    # Clickhouse tests run on a local service container, which obviates the need for separate Environment hosting.
    # We run them here instead of in the CI unit test suite because they are a bit slower, and because in future
    # we may choose to execute them against a hosted instance, at which point this config will look like the other
    # engine configs in this file.
    name: Clickhouse Tests
    if: ${{ github.event.action != 'labeled' || github.event.label.name == 'Run Tests With Other SQL Engines' }}
    runs-on: ubuntu-latest
    services:
      clickhouse:
        image: clickhouse/clickhouse-server
        ports:
          - 8123:8123
        env:
          CLICKHOUSE_USER: "metricflow"
          CLICKHOUSE_PASSWORD: "metricflowing"
          CLICKHOUSE_DB: "metricflow"
    steps:
      - name: Check-out the repo
        uses: actions/checkout@v3

      - name: Test w/ Python 3.12
        uses: ./.github/actions/run-mf-tests
        with:
          python-version: "3.12"
          make-target: "test-clickhouse"
          mf_sql_engine_url: "clickhouse://metricflow:metricflowing@localhost:8123/metricflow"
          mf_sql_engine_password: "metricflowing"

  remove-label:
    name: Remove Label After Running Tests
    runs-on: ubuntu-latest
    needs: [ snowflake-tests, redshift-tests, bigquery-tests, databricks-tests ]
    needs: [ snowflake-tests, redshift-tests, bigquery-tests, databricks-tests, clickhouse-tests ]
    # Default behavior for `needs` is that it requires success, so a success / failure needs to be specifically checked.
    if: ${{ (success() || failure()) && github.event.action == 'labeled' }}
    steps:
12 changes: 12 additions & 0 deletions Makefile
@@ -71,6 +71,14 @@ populate-persistent-source-schema-snowflake:
test-trino:
	hatch -v run trino-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) $(TESTS_METRICFLOW)/

.PHONY: test-clickhouse
test-clickhouse:
	hatch -v run clickhouse-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) $(TESTS_METRICFLOW)/

.PHONY: populate-persistent-source-schema-clickhouse
populate-persistent-source-schema-clickhouse:
	hatch -v run clickhouse-env:pytest -vv $(ADDITIONAL_PYTEST_OPTIONS) $(USE_PERSISTENT_SOURCE_SCHEMA) $(POPULATE_PERSISTENT_SOURCE_SCHEMA)
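(Presumed local workflow, mirroring the other engines in this file: run `make clickhouse` in one shell to bring up the compose stack defined further down, then `make test-clickhouse` in another to point the suite at it.)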

.PHONY: lint
lint:
	hatch -v run dev-env:pre-commit run --all-files
@@ -84,6 +92,10 @@ postgresql postgres:
trino:
	make -C local-data-warehouses trino

.PHONY: clickhouse
clickhouse:
	make -C local-data-warehouses clickhouse

# Re-generate test snapshots using all supported SQL engines.
.PHONY: regenerate-test-snapshots
regenerate-test-snapshots:
@@ -15,6 +15,7 @@
from metricflow.data_table.mf_table import MetricFlowDataTable
from metricflow.protocols.sql_client import SqlEngine
from metricflow.sql.render.big_query import BigQuerySqlQueryPlanRenderer
from metricflow.sql.render.clickhouse import ClickhouseSqlQueryPlanRenderer
from metricflow.sql.render.databricks import DatabricksSqlQueryPlanRenderer
from metricflow.sql.render.duckdb_renderer import DuckDbSqlQueryPlanRenderer
from metricflow.sql.render.postgres import PostgresSQLSqlQueryPlanRenderer
@@ -42,6 +43,7 @@ class SupportedAdapterTypes(enum.Enum):
    BIGQUERY = "bigquery"
    DUCKDB = "duckdb"
    TRINO = "trino"
    CLICKHOUSE = "clickhouse"

    @property
    def sql_engine_type(self) -> SqlEngine:
@@ -60,6 +62,8 @@ def sql_engine_type(self) -> SqlEngine:
            return SqlEngine.DUCKDB
        elif self is SupportedAdapterTypes.TRINO:
            return SqlEngine.TRINO
        elif self is SupportedAdapterTypes.CLICKHOUSE:
            return SqlEngine.CLICKHOUSE
        else:
            assert_values_exhausted(self)

@@ -80,6 +84,8 @@ def sql_query_plan_renderer(self) -> SqlQueryPlanRenderer:
            return DuckDbSqlQueryPlanRenderer()
        elif self is SupportedAdapterTypes.TRINO:
            return TrinoSqlQueryPlanRenderer()
        elif self is SupportedAdapterTypes.CLICKHOUSE:
            return ClickhouseSqlQueryPlanRenderer()
        else:
            assert_values_exhausted(self)

4 changes: 4 additions & 0 deletions local-data-warehouses/Makefile
@@ -9,3 +9,7 @@ postgresql:
.PHONY: trino
trino:
	docker-compose -f trino/docker-compose.yaml up

.PHONY: clickhouse
clickhouse:
	docker-compose -f clickhouse/docker-compose.yaml up
16 changes: 16 additions & 0 deletions local-data-warehouses/clickhouse/docker-compose.yaml
@@ -0,0 +1,16 @@
version: "3.7"

services:
  clickhouse:
    container_name: clickhouse
    image: clickhouse
    expose:
      - "8123"
      - "9000"
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_USER: "metricflow"
      CLICKHOUSE_PASSWORD: "metricflowing"
      CLICKHOUSE_DB: "metricflow"
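As a quick local sanity check (a minimal sketch, not part of this PR; it assumes the compose file above is running and uses only the Python standard library), the container can be queried over ClickHouse's HTTP interface on port 8123:

    import urllib.request

    # Credentials match CLICKHOUSE_USER / CLICKHOUSE_PASSWORD in the compose file above.
    request = urllib.request.Request(
        "http://localhost:8123/?query=SELECT%201",
        headers={"X-ClickHouse-User": "metricflow", "X-ClickHouse-Key": "metricflowing"},
    )
    print(urllib.request.urlopen(request).read().decode())  # expected output: 1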
5 changes: 5 additions & 0 deletions metricflow/protocols/sql_client.py
@@ -25,6 +25,7 @@ class SqlEngine(Enum):
    SNOWFLAKE = "Snowflake"
    DATABRICKS = "Databricks"
    TRINO = "Trino"
    CLICKHOUSE = "Clickhouse"

    @property
    def unsupported_granularities(self) -> Set[TimeGranularity]:
@@ -49,6 +50,10 @@ def unsupported_granularities(self) -> Set[TimeGranularity]:
            return {TimeGranularity.NANOSECOND}
        elif self is SqlEngine.TRINO:
            return {TimeGranularity.NANOSECOND, TimeGranularity.MICROSECOND}
        elif self is SqlEngine.CLICKHOUSE:
            # TODO: it looks like ClickHouse supports nanoseconds
            # (https://clickhouse.com/docs/en/sql-reference/functions/date-time-functions#date_trunc)
            return set()
        else:
            assert_values_exhausted(self)

239 changes: 239 additions & 0 deletions metricflow/sql/render/clickhouse.py
@@ -0,0 +1,239 @@
from __future__ import annotations

import textwrap
from typing import Collection, Optional, Sequence

from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.type_enums.date_part import DatePart
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from metricflow_semantics.errors.error_classes import UnsupportedEngineFeatureError
from metricflow_semantics.sql.sql_bind_parameters import SqlBindParameterSet
from metricflow_semantics.sql.sql_exprs import (
    SqlAddTimeExpression,
    SqlArithmeticExpression,
    SqlArithmeticOperator,
    SqlGenerateUuidExpression,
    SqlIntegerExpression,
    SqlPercentileExpression,
    SqlPercentileFunctionType,
    SqlSubtractTimeIntervalExpression,
)
from typing_extensions import override

from metricflow.protocols.sql_client import SqlEngine
from metricflow.sql.render.expr_renderer import (
    DefaultSqlExpressionRenderer,
    SqlExpressionRenderer,
    SqlExpressionRenderResult,
)
from metricflow.sql.render.rendering_constants import SqlRenderingConstants
from metricflow.sql.render.sql_plan_renderer import DefaultSqlQueryPlanRenderer, SqlPlanRenderResult
from metricflow.sql.sql_plan import SqlJoinDescription


class ClickhouseSqlExpressionRenderer(DefaultSqlExpressionRenderer):
    """Expression renderer for the Clickhouse engine."""

    __QUARTER_IN_MONTHS = 3

    sql_engine = SqlEngine.CLICKHOUSE

    @property
    @override
    def double_data_type(self) -> str:
        """Custom double data type for the Clickhouse engine."""
        return "DOUBLE PRECISION"

    @property
    @override
    def supported_percentile_function_types(self) -> Collection[SqlPercentileFunctionType]:
        return {
            SqlPercentileFunctionType.CONTINUOUS,
            SqlPercentileFunctionType.DISCRETE,
            SqlPercentileFunctionType.APPROXIMATE_CONTINUOUS,
        }

    @override
    def render_date_part(self, date_part: DatePart) -> str:

Contributor: I'm guessing this will need to be moved to render_extract()?

"""Map DatePart enum to Clickhouse date/time function names."""
if date_part is DatePart.DOW:
return "toDayOfWeek" # Returns 1-7 where Monday is 1
elif date_part is DatePart.DOY:
return "toDayOfYear"
elif date_part is DatePart.MONTH:
return "toMonth"
elif date_part is DatePart.QUARTER:
return "toQuarter"
elif date_part is DatePart.YEAR:
return "toYear"
elif date_part is DatePart.DAY:
return "toDayOfMonth"
return assert_values_exhausted(date_part)
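    # Note on the contributor question above (hedged): if the base renderer interpolates this
    # return value into an EXTRACT-style template, e.g. EXTRACT(toDayOfYear FROM ts), the
    # result would not be valid ClickHouse SQL; the intended output is presumably
    # toDayOfYear(ts), which is why overriding render_extract() may be the better hook.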

    @override
    def visit_subtract_time_interval_expr(self, node: SqlSubtractTimeIntervalExpression) -> SqlExpressionRenderResult:
        """Render time delta operations for Clickhouse, which needs custom support for quarterly granularity."""
        arg_rendered = node.arg.accept(self)

        count = node.count
        granularity = node.granularity
        if granularity is TimeGranularity.QUARTER:
            granularity = TimeGranularity.MONTH
            count *= self.__QUARTER_IN_MONTHS

        function_name = self.__get_function_operation_from_time_granularity(granularity)

        return SqlExpressionRenderResult(
            sql=f"{function_name}({arg_rendered.sql}, CAST(-{count} AS Integer))",
            bind_parameter_set=arg_rendered.bind_parameter_set,
        )
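    # Illustration (hypothetical column "ts"): subtracting one quarter renders as
    #   addMonths(ts, CAST(-3 AS Integer))
    # i.e. quarters become months, and subtraction becomes adding a negative count.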

    @override
    def visit_add_time_expr(self, node: SqlAddTimeExpression) -> SqlExpressionRenderResult:
        """Render time delta operations for Clickhouse, which needs custom support for quarterly granularity."""
        granularity = node.granularity
        count_expr = node.count_expr
        if granularity is TimeGranularity.QUARTER:
            granularity = TimeGranularity.MONTH
            # Express quarters as months by multiplying the count by the number of months in a quarter.
            count_expr = SqlArithmeticExpression.create(
                left_expr=node.count_expr,
                operator=SqlArithmeticOperator.MULTIPLY,
                right_expr=SqlIntegerExpression.create(self.__QUARTER_IN_MONTHS),
            )

        arg_rendered = node.arg.accept(self)
        count_rendered = count_expr.accept(self)
        count_sql = f"({count_rendered.sql})" if count_expr.requires_parenthesis else count_rendered.sql

        function_operation = self.__get_function_operation_from_time_granularity(granularity)

        return SqlExpressionRenderResult(
            sql=f"{function_operation}({arg_rendered.sql}, CAST({count_sql} AS Integer))",
            bind_parameter_set=SqlBindParameterSet.merge_iterable(
                (arg_rendered.bind_parameter_set, count_rendered.bind_parameter_set)
            ),
        )
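    # Illustration (hypothetical column "ts", assuming the multiplication renders
    # parenthesized): adding n quarters renders as
    #   addMonths(ts, CAST((n * 3) AS Integer))
    # because the granularity-to-function mapping below has no dedicated quarter entry.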

    @override
    def visit_generate_uuid_expr(self, node: SqlGenerateUuidExpression) -> SqlExpressionRenderResult:
        return SqlExpressionRenderResult(
            sql="generateUUIDv4()",
            bind_parameter_set=SqlBindParameterSet(),
        )

    @override
    def visit_percentile_expr(self, node: SqlPercentileExpression) -> SqlExpressionRenderResult:
        """Render a percentile expression for Clickhouse."""
        arg_rendered = self.render_sql_expr(node.order_by_arg)
        params = arg_rendered.bind_parameter_set
        percentile = node.percentile_args.percentile

        if node.percentile_args.function_type is SqlPercentileFunctionType.CONTINUOUS:
            function_str = "quantile"  # Uses interpolation by default
        elif node.percentile_args.function_type is SqlPercentileFunctionType.DISCRETE:
            function_str = "quantileExact"  # Exact calculation without interpolation
        elif node.percentile_args.function_type is SqlPercentileFunctionType.APPROXIMATE_CONTINUOUS:
            function_str = "quantile"  # Default quantile is already approximate
        elif node.percentile_args.function_type is SqlPercentileFunctionType.APPROXIMATE_DISCRETE:
            raise UnsupportedEngineFeatureError(
                "Approximate discrete percentile aggregate not supported for Clickhouse. Set "
                + "use_approximate_percentile to false in all percentile measures."
            )
        else:
            assert_values_exhausted(node.percentile_args.function_type)

        # Clickhouse uses function(percentile)(expr) syntax instead of WITHIN GROUP
        return SqlExpressionRenderResult(
            sql=f"{function_str}({percentile})({arg_rendered.sql})",
            bind_parameter_set=params,
        )
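    # Illustration (hypothetical column "duration_ms"): a continuous median renders as
    #   quantile(0.5)(duration_ms)
    # where ANSI engines would emit PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY duration_ms).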

    def __get_function_operation_from_time_granularity(self, granularity: TimeGranularity) -> str:
        return {
            TimeGranularity.YEAR: "addYears",
            TimeGranularity.QUARTER: "addMonths",
            TimeGranularity.MONTH: "addMonths",
            TimeGranularity.WEEK: "addWeeks",
            TimeGranularity.DAY: "addDays",
            TimeGranularity.HOUR: "addHours",
            TimeGranularity.MINUTE: "addMinutes",
            TimeGranularity.SECOND: "addSeconds",
        }[granularity]
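    # Note (hedged): granularities finer than SECOND (e.g. TimeGranularity.MILLISECOND, if
    # defined in this version of the enum) have no entry above and would raise KeyError here.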


class ClickhouseSqlQueryPlanRenderer(DefaultSqlQueryPlanRenderer):
    """Plan renderer for the Clickhouse engine."""

    EXPR_RENDERER = ClickhouseSqlExpressionRenderer()

    @property
    @override
    def expr_renderer(self) -> SqlExpressionRenderer:
        return self.EXPR_RENDERER

    @override
    def _render_adapter_specific_flags(self) -> Optional[SqlPlanRenderResult]:
        """Add ClickHouse-specific query settings."""
        settings = ["allow_experimental_join_condition = 1", "allow_experimental_analyzer = 1", "join_use_nulls = 0"]
        return SqlPlanRenderResult(sql=f"SETTINGS {', '.join(settings)}", bind_parameter_set=SqlBindParameterSet())
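    # Illustration (hedged): these settings presumably land as a trailing clause on the
    # rendered statement, e.g.
    #   SELECT ... SETTINGS allow_experimental_join_condition = 1, allow_experimental_analyzer = 1, join_use_nulls = 0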

    def _render_joins_section(self, join_descriptions: Sequence[SqlJoinDescription]) -> Optional[SqlPlanRenderResult]:
Author: I don't like this at all.

The problem is that ClickHouse doesn't support inequality INNER JOINs:

https://clickhouse.com/docs/en/sql-reference/statements/select/join#join-with-inequality-conditions-for-columns-from-different-tables

The suggested approach is to use CROSS JOIN instead, but it forces a change to the rendering logic of any join that uses an inequality.

The tests are passing, but I'm open to suggestions here.

Contributor: We'll definitely want to discuss this at the MF sync tomorrow. The problem is that a SQL plan theoretically should be engine-agnostic. In this case, it isn't, since the INNER JOIN in the SQL plan is invalid for Clickhouse.

A couple of other potential options:

  • We change the SQL plan to use CROSS JOIN, which I'm assuming would work for all engines. Then, we could implement an engine-specific optimizer that swaps in an INNER JOIN if the engine supports it. In practice, if that optimizer errors for some reason, the user might be surprised to see a CROSS JOIN here when it's not necessary. We might be ok with that tradeoff.
  • Another option would be for us to just create an internal concept of a generic "inequality join" or something similar to use in the SQL plan. And that would get translated into either INNER JOIN or CROSS JOIN by the SQL renderer.

Both of those options might have issues integrating with the WHERE clause, though, since we would need to include that when CROSS JOIN is used 🤔

Let's hold off on making any changes here for now - I want to see if Paul has any better ideas for how to work around this.
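(For illustration of the workaround as implemented below, using hypothetical tables a and b: an inequality join such as

    INNER JOIN b ON a.ts >= b.window_start

is rendered instead as

    CROSS JOIN b
    ...
    WHERE a.ts >= b.window_start

trading the ON-clause restriction for a coupling between the JOIN and WHERE sections.)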

"""Convert the join descriptions into a "JOIN" section with ClickHouse-specific handling."""
if len(join_descriptions) == 0:
return None

params = SqlBindParameterSet()
join_section_lines = []
where_conditions = []

for join_description in join_descriptions:
right_source_rendered = self._render_node(join_description.right_source)
params = params.merge(right_source_rendered.bind_parameter_set)

on_condition_rendered: Optional[SqlExpressionRenderResult] = None
if join_description.on_condition:
on_condition_rendered = self.EXPR_RENDERER.render_sql_expr(join_description.on_condition)
params = params.merge(on_condition_rendered.bind_parameter_set)

# Check if this is a time-range join
is_time_range_join = False
if on_condition_rendered:
is_time_range_join = any(op in on_condition_rendered.sql for op in ["<=", ">=", "<", ">"])

# Add join type
join_section_lines.append(join_description.join_type.value)

# Add the source
if join_description.right_source.as_sql_table_node is not None:
join_section_lines.append(
textwrap.indent(
f"{right_source_rendered.sql} {join_description.right_source_alias}",
prefix=SqlRenderingConstants.INDENT,
)
)
else:
join_section_lines.append("(")
join_section_lines.append(
textwrap.indent(right_source_rendered.sql, prefix=SqlRenderingConstants.INDENT)
)
join_section_lines.append(f") {join_description.right_source_alias}")

# Add conditions
if is_time_range_join:
# For time-range joins, convert to CROSS JOIN + WHERE
join_section_lines[-len(join_section_lines)] = "CROSS JOIN" # Replace join type
if on_condition_rendered:
where_conditions.append(on_condition_rendered.sql)
else:
# For regular joins, use ON clause
if on_condition_rendered:
join_section_lines.append("ON")
join_section_lines.append(
textwrap.indent(on_condition_rendered.sql, prefix=SqlRenderingConstants.INDENT)
)

# Store where conditions for use in _render_where_section
if where_conditions:
self._stored_where_conditions = where_conditions

return SqlPlanRenderResult("\n".join(join_section_lines), params)
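    # Note (hedged): _stored_where_conditions is assumed to be consumed by a
    # _render_where_section override elsewhere in this file (not shown above); as instance
    # state, it would also need clearing between renders to avoid leaking into a later query.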