diff --git a/ibis-server/app/model/metadata.py b/ibis-server/app/model/metadata.py
deleted file mode 100644
index 9a4c57355..000000000
--- a/ibis-server/app/model/metadata.py
+++ /dev/null
@@ -1,357 +0,0 @@
-from app.model.data_source import DataSource
-from app.model.dto import PostgresConnectionUrl, PostgresConnectionInfo
-from enum import StrEnum, auto
-from json import loads
-from app.model.metadata_dto import (
-    CompactTable,
-    CompactColumn,
-    Constraint,
-    WrenEngineColumnType,
-    ConstraintType,
-)
-
-
-class Metadata(StrEnum):
-    postgres = auto()
-    bigquery = auto()
-
-    def get_table_list(self, connection_info):
-        if self == Metadata.postgres:
-            return self.get_postgres_table_list_sql(connection_info)
-        if self == Metadata.bigquery:
-            return self.get_bigquery_table_list_sql(connection_info)
-        raise NotImplementedError(f"Unsupported data source: {self}")
-
-    def get_constraints(self, connection_info):
-        if self == Metadata.postgres:
-            return self.get_postgres_table_constraints(connection_info)
-        if self == Metadata.bigquery:
-            return self.get_bigquery_table_constraints(connection_info)
-        raise NotImplementedError(f"Unsupported data source: {self}")
-
-    @staticmethod
-    def get_postgres_table_list_sql(
-        connection_info: PostgresConnectionUrl | PostgresConnectionInfo,
-    ):
-        sql = """
-            SELECT
-                t.table_catalog,
-                t.table_schema,
-                t.table_name,
-                c.column_name,
-                c.data_type,
-                c.is_nullable,
-                c.ordinal_position
-            FROM
-                information_schema.tables t
-            JOIN
-                information_schema.columns c ON t.table_schema = c.table_schema AND t.table_name = c.table_name
-            WHERE
-                t.table_type in ('BASE TABLE', 'VIEW')
-                and t.table_schema not in ('information_schema', 'pg_catalog')
-            """
-        res = to_json(
-            DataSource.postgres.get_connection(connection_info)
-            .sql(sql, dialect="trino")
-            .to_pandas()
-        )
-
-        # transform the result to a list of dictionaries
-        response = [
-            (
-                lambda x: {
-                    "table_catalog": x[0],
-                    "table_schema": x[1],
-                    "table_name": x[2],
-                    "column_name": x[3],
-                    "data_type": transform_postgres_column_type(x[4]),
-                    "is_nullable": x[5],
-                    "ordinal_position": x[6],
-                }
-            )(row)
-            for row in res["data"]
-        ]
-
-        unique_tables = {}
-        for row in response:
-            # generate unique table name
-            schema_table = f"{row['table_schema']}.{row['table_name']}"
-            # init table if not exists
-            if schema_table not in unique_tables:
-                table = {
-                    "name": schema_table,
-                    "description": "",
-                    "columns": [],
-                    "properties": {
-                        "schema": row["table_schema"],
-                        "catalog": row["table_catalog"],
-                        "table": row["table_name"],
-                    },
-                    "primaryKey": "",
-                }
-                unique_tables[schema_table] = CompactTable(**table)
-            # table exists, and add column to the table
-            column = {
-                "name": row["column_name"],
-                "type": row["data_type"],
-                "notNull": row["is_nullable"].lower() == "no",
-                "description": "",
-                "properties": {},
-            }
-            column = CompactColumn(**column)
-            unique_tables[schema_table].columns.append(column)
-
-        compact_tables: list[CompactTable] = list(unique_tables.values())
-        return compact_tables
-
-    @staticmethod
-    def get_bigquery_table_list_sql(connection_info):
-        dataset_id = connection_info.dataset_id
-        sql = f"""
-            SELECT
-                c.table_catalog,
-                c.table_schema,
-                c.table_name,
-                c.column_name,
-                c.ordinal_position,
-                c.is_nullable,
-                c.data_type,
-                c.is_generated,
-                c.generation_expression,
-                c.is_stored,
-                c.is_hidden,
-                c.is_updatable,
-                c.is_system_defined,
-                c.is_partitioning_column,
-                c.clustering_ordinal_position,
-                c.collation_name,
-                c.column_default,
-                c.rounding_mode,
-                cf.description AS column_description,
-                table_options.option_value AS table_description
-            FROM {dataset_id}.INFORMATION_SCHEMA.COLUMNS c
-            JOIN {dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS cf
-                ON cf.table_name = c.table_name
-                AND cf.column_name = c.column_name
-            LEFT JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_OPTIONS table_options
-                ON c.table_name = table_options.table_name
-            WHERE
-                cf.column_name = cf.field_path
-                AND NOT REGEXP_CONTAINS(cf.data_type, r'^(STRUCT|ARRAY<STRUCT)')
-            """
diff --git a/ibis-server/app/model/metadata/bigquery.py b/ibis-server/app/model/metadata/bigquery.py
new file mode 100644
--- /dev/null
+++ b/ibis-server/app/model/metadata/bigquery.py
@@ -0,0 +1,113 @@
+from app.model.data_source import DataSource, BigQueryConnectionInfo
+from app.model.metadata.dto import (
+    Table,
+    TableProperties,
+    Column,
+    Constraint,
+    ConstraintType,
+)
+from app.model.metadata.metadata import Metadata
+from json import loads
+
+
+class BigQueryMetadata(Metadata):
+    def __init__(self, connection_info: BigQueryConnectionInfo):
+        super().__init__(connection_info)
+        self.connection = DataSource.bigquery.get_connection(connection_info)
+
+    def get_table_list(self) -> list[Table]:
+        dataset_id = self.connection_info.dataset_id
+        sql = f"""
+            SELECT
+                c.table_catalog,
+                c.table_schema,
+                c.table_name,
+                c.column_name,
+                c.ordinal_position,
+                c.is_nullable,
+                c.data_type,
+                c.is_generated,
+                c.generation_expression,
+                c.is_stored,
+                c.is_hidden,
+                c.is_updatable,
+                c.is_system_defined,
+                c.is_partitioning_column,
+                c.clustering_ordinal_position,
+                c.collation_name,
+                c.column_default,
+                c.rounding_mode,
+                cf.description AS column_description,
+                table_options.option_value AS table_description
+            FROM {dataset_id}.INFORMATION_SCHEMA.COLUMNS c
+            JOIN {dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS cf
+                ON cf.table_name = c.table_name
+                AND cf.column_name = c.column_name
+            LEFT JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_OPTIONS table_options
+                ON c.table_name = table_options.table_name
+            WHERE
+                cf.column_name = cf.field_path
+                AND NOT REGEXP_CONTAINS(cf.data_type, r'^(STRUCT|ARRAY<STRUCT)')
+            """
+        response = loads(self.connection.sql(sql).to_pandas().to_json(orient="records"))
+
+        unique_tables = {}
+        for row in response:
+            # generate unique table name
+            table_name = row["table_name"]
+            # init table if not exists
+            if table_name not in unique_tables:
+                unique_tables[table_name] = Table(
+                    name=table_name,
+                    description=row["table_description"],
+                    columns=[],
+                    properties=TableProperties(
+                        schema=row["table_schema"],
+                        catalog=row["table_catalog"],
+                        table=row["table_name"],
+                    ),
+                    primaryKey="",
+                )
+
+            # table exists, and add column to the table
+            unique_tables[table_name].columns.append(
+                Column(
+                    name=row["column_name"],
+                    type=row["data_type"],
+                    notNull=row["is_nullable"].lower() == "no",
+                    description=row["column_description"],
+                    properties=None,
+                )
+            )
+        return list(unique_tables.values())
+
+    def get_constraints(self) -> list[Constraint]:
+        dataset_id = self.connection_info.dataset_id
+        sql = f"""
+            SELECT
+                CONCAT(ccu.table_name, '_', ccu.column_name, '_', kcu.table_name, '_', kcu.column_name) as constraintName,
+                ccu.table_name as constraintTable, ccu.column_name constraintColumn,
+                kcu.table_name as constraintedTable, kcu.column_name as constraintedColumn,
+            FROM {dataset_id}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
+            JOIN {dataset_id}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+                ON ccu.constraint_name = kcu.constraint_name
+            JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+                ON ccu.constraint_name = tc.constraint_name
+            WHERE tc.constraint_type = 'FOREIGN KEY'
+            """
+        response = loads(self.connection.sql(sql).to_pandas().to_json(orient="records"))
+
+        constraints = []
+        for row in response:
+            constraints.append(
+                Constraint(
+                    constraintName=row["constraintName"],
+                    constraintTable=row["constraintTable"],
+                    constraintColumn=row["constraintColumn"],
+                    constraintedTable=row["constraintedTable"],
+                    constraintedColumn=row["constraintedColumn"],
+                    constraintType=ConstraintType.FOREIGN_KEY,
+                )
+            )
+        return constraints
+
diff --git a/ibis-server/app/model/metadata_dto.py b/ibis-server/app/model/metadata/dto.py
similarity index 78%
rename from ibis-server/app/model/metadata_dto.py
rename to ibis-server/app/model/metadata/dto.py
index efbe15517..0d74536ad 100644
--- a/ibis-server/app/model/metadata_dto.py
+++ b/ibis-server/app/model/metadata/dto.py
@@ -1,45 +1,31 @@
 from enum import Enum
-from app.model.data_source import (
-    PostgresConnectionUrl,
-    PostgresConnectionInfo,
-    BigQueryConnectionInfo,
-    SnowflakeConnectionInfo,
-)
+from app.model.data_source import ConnectionInfo
 from pydantic import BaseModel, Field
-from typing import List, Optional, Dict, Any, Union
+from typing import List, Optional, Dict, Any
 
 
 class MetadataDTO(BaseModel):
-    connection_info: Union[
-        PostgresConnectionUrl | PostgresConnectionInfo,
-        BigQueryConnectionInfo,
-        SnowflakeConnectionInfo,
-    ] = Field(alias="connectionInfo")
+    connection_info: ConnectionInfo = Field(alias="connectionInfo")
 
 
 class WrenEngineColumnType(Enum):
     # Boolean Types
     BOOLEAN = "BOOLEAN"
"BOOLEAN" - + # Numeric Types TINYINT = "TINYINT" - INT2 = "INT2" SMALLINT = "SMALLINT" # alias for INT2 - INT4 = "INT4" INTEGER = "INTEGER" # alias for INT4 - INT8 = "INT8" BIGINT = "BIGINT" # alias for INT8 - NUMERIC = "NUMERIC" DECIMAL = "DECIMAL" # Floating-Point Types FLOAT4 = "FLOAT4" REAL = "REAL" # alias for FLOAT4 - FLOAT8 = "FLOAT8" DOUBLE = "DOUBLE" # alias for FLOAT8 @@ -77,7 +63,7 @@ class WrenEngineColumnType(Enum): UNKNOWN = "UNKNOWN" -class CompactColumn(BaseModel): +class Column(BaseModel): name: str type: str notNull: bool @@ -85,17 +71,17 @@ class CompactColumn(BaseModel): properties: Optional[Dict[str, Any]] = None -class CompactTableProperties(BaseModel): +class TableProperties(BaseModel): schema: Optional[str] catalog: Optional[str] table: Optional[str] # only table name without schema or catalog -class CompactTable(BaseModel): +class Table(BaseModel): name: str # unique table name (might contain schema name or catalog name as well) - columns: List[CompactColumn] + columns: List[Column] description: Optional[str] = None - properties: CompactTableProperties = None + properties: TableProperties = None primaryKey: Optional[str] = None diff --git a/ibis-server/app/model/metadata/factory.py b/ibis-server/app/model/metadata/factory.py new file mode 100644 index 000000000..2e0adee9d --- /dev/null +++ b/ibis-server/app/model/metadata/factory.py @@ -0,0 +1,34 @@ +from app.model.data_source import DataSource, ConnectionInfo +from json import loads +from app.model.metadata.postgres import PostgresMetadata +from app.model.metadata.bigquery import BigQueryMetadata +from app.model.metadata.metadata import Metadata +from app.model.metadata.dto import ( + Table, + Constraint, +) + + +class MetadataFactory: + def __init__(self, data_source: DataSource, connection_info: ConnectionInfo): + self.metadata = self.get_metadata(data_source, connection_info) + + def get_metadata(self, data_source: DataSource, connection_info) -> Metadata: + if data_source == DataSource.postgres: + return PostgresMetadata(connection_info) + if data_source == DataSource.bigquery: + return BigQueryMetadata(connection_info) + raise NotImplementedError(f"Unsupported data source: {self}") + + def get_table_list(self) -> list[Table]: + return self.metadata.get_table_list() + + def get_constraints(self) -> list[Constraint]: + return self.metadata.get_constraints() + + +def to_json(df): + json_obj = loads(df.to_json(orient="split")) + del json_obj["index"] + json_obj["dtypes"] = df.dtypes.apply(lambda x: x.name).to_dict() + return json_obj diff --git a/ibis-server/app/model/metadata/metadata.py b/ibis-server/app/model/metadata/metadata.py new file mode 100644 index 000000000..d6f2a6cfe --- /dev/null +++ b/ibis-server/app/model/metadata/metadata.py @@ -0,0 +1,13 @@ +from app.model.connector import ConnectionInfo +from app.model.metadata.dto import Table, Constraint + + +class Metadata(): + def __init__(self, connection_info: ConnectionInfo): + self.connection_info = connection_info + + def get_table_list(self) -> list[Table]: + raise NotImplementedError + + def get_constraints(self) -> list[Constraint]: + raise NotImplementedError \ No newline at end of file diff --git a/ibis-server/app/model/metadata/postgres.py b/ibis-server/app/model/metadata/postgres.py new file mode 100644 index 000000000..4e0b372c0 --- /dev/null +++ b/ibis-server/app/model/metadata/postgres.py @@ -0,0 +1,169 @@ +from app.model.data_source import DataSource, PostgresConnectionInfo +from app.model.metadata.dto import ( + Table, + Constraint, + 
+    TableProperties,
+    Column,
+    ConstraintType,
+    WrenEngineColumnType,
+)
+from app.model.metadata.metadata import Metadata
+from json import loads
+
+
+class PostgresMetadata(Metadata):
+    def __init__(self, connection_info: PostgresConnectionInfo):
+        super().__init__(connection_info)
+
+    def get_table_list(self) -> list[Table]:
+        sql = """
+            SELECT
+                t.table_catalog,
+                t.table_schema,
+                t.table_name,
+                c.column_name,
+                c.data_type,
+                c.is_nullable,
+                c.ordinal_position
+            FROM
+                information_schema.tables t
+            JOIN
+                information_schema.columns c ON t.table_schema = c.table_schema AND t.table_name = c.table_name
+            WHERE
+                t.table_type in ('BASE TABLE', 'VIEW')
+                and t.table_schema not in ('information_schema', 'pg_catalog')
+            """
+        response = loads(
+            DataSource.postgres.get_connection(self.connection_info)
+            .sql(sql)
+            .to_pandas()
+            .to_json(orient="records")
+        )
+
+        unique_tables = {}
+        for row in response:
+            # generate unique table name
+            schema_table = self._format_postgres_compact_table_name(
+                row["table_schema"], row["table_name"]
+            )
+            # init table if not exists
+            if schema_table not in unique_tables:
+                unique_tables[schema_table] = Table(
+                    name=schema_table,
+                    description="",
+                    columns=[],
+                    properties=TableProperties(
+                        schema=row["table_schema"],
+                        catalog=row["table_catalog"],
+                        table=row["table_name"],
+                    ),
+                    primaryKey="",
+                )
+
+            # table exists, and add column to the table
+            unique_tables[schema_table].columns.append(
+                Column(
+                    name=row["column_name"],
+                    type=self._transform_postgres_column_type(row["data_type"]),
+                    notNull=row["is_nullable"].lower() == "no",
+                    description="",
+                    properties=None,
+                )
+            )
+        return list(unique_tables.values())
+
+    def get_constraints(self) -> list[Constraint]:
+        sql = """
+            SELECT
+                tc.table_schema,
+                tc.table_name,
+                kcu.column_name,
+                ccu.table_schema AS foreign_table_schema,
+                ccu.table_name AS foreign_table_name,
+                ccu.column_name AS foreign_column_name
+            FROM information_schema.table_constraints AS tc
+            JOIN information_schema.key_column_usage AS kcu
+                ON tc.constraint_name = kcu.constraint_name
+                AND tc.table_schema = kcu.table_schema
+            JOIN information_schema.constraint_column_usage AS ccu
+                ON ccu.constraint_name = tc.constraint_name
+            WHERE tc.constraint_type = 'FOREIGN KEY'
+            """
+        res = loads(
+            DataSource.postgres.get_connection(self.connection_info)
+            .sql(sql, dialect="trino")
+            .to_pandas()
+            .to_json(orient="records")
+        )
+        constraints = []
+        for row in res:
+            constraints.append(
+                Constraint(
+                    constraintName=self._format_constraint_name(
+                        row["table_name"],
+                        row["column_name"],
+                        row["foreign_table_name"],
+                        row["foreign_column_name"],
+                    ),
+                    constraintTable=self._format_postgres_compact_table_name(
+                        row["table_schema"], row["table_name"]
+                    ),
+                    constraintColumn=row["column_name"],
+                    constraintedTable=self._format_postgres_compact_table_name(
+                        row["foreign_table_schema"], row["foreign_table_name"]
+                    ),
+                    constraintedColumn=row["foreign_column_name"],
+                    constraintType=ConstraintType.FOREIGN_KEY,
+                )
+            )
+        return constraints
+
+    def _format_postgres_compact_table_name(self, schema: str, table: str):
+        return f"{schema}.{table}"
+
+    def _format_constraint_name(
+        self, table_name, column_name, foreign_table_name, foreign_column_name
+    ):
+        return f"{table_name}_{column_name}_{foreign_table_name}_{foreign_column_name}"
+
+    def _transform_postgres_column_type(self, data_type):
+        # lower case the data_type
+        data_type = data_type.lower()
+
+        # all possible types listed here: https://www.postgresql.org/docs/current/datatype.html#DATATYPE-TABLE
+
+        switcher = {
+            "text": WrenEngineColumnType.TEXT,
+            "char": WrenEngineColumnType.CHAR,
+            "character": WrenEngineColumnType.CHAR,
+            "bpchar": WrenEngineColumnType.CHAR,
+            "name": WrenEngineColumnType.CHAR,
+            "character varying": WrenEngineColumnType.VARCHAR,
+            "bigint": WrenEngineColumnType.BIGINT,
+            "int": WrenEngineColumnType.INTEGER,
+            "integer": WrenEngineColumnType.INTEGER,
+            "smallint": WrenEngineColumnType.SMALLINT,
+            "real": WrenEngineColumnType.REAL,
+            "double precision": WrenEngineColumnType.DOUBLE,
+            "numeric": WrenEngineColumnType.DECIMAL,
+            "decimal": WrenEngineColumnType.DECIMAL,
+            "boolean": WrenEngineColumnType.BOOLEAN,
+            "timestamp": WrenEngineColumnType.TIMESTAMP,
+            "timestamp without time zone": WrenEngineColumnType.TIMESTAMP,
+            "timestamp with time zone": WrenEngineColumnType.TIMESTAMPTZ,
+            "date": WrenEngineColumnType.DATE,
+            "interval": WrenEngineColumnType.INTERVAL,
+            "json": WrenEngineColumnType.JSON,
+            "bytea": WrenEngineColumnType.BYTEA,
+            "uuid": WrenEngineColumnType.UUID,
+            "inet": WrenEngineColumnType.INET,
+            "oid": WrenEngineColumnType.OID,
+        }
+
+        return switcher.get(data_type, WrenEngineColumnType.UNKNOWN)
+
+
+def to_json(df):
+    json_obj = loads(df.to_json(orient="split"))
+    del json_obj["index"]
+    return json_obj
diff --git a/ibis-server/app/routers/ibis/bigquery.py b/ibis-server/app/routers/ibis/bigquery.py
index 58e4e6980..cc66b79a5 100644
--- a/ibis-server/app/routers/ibis/bigquery.py
+++ b/ibis-server/app/routers/ibis/bigquery.py
@@ -7,9 +7,8 @@
 from app.model.connector import Connector, QueryBigQueryDTO, to_json
 from app.model.data_source import DataSource
 from app.model.validator import ValidateDTO, Validator
-from app.model.dto import BigQueryDTO
-from app.model.metadata_dto import MetadataDTO
-from app.model.metadata import Metadata
+from app.model.metadata.dto import MetadataDTO, Table, Constraint
+from app.model.metadata.factory import MetadataFactory
 
 router = APIRouter(prefix="/bigquery", tags=["bigquery"])
 
@@ -36,15 +35,15 @@ def validate(rule_name: str, dto: ValidateDTO) -> Response:
     return Response(status_code=204)
 
 
-@router.post("/metadata/tables")
+@router.post("/metadata/tables", response_model=list[Table])
 @log_dto
-def get_bigquery_table_list(dto: MetadataDTO) -> dict:
-    table_list = Metadata.bigquery.get_table_list(dto.connection_info)
-    return {"tables": table_list}
+def get_bigquery_table_list(dto: MetadataDTO) -> list[Table]:
+    metadata = MetadataFactory(DataSource.bigquery, dto.connection_info)
+    return metadata.get_table_list()
 
 
-@router.post("/metadata/constraints")
+@router.post("/metadata/constraints", response_model=list[Constraint])
 @log_dto
-def get_bigquery_constraints(dto: MetadataDTO) -> dict:
-    table_list = Metadata.bigquery.get_constraints(dto.connection_info)
-    return {"constraints": table_list}
+def get_bigquery_constraints(dto: MetadataDTO) -> list[Constraint]:
+    metadata = MetadataFactory(DataSource.bigquery, dto.connection_info)
+    return metadata.get_constraints()
diff --git a/ibis-server/app/routers/ibis/postgres.py b/ibis-server/app/routers/ibis/postgres.py
index 35fd33d5b..8abd0b617 100644
--- a/ibis-server/app/routers/ibis/postgres.py
+++ b/ibis-server/app/routers/ibis/postgres.py
@@ -7,8 +7,8 @@
 from app.model.connector import Connector, QueryPostgresDTO, to_json
 from app.model.data_source import DataSource
 from app.model.validator import ValidateDTO, Validator
-from app.model.metadata_dto import MetadataDTO
-from app.model.metadata import Metadata
+from app.model.metadata.dto import MetadataDTO, Table, Constraint
+from app.model.metadata.factory import MetadataFactory
 
 router = APIRouter(prefix="/postgres", tags=["postgres"])
 
@@ -35,15 +35,16 @@ def validate(rule_name: str, dto: ValidateDTO) -> Response:
     return Response(status_code=204)
 
 
-@router.post("/metadata/tables")
+@router.post("/metadata/tables", response_model=list[Table])
 @log_dto
-def get_postgres_table_list(dto: MetadataDTO) -> dict:
-    table_list = Metadata.postgres.get_table_list(dto.connection_info)
-    return {"tables": table_list}
+def get_postgres_table_list(dto: MetadataDTO) -> list[Table]:
+    metadata = MetadataFactory(DataSource.postgres, dto.connection_info)
+    return metadata.get_table_list()
+
-
-@router.post("/metadata/constraints")
+@router.post("/metadata/constraints", response_model=list[Constraint])
 @log_dto
-def get_postgres_constraints(dto: MetadataDTO) -> dict:
-    table_list = Metadata.postgres.get_constraints(dto.connection_info)
-    return {"constraints": table_list}
+def get_postgres_constraints(dto: MetadataDTO) -> list[Constraint]:
+    metadata = MetadataFactory(DataSource.postgres, dto.connection_info)
+    return metadata.get_constraints()
+
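Reviewer note: a quick way to sanity-check this refactor is to drive `MetadataFactory` directly, the same way the rewritten routers do. The sketch below is not part of the diff; the Postgres connection values are placeholders, and the `PostgresConnectionInfo` field names are an assumption based on `app/model/data_source` (check the actual model before running).

```python
# Minimal smoke test for the refactored metadata layer (assumptions noted above).
from app.model.data_source import DataSource, PostgresConnectionInfo
from app.model.metadata.factory import MetadataFactory

# Placeholder credentials; the field names here are assumed, not part of this diff.
connection_info = PostgresConnectionInfo(
    host="localhost",
    port=5432,
    database="test",
    user="postgres",
    password="postgres",
)

# The factory picks the concrete Metadata implementation from the DataSource enum.
metadata = MetadataFactory(DataSource.postgres, connection_info)

# get_table_list() now returns list[Table] (pydantic models) rather than a raw dict.
for table in metadata.get_table_list():
    print(table.name, [column.name for column in table.columns])

# get_constraints() likewise returns list[Constraint].
for constraint in metadata.get_constraints():
    print(constraint.constraintName, constraint.constraintType)
```

Note that this also changes the wire format: `/metadata/tables` and `/metadata/constraints` previously returned `{"tables": [...]}` and `{"constraints": [...]}` wrappers; they now return bare JSON arrays validated by `response_model`, so existing API consumers need a matching update.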