-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feature: add ibis meta data routers (#603)
* add ibis meta data routers * refactor metadata with factory method * add test cases and format code using ruff * rebase main branch and fix the import path
- Loading branch information
1 parent
25d62c5
commit a325b49
Showing
10 changed files
with
493 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
from app.model.data_source import BigQueryConnectionInfo | ||
from app.model.data_source import DataSource | ||
from app.model.metadata.metadata import Metadata | ||
from app.model.metadata.dto import ( | ||
Table, | ||
Constraint, | ||
TableProperties, | ||
Column, | ||
ConstraintType, | ||
) | ||
from json import loads | ||
|
||
|
||
class BigQueryMetadata(Metadata):
    """Reads table and foreign-key metadata from BigQuery INFORMATION_SCHEMA views."""

    def __init__(self, connection_info: BigQueryConnectionInfo):
        super().__init__(connection_info)
        # Ibis-backed BigQuery connection, reused by both metadata queries below.
        self.connection = DataSource.bigquery.get_connection(connection_info)

    def get_table_list(self) -> list[Table]:
        """Return every table in the configured dataset, with all of its columns.

        The query yields one row per (table, column) pair; rows are grouped
        into Table objects keyed by table name.
        """
        dataset_id = self.connection_info.dataset_id
        # COLUMN_FIELD_PATHS supplies the per-column description; the
        # `cf.column_name = cf.field_path` filter together with the REGEXP
        # keeps only top-level, non-STRUCT / non-ARRAY<STRUCT> fields.
        # NOTE(review): the TABLE_OPTIONS join is not filtered on option_name,
        # so `table_description` may carry any table option value, and a table
        # with several options can duplicate rows — verify against BigQuery.
        sql = f"""
            SELECT
                c.table_catalog,
                c.table_schema,
                c.table_name,
                c.column_name,
                c.ordinal_position,
                c.is_nullable,
                c.data_type,
                c.is_generated,
                c.generation_expression,
                c.is_stored,
                c.is_hidden,
                c.is_updatable,
                c.is_system_defined,
                c.is_partitioning_column,
                c.clustering_ordinal_position,
                c.collation_name,
                c.column_default,
                c.rounding_mode,
                cf.description AS column_description,
                table_options.option_value AS table_description
            FROM {dataset_id}.INFORMATION_SCHEMA.COLUMNS c
            JOIN {dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS cf
                ON cf.table_name = c.table_name
                AND cf.column_name = c.column_name
            LEFT JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_OPTIONS table_options
                ON c.table_name = table_options.table_name
            WHERE
                cf.column_name = cf.field_path
                AND NOT REGEXP_CONTAINS(cf.data_type, r'^(STRUCT|ARRAY<STRUCT)')
            """
        # Round-trip the ibis result through pandas/JSON to get plain dict rows.
        response = loads(self.connection.sql(sql).to_pandas().to_json(orient="records"))

        unique_tables = {}
        for row in response:
            # generate unique table name
            table_name = row["table_name"]
            # init table if not exists
            if table_name not in unique_tables:
                unique_tables[table_name] = Table(
                    name=table_name,
                    description=row["table_description"],
                    columns=[],
                    properties=TableProperties(
                        schema=row["table_schema"],
                        catalog=row["table_catalog"],
                        table=row["table_name"],
                    ),
                    primaryKey="",
                )
            # table exists, and add column to the table
            unique_tables[table_name].columns.append(
                Column(
                    name=row["column_name"],
                    type=row["data_type"],
                    # INFORMATION_SCHEMA reports is_nullable as "YES"/"NO"
                    notNull=row["is_nullable"].lower() == "no",
                    description=row["column_description"],
                    properties={},
                )
            )
        # TODO: BigQuery data type mapping
        return list(unique_tables.values())

    def get_constraints(self) -> list[Constraint]:
        """Return the dataset's FOREIGN KEY constraints as Constraint DTOs.

        The synthetic constraint name is built from the four table/column
        parts joined with underscores.
        """
        dataset_id = self.connection_info.dataset_id
        sql = f"""
            SELECT
                CONCAT(ccu.table_name, '_', ccu.column_name, '_', kcu.table_name, '_', kcu.column_name) as constraintName,
                ccu.table_name as constraintTable, ccu.column_name constraintColumn,
                kcu.table_name as constraintedTable, kcu.column_name as constraintedColumn,
            FROM {dataset_id}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
            JOIN {dataset_id}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                ON ccu.constraint_name = kcu.constraint_name
            JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                ON ccu.constraint_name = tc.constraint_name
            WHERE tc.constraint_type = 'FOREIGN KEY'
            """
        response = loads(self.connection.sql(sql).to_pandas().to_json(orient="records"))

        constraints = []
        for row in response:
            constraints.append(
                Constraint(
                    constraintName=row["constraintName"],
                    constraintTable=row["constraintTable"],
                    constraintColumn=row["constraintColumn"],
                    constraintedTable=row["constraintedTable"],
                    constraintedColumn=row["constraintedColumn"],
                    constraintType=ConstraintType.FOREIGN_KEY,
                )
            )
        return constraints
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from enum import Enum | ||
from app.model.data_source import ConnectionInfo | ||
from pydantic import BaseModel, Field | ||
from typing import List, Optional, Dict, Any | ||
|
||
|
||
class MetadataDTO(BaseModel):
    """Request payload carrying the connection details for metadata endpoints."""

    # Accepts the camelCase key "connectionInfo" from the JSON API.
    connection_info: ConnectionInfo = Field(alias="connectionInfo")
|
||
|
||
class WrenEngineColumnType(Enum):
    """Canonical column types understood by the Wren engine.

    Several members are aliases that map to the same engine type but keep
    their source-specific spelling as the enum value.
    """

    # Boolean Types
    BOOLEAN = "BOOLEAN"

    # Numeric Types
    TINYINT = "TINYINT"
    INT2 = "INT2"
    SMALLINT = "SMALLINT"  # alias for INT2
    INT4 = "INT4"
    INTEGER = "INTEGER"  # alias for INT4
    INT8 = "INT8"
    BIGINT = "BIGINT"  # alias for INT8
    NUMERIC = "NUMERIC"
    DECIMAL = "DECIMAL"

    # Floating-Point Types
    FLOAT4 = "FLOAT4"
    REAL = "REAL"  # alias for FLOAT4
    FLOAT8 = "FLOAT8"
    DOUBLE = "DOUBLE"  # alias for FLOAT8

    # Character Types
    VARCHAR = "VARCHAR"
    CHAR = "CHAR"
    BPCHAR = "BPCHAR"  # BPCHAR is fixed-length blank padded string
    TEXT = "TEXT"  # alias for VARCHAR
    STRING = "STRING"  # alias for VARCHAR
    NAME = "NAME"  # alias for VARCHAR

    # Date/Time Types
    TIMESTAMP = "TIMESTAMP"
    TIMESTAMPTZ = "TIMESTAMP WITH TIME ZONE"
    DATE = "DATE"
    INTERVAL = "INTERVAL"

    # JSON Types
    JSON = "JSON"

    # Object identifiers (OIDs) are used internally by PostgreSQL as primary keys for various system tables.
    # https://www.postgresql.org/docs/current/datatype-oid.html
    OID = "OID"

    # Binary Data Types
    BYTEA = "BYTEA"

    # UUID Type
    UUID = "UUID"

    # Network Address Types
    INET = "INET"

    # Unknown Type
    UNKNOWN = "UNKNOWN"
|
||
|
||
class Column(BaseModel):
    """A single column of a discovered table."""

    name: str
    type: str  # raw type string from the data source (no mapping applied yet)
    notNull: bool
    description: Optional[str] = None
    properties: Optional[Dict[str, Any]] = None
|
||
|
||
class TableProperties(BaseModel):
    """Physical location of a table; every part is optional.

    Fix: the Optional fields previously had no defaults, which makes them
    required under pydantic v2 and is inconsistent with the sibling models
    (Column, Table) that default Optional fields to None.
    """

    # NOTE(review): `schema` shadows pydantic's BaseModel.schema helper;
    # renaming would break callers, so the field name is kept as-is.
    schema: Optional[str] = None
    catalog: Optional[str] = None
    table: Optional[str] = None  # only table name without schema or catalog
|
||
|
||
class Table(BaseModel):
    """A discovered table together with its columns.

    Fix: `properties` was annotated `TableProperties = None` — a None default
    on a non-Optional annotation, which pydantic v2 rejects when None is
    actually passed; the annotation is now `Optional[TableProperties]`.
    """

    # unique table name (might contain schema name or catalog name as well)
    name: str
    columns: List[Column]
    description: Optional[str] = None
    properties: Optional[TableProperties] = None
    primaryKey: Optional[str] = None
|
||
|
||
class ConstraintType(Enum):
    """Kinds of table constraints; values match the SQL-standard keywords."""

    PRIMARY_KEY = "PRIMARY KEY"
    FOREIGN_KEY = "FOREIGN KEY"
    UNIQUE = "UNIQUE"
|
||
|
||
class Constraint(BaseModel):
    """A constraint relationship between a constraining and a referenced column.

    NOTE(review): "constrainted" is a typo for "constrained", but these field
    names are part of the serialized wire format, so they are kept as-is.
    """

    constraintName: str
    constraintType: ConstraintType
    constraintTable: str
    constraintColumn: str
    constraintedTable: str
    constraintedColumn: str
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from app.model.data_source import DataSource, ConnectionInfo | ||
from json import loads | ||
from app.model.metadata.postgres import PostgresMetadata | ||
from app.model.metadata.bigquery import BigQueryMetadata | ||
from app.model.metadata.metadata import Metadata | ||
from app.model.metadata.dto import ( | ||
Table, | ||
Constraint, | ||
) | ||
|
||
|
||
class MetadataFactory:
    """Dispatches metadata retrieval to the implementation matching a data source."""

    def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
        self.metadata = self.get_metadata(data_source, connection_info)

    def get_metadata(self, data_source: DataSource, connection_info) -> Metadata:
        """Return the Metadata implementation for *data_source*.

        Raises:
            NotImplementedError: if the data source has no metadata reader yet.
        """
        if data_source == DataSource.postgres:
            return PostgresMetadata(connection_info)
        if data_source == DataSource.bigquery:
            return BigQueryMetadata(connection_info)
        # Bug fix: the message previously interpolated `self`, which rendered
        # as an unhelpful object repr; report the actual unsupported source.
        raise NotImplementedError(f"Unsupported data source: {data_source}")

    def get_table_list(self) -> list[Table]:
        """Delegate to the resolved implementation's table listing."""
        return self.metadata.get_table_list()

    def get_constraints(self) -> list[Constraint]:
        """Delegate to the resolved implementation's constraint listing."""
        return self.metadata.get_constraints()
|
||
|
||
def to_json(df):
    """Serialize a DataFrame to a split-oriented dict with dtype names.

    The "index" entry produced by pandas is dropped; a "dtypes" mapping of
    column name -> dtype name is added instead.
    """
    payload = loads(df.to_json(orient="split"))
    payload.pop("index")
    payload["dtypes"] = {col: dtype.name for col, dtype in df.dtypes.items()}
    return payload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from app.model.connector import ConnectionInfo | ||
from app.model.metadata.dto import Table, Constraint | ||
|
||
|
||
class Metadata:
    """Interface for per-data-source metadata readers.

    Concrete subclasses must override both query methods; the base methods
    raise NotImplementedError.
    """

    def __init__(self, connection_info: ConnectionInfo):
        # Stored for subclasses, which read source-specific fields from it.
        self.connection_info = connection_info

    def get_table_list(self) -> list[Table]:
        """Return every table (with its columns) visible to the connection."""
        raise NotImplementedError

    def get_constraints(self) -> list[Constraint]:
        """Return the constraints (e.g. foreign keys) between tables."""
        raise NotImplementedError
Oops, something went wrong.