diff --git a/README.md b/README.md index 6ed81792e7202..a1d37fad9b69e 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,7 @@ Here are some of the major database solutions that are supported: +
diff --git a/docs/docs/databases/databend.mdx b/docs/docs/databases/databend.mdx new file mode 100644 index 0000000000000..caffc7b2b179f --- /dev/null +++ b/docs/docs/databases/databend.mdx @@ -0,0 +1,23 @@ +--- +title: Databend +hide_title: true +sidebar_position: 39 +version: 1 +--- + +## Databend + +The recommended connector library for Databend is [databend-sqlalchemy](https://pypi.org/project/databend-sqlalchemy/). +Superset has been tested on `databend-sqlalchemy>=0.2.3`. + +The recommended connection string is: + +``` +databend://{username}:{password}@{host}:{port}/{database_name} +``` + +Here's a connection string example of Superset connecting to a Databend database: + +``` +databend://user:password@localhost:8000/default?secure=false +``` diff --git a/setup.py b/setup.py index 2ea20c29ac3fb..5173ad6dea708 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,6 @@ def get_git_sha() -> str: with open(VERSION_INFO_FILE, "w") as version_file: json.dump(version_info, version_file) - setup( name="apache-superset", description="A modern, enterprise-ready business intelligence web application", diff --git a/superset-frontend/src/assets/images/databend.svg b/superset-frontend/src/assets/images/databend.svg new file mode 100644 index 0000000000000..bf0ba4eb77e8a --- /dev/null +++ b/superset-frontend/src/assets/images/databend.svg @@ -0,0 +1 @@ + diff --git a/superset/db_engine_specs/databend.py b/superset/db_engine_specs/databend.py new file mode 100644 index 0000000000000..589e8b9168e26 --- /dev/null +++ b/superset/db_engine_specs/databend.py @@ -0,0 +1,353 @@ +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import re +from datetime import datetime +from typing import Any, cast, TYPE_CHECKING + +from flask_babel import gettext as __ +from marshmallow import fields, Schema +from marshmallow.validate import Range +from sqlalchemy import types +from sqlalchemy.engine.url import URL +from urllib3.exceptions import NewConnectionError + +from superset.databases.utils import make_url_safe +from superset.db_engine_specs.base import ( + BaseEngineSpec, + BasicParametersMixin, + BasicParametersType, + BasicPropertiesType, +) +from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError +from superset.errors import ErrorLevel, SupersetError, SupersetErrorType +from superset.utils.core import GenericDataType +from superset.utils.hashing import md5_sha_from_str +from superset.utils.network import is_hostname_valid, is_port_open + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + + +class DatabendBaseEngineSpec(BaseEngineSpec): + """Shared engine spec for Databend.""" + + time_secondary_columns = True + time_groupby_inline = True + + _time_grain_expressions = { + None: "{col}", + "PT1M": "to_start_of_minute(TO_DATETIME({col}))", + "PT5M": "to_start_of_five_minutes(TO_DATETIME({col}))", + "PT10M": "to_start_of_ten_minutes(TO_DATETIME({col}))", + "PT15M": "to_start_of_fifteen_minutes(TO_DATETIME({col}))", + "PT30M": "TO_DATETIME(intDiv(toUInt32(TO_DATETIME({col})), 1800)*1800)", + "PT1H": "to_start_of_hour(TO_DATETIME({col}))", + "P1D": "to_start_of_day(TO_DATETIME({col}))", + "P1W": "to_monday(TO_DATETIME({col}))", + "P1M": "to_start_of_month(TO_DATETIME({col}))", + "P3M": "to_start_of_quarter(TO_DATETIME({col}))", + "P1Y": "to_start_of_year(TO_DATETIME({col}))", + } + + column_type_mappings = ( + ( + re.compile(r".*Varchar.*", re.IGNORECASE), + types.String(), + GenericDataType.STRING, + ), + ( + re.compile(r".*Array.*", re.IGNORECASE), + types.String(), + GenericDataType.STRING, + ), + ( + re.compile(r".*Map.*", re.IGNORECASE), + types.String(), + GenericDataType.STRING, + ), + ( + re.compile(r".*Json.*", re.IGNORECASE), + types.JSON(), + GenericDataType.STRING, + ), + ( + re.compile(r".*Bool.*", re.IGNORECASE), + types.Boolean(), + GenericDataType.BOOLEAN, + ), + ( + re.compile(r".*String.*", re.IGNORECASE), + types.String(), + GenericDataType.STRING, + ), + ( + re.compile(r".*Int\d+.*", re.IGNORECASE), + types.INTEGER(), + GenericDataType.NUMERIC, + ), + ( + re.compile(r".*Float\d+.*", re.IGNORECASE), + types.FLOAT(), + GenericDataType.NUMERIC, + ), + ( + re.compile(r".*Double\d+.*", re.IGNORECASE), + types.FLOAT(), + GenericDataType.NUMERIC, + ), + ( + re.compile(r".*Decimal.*", re.IGNORECASE), + types.DECIMAL(), + GenericDataType.NUMERIC, + ), + ( + re.compile(r".*DateTime.*", re.IGNORECASE), + types.DateTime(), + GenericDataType.TEMPORAL, + ), + ( + re.compile(r".*Date.*", re.IGNORECASE), + types.Date(), + GenericDataType.TEMPORAL, + ), + ) + + @classmethod + def epoch_to_dttm(cls) -> str: + return "{col}" + + @classmethod + def convert_dttm( + cls, target_type: str, dttm: datetime, db_extra: dict[str, Any] | None = None + ) -> str | None: + sqla_type = cls.get_sqla_column_type(target_type) + + if isinstance(sqla_type, types.Date): + return f"to_date('{dttm.date().isoformat()}')" + if isinstance(sqla_type, types.DateTime): + return f"""to_dateTime('{dttm.isoformat(sep=" ", timespec="seconds")}')""" + return None + + +class DatabendEngineSpec(DatabendBaseEngineSpec): + """Engine spec for databend_sqlalchemy connector""" + + engine = "databend" + engine_name = "Databend" + _function_names: list[str] = [] + + _show_functions_column = "name" + supports_file_upload = False + + @classmethod + def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]: + return {NewConnectionError: SupersetDBAPIDatabaseError} + + @classmethod + def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception: + new_exception = cls.get_dbapi_exception_mapping().get(type(exception)) + if new_exception == SupersetDBAPIDatabaseError: + return SupersetDBAPIDatabaseError("Connection failed") + if not new_exception: + return exception + return new_exception(str(exception)) + + @classmethod + def get_function_names(cls, database: Database) -> list[str]: + if cls._function_names: + return cls._function_names + try: + names = database.get_df("SELECT name FROM system.functions;")[ + "name" + ].tolist() + cls._function_names = names + return names + except Exception as ex: # pylint: disable=broad-except + logger.exception("Error retrieving system.functions: %s", str(ex)) + return [] + + +class DatabendParametersSchema(Schema): + username = fields.String(allow_none=True, description=__("Username")) + password = fields.String(allow_none=True, description=__("Password")) + host = fields.String(required=True, description=__("Hostname or IP address")) + port = fields.Integer( + allow_none=True, + description=__("Database port"), + validate=Range(min=0, max=65535), + ) + database = fields.String(allow_none=True, description=__("Database name")) + encryption = fields.Boolean( + default=True, description=__("Use an encrypted connection to the database") + ) + query = fields.Dict( + keys=fields.Str(), values=fields.Raw(), description=__("Additional parameters") + ) + + +class DatabendConnectEngineSpec(DatabendEngineSpec, BasicParametersMixin): + """Engine spec for databend sqlalchemy connector""" + + engine = "databend" + engine_name = "Databend" + + default_driver = "databend" + _function_names: list[str] = [] + + sqlalchemy_uri_placeholder = ( + "databend://user:password@host[:port][/dbname][?secure=value&=value...]" + ) + parameters_schema = DatabendParametersSchema() + encryption_parameters = {"secure": "true"} + + @classmethod + def get_dbapi_exception_mapping(cls) -> dict[type[Exception], type[Exception]]: + return {} + + @classmethod + def get_dbapi_mapped_exception(cls, exception: Exception) -> Exception: + new_exception = cls.get_dbapi_exception_mapping().get(type(exception)) + if new_exception == SupersetDBAPIDatabaseError: + return SupersetDBAPIDatabaseError("Connection failed") + if not new_exception: + return exception + return new_exception(str(exception)) + + @classmethod + def get_function_names(cls, database: Database) -> list[str]: + if cls._function_names: + return cls._function_names + try: + names = database.get_df("SELECT name FROM system.functions;")[ + "name" + ].tolist() + cls._function_names = names + return names + except Exception as ex: # pylint: disable=broad-except + logger.exception("Error retrieving system.functions: %s", str(ex)) + return [] + + @classmethod + def get_datatype(cls, type_code: str) -> str: + return type_code + + @classmethod + def build_sqlalchemy_uri( + cls, parameters: BasicParametersType, *_args: dict[str, str] | None + ) -> str: + url_params = parameters.copy() + if url_params.get("encryption"): + query = parameters.get("query", {}).copy() + query.update(cls.encryption_parameters) + url_params["query"] = query + if not url_params.get("database"): + url_params["database"] = "__default__" + url_params.pop("encryption", None) + return str(URL(f"{cls.engine}", **url_params)) + + @classmethod + def get_parameters_from_uri( + cls, uri: str, *_args: dict[str, Any] | None + ) -> BasicParametersType: + url = make_url_safe(uri) + query = url.query + if "secure" in query: + encryption = url.query.get("secure") == "true" + query.pop("secure") + else: + encryption = False + return BasicParametersType( + username=url.username, + password=url.password, + host=url.host, + port=url.port, + database="" if url.database == "__default__" else cast(str, url.database), + query=query, + encryption=encryption, + ) + + @classmethod + def default_port(cls, interface: str, secure: bool) -> int: + if interface.startswith("http"): + return 443 if secure else 8000 + raise ValueError("Unrecognized Databend interface") + + @classmethod + def validate_parameters( + cls, properties: BasicPropertiesType + ) -> list[SupersetError]: + # The newest versions of superset send a "properties" object with a + # parameters key, instead of just the parameters, so we hack to be compatible + parameters = properties.get("parameters", properties) + host = parameters.get("host", None) + host = str(host) if host is not None else None + if not host: + return [ + SupersetError( + "Hostname is required", + SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + ErrorLevel.WARNING, + {"missing": ["host"]}, + ) + ] + if not is_hostname_valid(host): + return [ + SupersetError( + "The hostname provided can't be resolved.", + SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR, + ErrorLevel.ERROR, + {"invalid": ["host"]}, + ) + ] + port = parameters.get("port") + if port is not None: + if isinstance(port, (int, str)): + try: + port = int(port) + if port <= 0 or port >= 65535: + port = -1 + except (ValueError, TypeError): + port = -1 + encryption = parameters.get("encryption", False) + if port is None or port == -1: + encryption = bool(encryption) + port = cls.default_port("http", encryption) + if port <= 0 or port >= 65535: + return [ + SupersetError( + "Port must be a valid integer between 0 and 65535 (inclusive).", + SupersetErrorType.CONNECTION_INVALID_PORT_ERROR, + ErrorLevel.ERROR, + {"invalid": ["port"]}, + ) + ] + if not is_port_open(host, port): + return [ + SupersetError( + "The port is closed.", + SupersetErrorType.CONNECTION_PORT_CLOSED_ERROR, + ErrorLevel.ERROR, + {"invalid": ["port"]}, + ) + ] + return [] + + @staticmethod + def _mutate_label(label: str) -> str: + """ + Suffix with the first six characters from the md5 of the label to avoid + collisions with original column names + + :param label: Expected expression label + :return: Conditionally mutated label + """ + return f"{label}_{md5_sha_from_str(label)[:6]}" diff --git a/tests/unit_tests/db_engine_specs/test_databend.py b/tests/unit_tests/db_engine_specs/test_databend.py new file mode 100644 index 0000000000000..9c494492d9ad1 --- /dev/null +++ b/tests/unit_tests/db_engine_specs/test_databend.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from typing import Any, Optional +from unittest.mock import Mock + +import pytest +from sqlalchemy.types import ( + Boolean, + Date, + DateTime, + DECIMAL, + Float, + Integer, + String, + TypeEngine, +) +from urllib3.connection import HTTPConnection + +from superset.utils.core import GenericDataType +from tests.unit_tests.db_engine_specs.utils import ( + assert_column_spec, + assert_convert_dttm, +) +from tests.unit_tests.fixtures.common import dttm + + +@pytest.mark.parametrize( + "target_type,expected_result", + [ + ("Date", "to_date('2019-01-02')"), + ("DateTime", "to_dateTime('2019-01-02 03:04:05')"), + ("UnknownType", None), + ], +) +def test_convert_dttm( + target_type: str, expected_result: Optional[str], dttm: datetime +) -> None: + from superset.db_engine_specs.databend import DatabendEngineSpec as spec + + assert_convert_dttm(spec, target_type, expected_result, dttm) + + +def test_execute_connection_error() -> None: + from urllib3.exceptions import NewConnectionError + + from superset.db_engine_specs.databend import DatabendEngineSpec + from superset.db_engine_specs.exceptions import SupersetDBAPIDatabaseError + + cursor = Mock() + cursor.execute.side_effect = NewConnectionError( + HTTPConnection("Dummypool"), "Exception with sensitive data" + ) + with pytest.raises(SupersetDBAPIDatabaseError) as ex: + DatabendEngineSpec.execute(cursor, "SELECT col1 from table1") + + +@pytest.mark.parametrize( + "native_type,sqla_type,attrs,generic_type,is_dttm", + [ + ("Varchar", String, None, GenericDataType.STRING, False), + ("Nullable(Varchar)", String, None, GenericDataType.STRING, False), + ("Array(UInt8)", String, None, GenericDataType.STRING, False), + ("Int8", Integer, None, GenericDataType.NUMERIC, False), + ("Int16", Integer, None, GenericDataType.NUMERIC, False), + ("Int32", Integer, None, GenericDataType.NUMERIC, False), + ("Int64", Integer, None, GenericDataType.NUMERIC, False), + ("Int128", Integer, None, GenericDataType.NUMERIC, False), + ("Int256", Integer, None, GenericDataType.NUMERIC, False), + ("Nullable(Int64)", Integer, None, GenericDataType.NUMERIC, False), + ("UInt8", Integer, None, GenericDataType.NUMERIC, False), + ("UInt16", Integer, None, GenericDataType.NUMERIC, False), + ("UInt32", Integer, None, GenericDataType.NUMERIC, False), + ("UInt64", Integer, None, GenericDataType.NUMERIC, False), + ("UInt128", Integer, None, GenericDataType.NUMERIC, False), + ("UInt256", Integer, None, GenericDataType.NUMERIC, False), + ("Float", Float, None, GenericDataType.NUMERIC, False), + ("Double", Float, None, GenericDataType.NUMERIC, False), + ("Decimal(1, 2)", DECIMAL, None, GenericDataType.NUMERIC, False), + ("Decimal32(2)", DECIMAL, None, GenericDataType.NUMERIC, False), + ("Decimal64(2)", DECIMAL, None, GenericDataType.NUMERIC, False), + ("Decimal128(2)", DECIMAL, None, GenericDataType.NUMERIC, False), + ("Decimal256(2)", DECIMAL, None, GenericDataType.NUMERIC, False), + ("Bool", Boolean, None, GenericDataType.BOOLEAN, False), + ("Nullable(Bool)", Boolean, None, GenericDataType.BOOLEAN, False), + ("Date", Date, None, GenericDataType.TEMPORAL, True), + ("Nullable(Date)", Date, None, GenericDataType.TEMPORAL, True), + ("Datetime", DateTime, None, GenericDataType.TEMPORAL, True), + ("Nullable(Datetime)", DateTime, None, GenericDataType.TEMPORAL, True), + ], +) +def test_get_column_spec( + native_type: str, + sqla_type: type[TypeEngine], + attrs: Optional[dict[str, Any]], + generic_type: GenericDataType, + is_dttm: bool, +) -> None: + from superset.db_engine_specs.databend import DatabendConnectEngineSpec as spec + + assert_column_spec(spec, native_type, sqla_type, attrs, generic_type, is_dttm) + + +@pytest.mark.parametrize( + "column_name,expected_result", + [ + ("time", "time_07cc69"), + ("count", "count_e2942a"), + ], +) +def test_make_label_compatible(column_name: str, expected_result: str) -> None: + from superset.db_engine_specs.databend import DatabendConnectEngineSpec as spec + + label = spec.make_label_compatible(column_name) + assert label == expected_result