From 5690946b1a97a7d1a8a53e64a5148cd3eaa2f21b Mon Sep 17 00:00:00 2001
From: Jeremy
Date: Fri, 3 Nov 2023 03:00:11 +0800
Subject: [PATCH] feat: support databend for superset (#23308)
---
README.md | 1 +
docs/docs/databases/databend.mdx | 23 ++
setup.py | 1 -
.../src/assets/images/databend.svg | 1 +
superset/db_engine_specs/databend.py | 353 ++++++++++++++++++
.../db_engine_specs/test_databend.py | 130 +++++++
6 files changed, 508 insertions(+), 1 deletion(-)
create mode 100644 docs/docs/databases/databend.mdx
create mode 100644 superset-frontend/src/assets/images/databend.svg
create mode 100644 superset/db_engine_specs/databend.py
create mode 100644 tests/unit_tests/db_engine_specs/test_databend.py
diff --git a/README.md b/README.md
index 6ed81792e7202..a1d37fad9b69e 100644
--- a/README.md
+++ b/README.md
@@ -128,6 +128,7 @@ Here are some of the major database solutions that are supported:
+
diff --git a/docs/docs/databases/databend.mdx b/docs/docs/databases/databend.mdx
new file mode 100644
index 0000000000000..caffc7b2b179f
--- /dev/null
+++ b/docs/docs/databases/databend.mdx
@@ -0,0 +1,23 @@
+---
+title: Databend
+hide_title: true
+sidebar_position: 39
+version: 1
+---
+
+## Databend
+
+The recommended connector library for Databend is [databend-sqlalchemy](https://pypi.org/project/databend-sqlalchemy/).
+Superset has been tested on `databend-sqlalchemy>=0.2.3`.
+
+The recommended connection string is:
+
+```
+databend://{username}:{password}@{host}:{port}/{database_name}
+```
+
+Here's a connection string example of Superset connecting to a Databend database:
+
+```
+databend://user:password@localhost:8000/default?secure=false
+```
diff --git a/setup.py b/setup.py
index 2ea20c29ac3fb..5173ad6dea708 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,6 @@ def get_git_sha() -> str:
with open(VERSION_INFO_FILE, "w") as version_file:
json.dump(version_info, version_file)
-
setup(
name="apache-superset",
description="A modern, enterprise-ready business intelligence web application",
diff --git a/superset-frontend/src/assets/images/databend.svg b/superset-frontend/src/assets/images/databend.svg
new file mode 100644
index 0000000000000..bf0ba4eb77e8a
--- /dev/null
+++ b/superset-frontend/src/assets/images/databend.svg
@@ -0,0 +1 @@
+
diff --git a/superset/db_engine_specs/databend.py b/superset/db_engine_specs/databend.py
new file mode 100644
index 0000000000000..589e8b9168e26
--- /dev/null
+++ b/superset/db_engine_specs/databend.py
@@ -0,0 +1,353 @@
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import re
+from datetime import datetime
+from typing import Any, cast, TYPE_CHECKING
+
+from flask_babel import gettext as __
+from marshmallow import fields, Schema
+from marshmallow.validate import Range
+from sqlalchemy import types
+from sqlalchemy.engine.url import URL
+from urllib3.exceptions import NewConnectionError
+
+from superset.databases.utils import make_url_safe
+from superset.db_engine_specs.base import (
+ BaseEngineSpec,
+ BasicParametersMixin,
+ BasicParametersType,
+ BasicPropertiesType,
+)
+from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.utils.core import GenericDataType
+from superset.utils.hashing import md5_sha_from_str
+from superset.utils.network import is_hostname_valid, is_port_open
+
+if TYPE_CHECKING:
+ from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class DatabendBaseEngineSpec(BaseEngineSpec):
+ """Shared engine spec for Databend."""
+
+ time_secondary_columns = True
+ time_groupby_inline = True
+
+ _time_grain_expressions = {
+ None: "{col}",
+ "PT1M": "to_start_of_minute(TO_DATETIME({col}))",
+ "PT5M": "to_start_of_five_minutes(TO_DATETIME({col}))",
+ "PT10M": "to_start_of_ten_minutes(TO_DATETIME({col}))",
+ "PT15M": "to_start_of_fifteen_minutes(TO_DATETIME({col}))",
+ "PT30M": "TO_DATETIME(intDiv(toUInt32(TO_DATETIME({col})), 1800)*1800)",
+ "PT1H": "to_start_of_hour(TO_DATETIME({col}))",
+ "P1D": "to_start_of_day(TO_DATETIME({col}))",
+ "P1W": "to_monday(TO_DATETIME({col}))",
+ "P1M": "to_start_of_month(TO_DATETIME({col}))",
+ "P3M": "to_start_of_quarter(TO_DATETIME({col}))",
+ "P1Y": "to_start_of_year(TO_DATETIME({col}))",
+ }
+
+ column_type_mappings = (
+ (
+ re.compile(r".*Varchar.*", re.IGNORECASE),
+ types.String(),
+ GenericDataType.STRING,
+ ),
+ (
+ re.compile(r".*Array.*", re.IGNORECASE),
+ types.String(),
+ GenericDataType.STRING,
+ ),
+ (
+ re.compile(r".*Map.*", re.IGNORECASE),
+ types.String(),
+ GenericDataType.STRING,
+ ),
+ (
+ re.compile(r".*Json.*", re.IGNORECASE),
+ types.JSON(),
+ GenericDataType.STRING,
+ ),
+ (
+ re.compile(r".*Bool.*", re.IGNORECASE),
+ types.Boolean(),
+ GenericDataType.BOOLEAN,
+ ),
+ (
+ re.compile(r".*String.*", re.IGNORECASE),
+ types.String(),
+ GenericDataType.STRING,
+ ),
+ (
+ re.compile(r".*Int\d+.*", re.IGNORECASE),
+ types.INTEGER(),
+ GenericDataType.NUMERIC,
+ ),
+ (
+ re.compile(r".*Float\d+.*", re.IGNORECASE),
+ types.FLOAT(),
+ GenericDataType.NUMERIC,
+ ),
+ (
+ re.compile(r".*Double\d+.*", re.IGNORECASE),
+ types.FLOAT(),
+ GenericDataType.NUMERIC,
+ ),
+ (
+ re.compile(r".*Decimal.*", re.IGNORECASE),
+ types.DECIMAL(),
+ GenericDataType.NUMERIC,
+ ),
+ (
+ re.compile(r".*DateTime.*", re.IGNORECASE),
+ types.DateTime(),
+ GenericDataType.TEMPORAL,
+ ),
+ (
+ re.compile(r".*Date.*", re.IGNORECASE),
+ types.Date(),
+ GenericDataType.TEMPORAL,
+ ),
+ )
+
+ @classmethod
+ def epoch_to_dttm(cls) -> str:
+ return "{col}"
+
+ @classmethod
+ def convert_dttm(
+ cls, target_type: str, dttm: datetime, db_extra: dict[str, Any] | None = None
+ ) -> str | None:
+ sqla_type = cls.get_sqla_column_type(target_type)
+
+ if isinstance(sqla_type, types.Date):
+ return f"to_date('{dttm.date().isoformat()}')"
+ if isinstance(sqla_type, types.DateTime):
+ return f"""to_dateTime('{dttm.isoformat(sep=" ", timespec="seconds")}')"""
+ return None
+
+
+class DatabendEngineSpec(DatabendBaseEngineSpec):
+ """Engine spec for databend_sqlalchemy connector"""
+
+ engine = "databend"
+ engine_name = "Databend"
+ _function_names: list[str] = []
+
+ _show_functions_column = "name"
+ supports_file_upload = False
+
+ @classmethod
+ def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]:
+ return {NewConnectionError: SupersetDBAPIDatabaseError}
+
+ @classmethod
+ def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
+ new_exception = cls.get_dbapi_exception_mapping().get(type(exception))
+ if new_exception == SupersetDBAPIDatabaseError:
+ return SupersetDBAPIDatabaseError("Connection failed")
+ if not new_exception:
+ return exception
+ return new_exception(str(exception))
+
+ @classmethod
+ def get_function_names(cls, database: Database) -> list[str]:
+ if cls._function_names:
+ return cls._function_names
+ try:
+ names = database.get_df("SELECT name FROM system.functions;")[
+ "name"
+ ].tolist()
+ cls._function_names = names
+ return names
+ except Exception as ex: # pylint: disable=broad-except
+ logger.exception("Error retrieving system.functions: %s", str(ex))
+ return []
+
+
+class DatabendParametersSchema(Schema):
+ username = fields.String(allow_none=True, description=__("Username"))
+ password = fields.String(allow_none=True, description=__("Password"))
+ host = fields.String(required=True, description=__("Hostname or IP address"))
+ port = fields.Integer(
+ allow_none=True,
+ description=__("Database port"),
+ validate=Range(min=0, max=65535),
+ )
+ database = fields.String(allow_none=True, description=__("Database name"))
+ encryption = fields.Boolean(
+ default=True, description=__("Use an encrypted connection to the database")
+ )
+ query = fields.Dict(
+ keys=fields.Str(), values=fields.Raw(), description=__("Additional parameters")
+ )
+
+
+class DatabendConnectEngineSpec(DatabendEngineSpec, BasicParametersMixin):
+ """Engine spec for databend sqlalchemy connector"""
+
+ engine = "databend"
+ engine_name = "Databend"
+
+ default_driver = "databend"
+ _function_names: list[str] = []
+
+ sqlalchemy_uri_placeholder = (
+ "databend://user:password@host[:port][/dbname][?secure=value&=value...]"
+ )
+ parameters_schema = DatabendParametersSchema()
+ encryption_parameters = {"secure": "true"}
+
+ @classmethod
+ def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]:
+ return {}
+
+ @classmethod
+ def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception:
+ new_exception = cls.get_dbapi_exception_mapping().get(type(exception))
+ if new_exception == SupersetDBAPIDatabaseError:
+ return SupersetDBAPIDatabaseError("Connection failed")
+ if not new_exception:
+ return exception
+ return new_exception(str(exception))
+
+ @classmethod
+ def get_function_names(cls, database: Database) -> list[str]:
+ if cls._function_names:
+ return cls._function_names
+ try:
+ names = database.get_df("SELECT name FROM system.functions;")[
+ "name"
+ ].tolist()
+ cls._function_names = names
+ return names
+ except Exception as ex: # pylint: disable=broad-except
+ logger.exception("Error retrieving system.functions: %s", str(ex))
+ return []
+
+ @classmethod
+ def get_datatype(cls, type_code: str) -> str:
+ return type_code
+
+ @classmethod
+ def build_sqlalchemy_uri(
+ cls, parameters: BasicParametersType, *_args: dict[str, str] | None
+ ) -> str:
+ url_params = parameters.copy()
+ if url_params.get("encryption"):
+ query = parameters.get("query", {}).copy()
+ query.update(cls.encryption_parameters)
+ url_params["query"] = query
+ if not url_params.get("database"):
+ url_params["database"] = "__default__"
+ url_params.pop("encryption", None)
+ return str(URL(f"{cls.engine}", **url_params))
+
+ @classmethod
+ def get_parameters_from_uri(
+ cls, uri: str, *_args: dict[str, Any] | None
+ ) -> BasicParametersType:
+ url = make_url_safe(uri)
+ query = url.query
+ if "secure" in query:
+ encryption = url.query.get("secure") == "true"
+ query.pop("secure")
+ else:
+ encryption = False
+ return BasicParametersType(
+ username=url.username,
+ password=url.password,
+ host=url.host,
+ port=url.port,
+ database="" if url.database == "__default__" else cast(str, url.database),
+ query=query,
+ encryption=encryption,
+ )
+
+ @classmethod
+ def default_port(cls, interface: str, secure: bool) -> int:
+ if interface.startswith("http"):
+ return 443 if secure else 8000
+ raise ValueError("Unrecognized Databend interface")
+
+ @classmethod
+ def validate_parameters(
+ cls, properties: BasicPropertiesType
+ ) -> list[SupersetError]:
+ # The newest versions of superset send a "properties" object with a
+ # parameters key, instead of just the parameters, so we hack to be compatible
+ parameters = properties.get("parameters", properties)
+ host = parameters.get("host", None)
+ host = str(host) if host is not None else None
+ if not host:
+ return [
+ SupersetError(
+ "Hostname is required",
+ SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
+ ErrorLevel.WARNING,
+ {"missing": ["host"]},
+ )
+ ]
+ if not is_hostname_valid(host):
+ return [
+ SupersetError(
+ "The hostname provided can't be resolved.",
+ SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR,
+ ErrorLevel.ERROR,
+ {"invalid": ["host"]},
+ )
+ ]
+ port = parameters.get("port")
+ if port is not None:
+ if isinstance(port, (int, str)):
+ try:
+ port = int(port)
+ if port <= 0 or port >= 65535:
+ port = -1
+ except (ValueError, TypeError):
+ port = -1
+ encryption = parameters.get("encryption", False)
+ if port is None or port == -1:
+ encryption = bool(encryption)
+ port = cls.default_port("http", encryption)
+ if port <= 0 or port >= 65535:
+ return [
+ SupersetError(
+ "Port must be a valid integer between 0 and 65535 (inclusive).",
+ SupersetErrorType.CONNECTION_INVALID_PORT_ERROR,
+ ErrorLevel.ERROR,
+ {"invalid": ["port"]},
+ )
+ ]
+ if not is_port_open(host, port):
+ return [
+ SupersetError(
+ "The port is closed.",
+ SupersetErrorType.CONNECTION_PORT_CLOSED_ERROR,
+ ErrorLevel.ERROR,
+ {"invalid": ["port"]},
+ )
+ ]
+ return []
+
+ @staticmethod
+ def _mutate_label(label: str) -> str:
+ """
+ Suffix with the first six characters from the md5 of the label to avoid
+ collisions with original column names
+
+ :param label: Expected expression label
+ :return: Conditionally mutated label
+ """
+ return f"{label}_{md5_sha_from_str(label)[:6]}"
diff --git a/tests/unit_tests/db_engine_specs/test_databend.py b/tests/unit_tests/db_engine_specs/test_databend.py
new file mode 100644
index 0000000000000..9c494492d9ad1
--- /dev/null
+++ b/tests/unit_tests/db_engine_specs/test_databend.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+from typing import Any, Optional
+from unittest.mock import Mock
+
+import pytest
+from sqlalchemy.types import (
+ Boolean,
+ Date,
+ DateTime,
+ DECIMAL,
+ Float,
+ Integer,
+ String,
+ TypeEngine,
+)
+from urllib3.connection import HTTPConnection
+
+from superset.utils.core import GenericDataType
+from tests.unit_tests.db_engine_specs.utils import (
+ assert_column_spec,
+ assert_convert_dttm,
+)
+from tests.unit_tests.fixtures.common import dttm
+
+
+@pytest.mark.parametrize(
+ "target_type,expected_result",
+ [
+ ("Date", "to_date('2019-01-02')"),
+ ("DateTime", "to_dateTime('2019-01-02 03:04:05')"),
+ ("UnknownType", None),
+ ],
+)
+def test_convert_dttm(
+ target_type: str, expected_result: Optional[str], dttm: datetime
+) -> None:
+ from superset.db_engine_specs.databend import DatabendEngineSpec as spec
+
+ assert_convert_dttm(spec, target_type, expected_result, dttm)
+
+
+def test_execute_connection_error() -> None:
+ from urllib3.exceptions import NewConnectionError
+
+ from superset.db_engine_specs.databend import DatabendEngineSpec
+ from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError
+
+ cursor = Mock()
+ cursor.execute.side_effect = NewConnectionError(
+ HTTPConnection("Dummypool"), "Exception with sensitive data"
+ )
+ with pytest.raises(SupersetDBAPIDatabaseError) as ex:
+ DatabendEngineSpec.execute(cursor, "SELECT col1 from table1")
+
+
+@pytest.mark.parametrize(
+ "native_type,sqla_type,attrs,generic_type,is_dttm",
+ [
+ ("Varchar", String, None, GenericDataType.STRING, False),
+ ("Nullable(Varchar)", String, None, GenericDataType.STRING, False),
+ ("Array(UInt8)", String, None, GenericDataType.STRING, False),
+ ("Int8", Integer, None, GenericDataType.NUMERIC, False),
+ ("Int16", Integer, None, GenericDataType.NUMERIC, False),
+ ("Int32", Integer, None, GenericDataType.NUMERIC, False),
+ ("Int64", Integer, None, GenericDataType.NUMERIC, False),
+ ("Int128", Integer, None, GenericDataType.NUMERIC, False),
+ ("Int256", Integer, None, GenericDataType.NUMERIC, False),
+ ("Nullable(Int64)", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt8", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt16", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt32", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt64", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt128", Integer, None, GenericDataType.NUMERIC, False),
+ ("UInt256", Integer, None, GenericDataType.NUMERIC, False),
+ ("Float", Float, None, GenericDataType.NUMERIC, False),
+ ("Double", Float, None, GenericDataType.NUMERIC, False),
+ ("Decimal(1, 2)", DECIMAL, None, GenericDataType.NUMERIC, False),
+ ("Decimal32(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
+ ("Decimal64(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
+ ("Decimal128(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
+ ("Decimal256(2)", DECIMAL, None, GenericDataType.NUMERIC, False),
+ ("Bool", Boolean, None, GenericDataType.BOOLEAN, False),
+ ("Nullable(Bool)", Boolean, None, GenericDataType.BOOLEAN, False),
+ ("Date", Date, None, GenericDataType.TEMPORAL, True),
+ ("Nullable(Date)", Date, None, GenericDataType.TEMPORAL, True),
+ ("Datetime", DateTime, None, GenericDataType.TEMPORAL, True),
+ ("Nullable(Datetime)", DateTime, None, GenericDataType.TEMPORAL, True),
+ ],
+)
+def test_get_column_spec(
+ native_type: str,
+ sqla_type: type[TypeEngine],
+ attrs: Optional[dict[str, Any]],
+ generic_type: GenericDataType,
+ is_dttm: bool,
+) -> None:
+ from superset.db_engine_specs.databend import DatabendConnectEngineSpec as spec
+
+ assert_column_spec(spec, native_type, sqla_type, attrs, generic_type, is_dttm)
+
+
+@pytest.mark.parametrize(
+ "column_name,expected_result",
+ [
+ ("time", "time_07cc69"),
+ ("count", "count_e2942a"),
+ ],
+)
+def test_make_label_compatible(column_name: str, expected_result: str) -> None:
+ from superset.db_engine_specs.databend import DatabendConnectEngineSpec as spec
+
+ label = spec.make_label_compatible(column_name)
+ assert label == expected_result