Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add ibis meta data routers #603

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
113 changes: 113 additions & 0 deletions ibis-server/app/model/metadata/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from app.model.data_source import BigQueryConnectionInfo
from app.model.data_source import DataSource
from app.model.metadata.metadata import Metadata
from app.model.metadata.dto import (
Table,
Constraint,
TableProperties,
Column,
ConstraintType,
)
from json import loads


class BigQueryMetadata(Metadata):
    """Read table, column and foreign-key metadata from a BigQuery dataset.

    Every query targets the ``INFORMATION_SCHEMA`` views of the dataset named
    by ``connection_info.dataset_id``.
    """

    def __init__(self, connection_info: BigQueryConnectionInfo):
        """Keep the connection info and open the BigQuery connection.

        Args:
            connection_info: BigQuery connection settings; ``dataset_id`` is
                read from it when the metadata queries are built.
        """
        super().__init__(connection_info)
        self.connection = DataSource.bigquery.get_connection(connection_info)

    def _fetch_records(self, sql: str) -> list[dict]:
        """Run *sql* on the connection and return one dict per result row."""
        # Round-trip through pandas' JSON encoder so each value comes back as
        # a plain JSON-compatible Python object.
        return loads(self.connection.sql(sql).to_pandas().to_json(orient="records"))

    def get_table_list(self) -> list[Table]:
        """Return every table of the dataset with its top-level columns.

        Columns whose type starts with ``STRUCT`` or ``ARRAY<STRUCT`` are
        filtered out by the query, as are nested field paths.

        Returns:
            One ``Table`` per distinct ``table_name``, in the order the tables
            first appear in the query result.
        """
        # NOTE: dataset_id is interpolated into the SQL text as an identifier,
        # so it must come from trusted connection settings.
        dataset_id = self.connection_info.dataset_id
        # TABLE_OPTIONS holds one row per (table, option). The join is limited
        # to the 'description' option so a table with several options does not
        # multiply its column rows or pick up an unrelated option value.
        sql = f"""
            SELECT
                c.table_catalog,
                c.table_schema,
                c.table_name,
                c.column_name,
                c.ordinal_position,
                c.is_nullable,
                c.data_type,
                c.is_generated,
                c.generation_expression,
                c.is_stored,
                c.is_hidden,
                c.is_updatable,
                c.is_system_defined,
                c.is_partitioning_column,
                c.clustering_ordinal_position,
                c.collation_name,
                c.column_default,
                c.rounding_mode,
                cf.description AS column_description,
                table_options.option_value AS table_description
            FROM {dataset_id}.INFORMATION_SCHEMA.COLUMNS c
            JOIN {dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS cf
                ON cf.table_name = c.table_name
                AND cf.column_name = c.column_name
            LEFT JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_OPTIONS table_options
                ON c.table_name = table_options.table_name
                AND table_options.option_name = 'description'
            WHERE
                cf.column_name = cf.field_path
                AND NOT REGEXP_CONTAINS(cf.data_type, r'^(STRUCT|ARRAY<STRUCT)')
            """
        response = self._fetch_records(sql)

        unique_tables = {}
        for row in response:
            # The bare table name is the unique key within one dataset.
            table_name = row["table_name"]
            # Create the table entry the first time the name is seen.
            if table_name not in unique_tables:
                unique_tables[table_name] = Table(
                    name=table_name,
                    description=row["table_description"],
                    columns=[],
                    properties=TableProperties(
                        schema=row["table_schema"],
                        catalog=row["table_catalog"],
                        table=row["table_name"],
                    ),
                    primaryKey="",
                )
            # Each result row describes exactly one column of that table.
            unique_tables[table_name].columns.append(
                Column(
                    name=row["column_name"],
                    type=row["data_type"],
                    notNull=row["is_nullable"].lower() == "no",
                    description=row["column_description"],
                    properties={},
                )
            )
        # TODO: BigQuery data type mapping
        return list(unique_tables.values())

    def get_constraints(self) -> list[Constraint]:
        """Return the foreign-key constraints declared in the dataset.

        Returns:
            One ``Constraint`` of type ``FOREIGN_KEY`` per row produced by
            joining CONSTRAINT_COLUMN_USAGE, KEY_COLUMN_USAGE and
            TABLE_CONSTRAINTS on the constraint name.
        """
        # NOTE: dataset_id is interpolated into the SQL text as an identifier,
        # so it must come from trusted connection settings.
        dataset_id = self.connection_info.dataset_id
        sql = f"""
            SELECT
                CONCAT(ccu.table_name, '_', ccu.column_name, '_', kcu.table_name, '_', kcu.column_name) as constraintName,
                ccu.table_name as constraintTable, ccu.column_name constraintColumn,
                kcu.table_name as constraintedTable, kcu.column_name as constraintedColumn,
            FROM {dataset_id}.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
            JOIN {dataset_id}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
                ON ccu.constraint_name = kcu.constraint_name
            JOIN {dataset_id}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                ON ccu.constraint_name = tc.constraint_name
            WHERE tc.constraint_type = 'FOREIGN KEY'
            """
        response = self._fetch_records(sql)

        return [
            Constraint(
                constraintName=row["constraintName"],
                constraintTable=row["constraintTable"],
                constraintColumn=row["constraintColumn"],
                constraintedTable=row["constraintedTable"],
                constraintedColumn=row["constraintedColumn"],
                constraintType=ConstraintType.FOREIGN_KEY,
            )
            for row in response
        ]
100 changes: 100 additions & 0 deletions ibis-server/app/model/metadata/dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from enum import Enum
from app.model.data_source import ConnectionInfo
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any


class MetadataDTO(BaseModel):
    # Payload carrying the connection settings for a metadata request.
    # NOTE(review): presumably the request body of the metadata routers;
    # verify against the router module.
    # The field is populated from the "connectionInfo" key (pydantic alias).
    connection_info: ConnectionInfo = Field(alias="connectionInfo")


class WrenEngineColumnType(Enum):
    """Enumeration of column type names, each valued by its type-name string.

    Where a comment says "alias for X", the member is a SQL-level synonym of
    X. Its string value differs from X's, so it is a distinct enum member,
    not a Python ``Enum`` alias.
    """

    # Boolean Types
    BOOLEAN = "BOOLEAN"

    # Numeric Types
    TINYINT = "TINYINT"
    INT2 = "INT2"
    SMALLINT = "SMALLINT"  # alias for INT2
    INT4 = "INT4"
    INTEGER = "INTEGER"  # alias for INT4
    INT8 = "INT8"
    BIGINT = "BIGINT"  # alias for INT8
    NUMERIC = "NUMERIC"
    DECIMAL = "DECIMAL"

    # Floating-Point Types
    FLOAT4 = "FLOAT4"
    REAL = "REAL"  # alias for FLOAT4
    FLOAT8 = "FLOAT8"
    DOUBLE = "DOUBLE"  # alias for FLOAT8

    # Character Types
    VARCHAR = "VARCHAR"
    CHAR = "CHAR"
    BPCHAR = "BPCHAR"  # BPCHAR is a fixed-length, blank-padded string
    TEXT = "TEXT"  # alias for VARCHAR
    STRING = "STRING"  # alias for VARCHAR
    NAME = "NAME"  # alias for VARCHAR

    # Date/Time Types
    TIMESTAMP = "TIMESTAMP"
    TIMESTAMPTZ = "TIMESTAMP WITH TIME ZONE"
    DATE = "DATE"
    INTERVAL = "INTERVAL"

    # JSON Types
    JSON = "JSON"

    # Object identifiers (OIDs) are used internally by PostgreSQL as primary keys for various system tables.
    # https://www.postgresql.org/docs/current/datatype-oid.html
    OID = "OID"

    # Binary Data Types
    BYTEA = "BYTEA"

    # UUID Type
    UUID = "UUID"

    # Network Address Types
    INET = "INET"

    # Unknown Type
    UNKNOWN = "UNKNOWN"


class Column(BaseModel):
    # Description of a single column of a table.
    name: str  # column name
    type: str  # data type name; a plain string, not a WrenEngineColumnType member
    notNull: bool  # True when the column does not accept NULL values
    description: Optional[str] = None  # free-text column description, if any
    properties: Optional[Dict[str, Any]] = None  # extra key/value attributes


class TableProperties(BaseModel):
    # Individual parts of a table's location; each may be None, but no field
    # declares a default value.
    # NOTE(review): `schema` is also the name of a BaseModel attribute;
    # pydantic may warn about the shadowing. Confirm this is acceptable.
    schema: Optional[str]
    catalog: Optional[str]
    table: Optional[str]  # only table name without schema or catalog


class Table(BaseModel):
    # A table of the data source together with its columns.
    name: str  # unique table name (might contain schema name or catalog name as well)
    columns: List[Column]
    description: Optional[str] = None
    # Declared Optional because the default is None: the annotation has to
    # admit None so that an explicit `properties=None` validates and the
    # generated schema marks the field as nullable.
    properties: Optional[TableProperties] = None
    primaryKey: Optional[str] = None


class ConstraintType(Enum):
    """Kinds of table constraint; each value is the SQL keyword phrase."""

    PRIMARY_KEY = "PRIMARY KEY"
    FOREIGN_KEY = "FOREIGN KEY"
    UNIQUE = "UNIQUE"


class Constraint(BaseModel):
    # A constraint linking one table/column pair to another.
    # In BigQueryMetadata.get_constraints the constraintTable/constraintColumn
    # pair is read from CONSTRAINT_COLUMN_USAGE and the
    # constraintedTable/constraintedColumn pair from KEY_COLUMN_USAGE.
    constraintName: str  # identifier of the constraint
    constraintType: ConstraintType  # kind of constraint
    constraintTable: str
    constraintColumn: str
    constraintedTable: str
    constraintedColumn: str
34 changes: 34 additions & 0 deletions ibis-server/app/model/metadata/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from app.model.data_source import DataSource, ConnectionInfo
from json import loads
from app.model.metadata.postgres import PostgresMetadata
from app.model.metadata.bigquery import BigQueryMetadata
from app.model.metadata.metadata import Metadata
from app.model.metadata.dto import (
Table,
Constraint,
)


class MetadataFactory:
    """Pick the metadata reader for a data source and delegate calls to it."""

    def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
        """Create the reader matching *data_source*.

        Args:
            data_source: The data source whose metadata should be read.
            connection_info: Connection settings handed to the reader.

        Raises:
            NotImplementedError: If *data_source* has no metadata reader.
        """
        self.metadata = self.get_metadata(data_source, connection_info)

    def get_metadata(
        self, data_source: DataSource, connection_info: ConnectionInfo
    ) -> Metadata:
        """Return a new metadata reader for *data_source*.

        Raises:
            NotImplementedError: If *data_source* is neither postgres nor
                bigquery.
        """
        if data_source == DataSource.postgres:
            return PostgresMetadata(connection_info)
        if data_source == DataSource.bigquery:
            return BigQueryMetadata(connection_info)
        # Report the rejected data source itself, not the factory object.
        raise NotImplementedError(f"Unsupported data source: {data_source}")

    def get_table_list(self) -> list[Table]:
        """Return the table list produced by the selected reader."""
        return self.metadata.get_table_list()

    def get_constraints(self) -> list[Constraint]:
        """Return the constraints produced by the selected reader."""
        return self.metadata.get_constraints()


def to_json(df):
    """Convert a DataFrame into a JSON-compatible dict.

    The result holds the ``columns`` and ``data`` entries of pandas'
    ``orient="split"`` encoding (the ``index`` entry is dropped) plus a
    ``dtypes`` entry mapping each column name to its dtype name.
    """
    payload = loads(df.to_json(orient="split"))
    # Row labels are not part of the result.
    payload.pop("index")
    payload["dtypes"] = {column: dtype.name for column, dtype in df.dtypes.items()}
    return payload
13 changes: 13 additions & 0 deletions ibis-server/app/model/metadata/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from app.model.connector import ConnectionInfo
from app.model.metadata.dto import Table, Constraint


class Metadata:
    """Base class for data-source metadata readers.

    It stores the connection info; both query methods raise
    ``NotImplementedError`` until a subclass overrides them.
    """

    def __init__(self, connection_info: ConnectionInfo):
        """Remember *connection_info* for use by subclasses."""
        self.connection_info = connection_info

    def get_table_list(self) -> list[Table]:
        """Return the tables of the data source (not implemented here)."""
        raise NotImplementedError()

    def get_constraints(self) -> list[Constraint]:
        """Return the constraints of the data source (not implemented here)."""
        raise NotImplementedError()
Loading
Loading