Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ibis): Add Oracle connector #1067

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
13 changes: 13 additions & 0 deletions ibis-server/app/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class QueryMySqlDTO(QueryDTO):
connection_info: ConnectionUrl | MySqlConnectionInfo = connection_info_field


class QueryOracleDTO(QueryDTO):
    """Query DTO for the Oracle data source.

    Accepts either a raw connection URL or a structured
    :class:`OracleConnectionInfo`, matching the pattern of the sibling
    Query*DTO classes in this module.
    """

    connection_info: ConnectionUrl | OracleConnectionInfo = connection_info_field


class QueryPostgresDTO(QueryDTO):
connection_info: ConnectionUrl | PostgresConnectionInfo = connection_info_field

Expand Down Expand Up @@ -135,6 +139,14 @@ class PostgresConnectionInfo(BaseModel):
password: SecretStr | None = None


class OracleConnectionInfo(BaseModel):
    """Connection parameters for an Oracle database.

    All fields are wrapped in ``SecretStr`` so they are masked in logs and
    reprs, mirroring the other *ConnectionInfo models in this module.
    """

    host: SecretStr = Field(examples=["localhost"])
    # Stored as a secret string; converted to int when the connection is
    # opened (see get_oracle_connection).
    port: SecretStr = Field(examples=[1521])
    # presumably the Oracle service name or SID — TODO confirm which the
    # ibis backend expects here.
    database: SecretStr
    user: SecretStr
    # Optional: when None, no password is passed to the driver.
    password: SecretStr | None = None


class SnowflakeConnectionInfo(BaseModel):
user: SecretStr
password: SecretStr
Expand Down Expand Up @@ -205,6 +217,7 @@ class GcsFileConnectionInfo(BaseModel):
| ConnectionUrl
| MSSqlConnectionInfo
| MySqlConnectionInfo
| OracleConnectionInfo
| PostgresConnectionInfo
| SnowflakeConnectionInfo
| TrinoConnectionInfo
Expand Down
14 changes: 14 additions & 0 deletions ibis-server/app/model/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ConnectionInfo,
MSSqlConnectionInfo,
MySqlConnectionInfo,
OracleConnectionInfo,
PostgresConnectionInfo,
QueryBigQueryDTO,
QueryCannerDTO,
Expand All @@ -27,6 +28,7 @@
QueryMinioFileDTO,
QueryMSSqlDTO,
QueryMySqlDTO,
QueryOracleDTO,
QueryPostgresDTO,
QueryS3FileDTO,
QuerySnowflakeDTO,
Expand All @@ -43,6 +45,7 @@ class DataSource(StrEnum):
clickhouse = auto()
mssql = auto()
mysql = auto()
oracle = auto()
postgres = auto()
snowflake = auto()
trino = auto()
Expand Down Expand Up @@ -70,6 +73,7 @@ class DataSourceExtension(Enum):
clickhouse = QueryClickHouseDTO
mssql = QueryMSSqlDTO
mysql = QueryMySqlDTO
oracle = QueryOracleDTO
postgres = QueryPostgresDTO
snowflake = QuerySnowflakeDTO
trino = QueryTrinoDTO
Expand Down Expand Up @@ -171,6 +175,16 @@ def get_postgres_connection(info: PostgresConnectionInfo) -> BaseBackend:
password=(info.password and info.password.get_secret_value()),
)

@staticmethod
def get_oracle_connection(info: OracleConnectionInfo) -> BaseBackend:
    """Open an ibis Oracle backend connection from *info*.

    Secrets are unwrapped at the last moment; the port is stored as a
    secret string and converted to an integer here. A missing password is
    forwarded as ``None``.
    """
    plain_password = info.password.get_secret_value() if info.password else None
    return ibis.oracle.connect(
        host=info.host.get_secret_value(),
        port=int(info.port.get_secret_value()),
        database=info.database.get_secret_value(),
        user=info.user.get_secret_value(),
        password=plain_password,
    )

@staticmethod
def get_snowflake_connection(info: SnowflakeConnectionInfo) -> BaseBackend:
return ibis.snowflake.connect(
Expand Down
2 changes: 2 additions & 0 deletions ibis-server/app/model/metadata/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
MinioFileMetadata,
S3FileMetadata,
)
from app.model.metadata.oracle import OracleMetadata
from app.model.metadata.postgres import PostgresMetadata
from app.model.metadata.snowflake import SnowflakeMetadata
from app.model.metadata.trino import TrinoMetadata
Expand All @@ -21,6 +22,7 @@
DataSource.clickhouse: ClickHouseMetadata,
DataSource.mssql: MSSQLMetadata,
DataSource.mysql: MySQLMetadata,
DataSource.oracle: OracleMetadata,
DataSource.postgres: PostgresMetadata,
DataSource.trino: TrinoMetadata,
DataSource.snowflake: SnowflakeMetadata,
Expand Down
222 changes: 222 additions & 0 deletions ibis-server/app/model/metadata/oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import ibis

from app.model import OracleConnectionInfo
from app.model.data_source import DataSource
from app.model.metadata.dto import (
Column,
Constraint,
ConstraintType,
RustWrenEngineColumnType,
Table,
TableProperties,
)
from app.model.metadata.metadata import Metadata


class OracleMetadata(Metadata):
    """Metadata provider for Oracle.

    Extracts the table list, foreign-key constraints and server version from
    an Oracle database through an ibis Oracle backend connection. Each query
    supplies an explicit uppercase ibis schema because Oracle returns
    uppercase identifiers and letting ibis infer the schema can fail
    (ORA-00942, see note in get_table_list).
    """

    def __init__(self, connection_info: OracleConnectionInfo):
        super().__init__(connection_info)
        self.connection = DataSource.oracle.get_connection(connection_info)

    def get_table_list(self) -> list[Table]:
        """Return all visible tables with their columns and comments.

        NOTE(review): the query is hard-coded to tables owned by 'SYSTEM' —
        confirm whether the owner should come from the connection info /
        connected user instead of being fixed here.
        """
        # Oracle drivers reject a trailing semicolon inside a statement
        # (ORA-00911: invalid character), so the SQL must end without one —
        # this also keeps it consistent with the constraints query below.
        sql = """
            SELECT
                t.owner AS TABLE_CATALOG,
                t.owner AS TABLE_SCHEMA,
                t.table_name AS TABLE_NAME,
                c.column_name AS COLUMN_NAME,
                c.data_type AS DATA_TYPE,
                c.nullable AS IS_NULLABLE,
                c.column_id AS ORDINAL_POSITION,
                tc.comments AS TABLE_COMMENT,
                cc.comments AS COLUMN_COMMENT
            FROM
                all_tables t
            JOIN
                all_tab_columns c
                ON t.owner = c.owner
                AND t.table_name = c.table_name
            LEFT JOIN
                all_tab_comments tc
                ON tc.owner = t.owner
                AND tc.table_name = t.table_name
            LEFT JOIN
                all_col_comments cc
                ON cc.owner = c.owner
                AND cc.table_name = c.table_name
                AND cc.column_name = c.column_name
            WHERE
                t.owner = 'SYSTEM'
            ORDER BY
                t.table_name, c.column_id
        """
        # Provide the pre-built schema explicitly with uppercase column names
        # to avoid a potential ibis get-schema error:
        # solves oracledb DatabaseError: ORA-00942: table or view not found.
        schema = ibis.schema(
            {
                "TABLE_CATALOG": "string",
                "TABLE_SCHEMA": "string",
                "TABLE_NAME": "string",
                "COLUMN_NAME": "string",
                "DATA_TYPE": "string",
                "IS_NULLABLE": "string",
                "ORDINAL_POSITION": "int64",
                "TABLE_COMMENT": "string",
                "COLUMN_COMMENT": "string",
            }
        )
        response = (
            self.connection.sql(sql, schema=schema)
            .to_pandas()
            .to_dict(orient="records")
        )

        # Group the flat row set by table: one Table entry per schema.table,
        # with its columns appended in ORDINAL_POSITION order (the SQL sorts
        # by column_id).
        unique_tables = {}
        for row in response:
            # Use uppercase keys that match the provided schema.
            schema_table = self._format_compact_table_name(
                row["TABLE_SCHEMA"], row["TABLE_NAME"]
            )
            if schema_table not in unique_tables:
                unique_tables[schema_table] = Table(
                    name=schema_table,
                    description=row["TABLE_COMMENT"],
                    columns=[],
                    properties=TableProperties(
                        schema=row["TABLE_SCHEMA"],
                        catalog="",  # Oracle doesn't use catalogs.
                        table=row["TABLE_NAME"],
                    ),
                    primaryKey="",
                )

            unique_tables[schema_table].columns.append(
                Column(
                    name=row["COLUMN_NAME"],
                    type=self._transform_column_type(row["DATA_TYPE"]),
                    # Oracle reports nullability as 'Y'/'N'.
                    notNull=row["IS_NULLABLE"] == "N",
                    description=row["COLUMN_COMMENT"],
                    properties=None,
                )
            )

        return list(unique_tables.values())

    def get_constraints(self) -> list[Constraint]:
        """Return all foreign-key constraints as Constraint DTOs.

        NOTE(review): this reads the DBA_* dictionary views, which require
        elevated (SELECT ANY DICTIONARY / DBA) privileges — confirm whether
        the ALL_* views would be sufficient for the connecting user.
        """
        schema = ibis.schema(
            {
                "TABLE_SCHEMA": "string",
                "TABLE_NAME": "string",
                "COLUMN_NAME": "string",
                "REFERENCED_TABLE_SCHEMA": "string",
                "REFERENCED_TABLE_NAME": "string",
                "REFERENCED_COLUMN_NAME": "string",
            }
        )

        # constraint_type = 'R' selects referential (foreign-key) constraints;
        # c_pk/a_pk resolve the referenced (parent) constraint and column.
        sql = """
            SELECT
                a.owner AS TABLE_SCHEMA,
                a.table_name AS TABLE_NAME,
                a.column_name AS COLUMN_NAME,
                a_pk.owner AS REFERENCED_TABLE_SCHEMA,
                a_pk.table_name AS REFERENCED_TABLE_NAME,
                a_pk.column_name AS REFERENCED_COLUMN_NAME
            FROM
                dba_cons_columns a
            JOIN
                dba_constraints c
                ON a.owner = c.owner
                AND a.constraint_name = c.constraint_name
            JOIN
                dba_constraints c_pk
                ON c.r_owner = c_pk.owner
                AND c.r_constraint_name = c_pk.constraint_name
            JOIN
                dba_cons_columns a_pk
                ON c_pk.owner = a_pk.owner
                AND c_pk.constraint_name = a_pk.constraint_name
            WHERE
                c.constraint_type = 'R'
            ORDER BY
                a.owner,
                a.table_name,
                a.column_name
        """
        res = (
            self.connection.sql(sql, schema=schema)
            .to_pandas()
            .to_dict(orient="records")
        )

        constraints = []
        for row in res:
            constraints.append(
                Constraint(
                    constraintName=self._format_constraint_name(
                        row["TABLE_NAME"],
                        row["COLUMN_NAME"],
                        row["REFERENCED_TABLE_NAME"],
                        row["REFERENCED_COLUMN_NAME"],
                    ),
                    constraintTable=self._format_compact_table_name(
                        row["TABLE_SCHEMA"], row["TABLE_NAME"]
                    ),
                    constraintColumn=row["COLUMN_NAME"],
                    constraintedTable=self._format_compact_table_name(
                        row["REFERENCED_TABLE_SCHEMA"], row["REFERENCED_TABLE_NAME"]
                    ),
                    constraintedColumn=row["REFERENCED_COLUMN_NAME"],
                    constraintType=ConstraintType.FOREIGN_KEY,
                )
            )
        return constraints

    def get_version(self) -> str:
        """Return the Oracle server version string.

        NOTE(review): v$instance requires SELECT privilege on the dynamic
        performance views — confirm the connecting user has it.
        """
        schema = ibis.schema({"VERSION": "string"})
        return (
            self.connection.sql("SELECT version FROM v$instance", schema=schema)
            .to_pandas()
            .iloc[0, 0]
        )

    def _format_compact_table_name(self, schema: str, table: str):
        # Compact "schema.table" identifier used as the Table/Constraint key.
        return f"{schema}.{table}"

    def _format_constraint_name(
        self, table_name, column_name, referenced_table_name, referenced_column_name
    ):
        # Deterministic synthetic name: child table/column + parent table/column.
        return f"{table_name}_{column_name}_{referenced_table_name}_{referenced_column_name}"

    def _transform_column_type(self, data_type):
        # Map Oracle type names (upper-cased) onto the engine's column types;
        # anything unrecognized falls back to UNKNOWN.
        switcher = {
            "CHAR": RustWrenEngineColumnType.CHAR,
            "NCHAR": RustWrenEngineColumnType.CHAR,
            "VARCHAR2": RustWrenEngineColumnType.VARCHAR,
            "NVARCHAR2": RustWrenEngineColumnType.VARCHAR,
            "CLOB": RustWrenEngineColumnType.TEXT,
            "NCLOB": RustWrenEngineColumnType.TEXT,
            "NUMBER": RustWrenEngineColumnType.DECIMAL,
            "FLOAT": RustWrenEngineColumnType.FLOAT8,
            "BINARY_FLOAT": RustWrenEngineColumnType.FLOAT8,
            "BINARY_DOUBLE": RustWrenEngineColumnType.DOUBLE,
            "DATE": RustWrenEngineColumnType.TIMESTAMP,  # Oracle DATE includes time.
            "TIMESTAMP": RustWrenEngineColumnType.TIMESTAMP,
            "TIMESTAMP WITH TIME ZONE": RustWrenEngineColumnType.TIMESTAMPTZ,
            "TIMESTAMP WITH LOCAL TIME ZONE": RustWrenEngineColumnType.TIMESTAMPTZ,
            "INTERVAL YEAR TO MONTH": RustWrenEngineColumnType.INTERVAL,
            "INTERVAL DAY TO SECOND": RustWrenEngineColumnType.INTERVAL,
            "BLOB": RustWrenEngineColumnType.BYTEA,
            "BFILE": RustWrenEngineColumnType.BYTEA,
            "RAW": RustWrenEngineColumnType.BYTEA,
            "LONG RAW": RustWrenEngineColumnType.BYTEA,
            "ROWID": RustWrenEngineColumnType.CHAR,
            "UROWID": RustWrenEngineColumnType.CHAR,
            "JSON": RustWrenEngineColumnType.JSON,
            "OSON": RustWrenEngineColumnType.JSON,
            "VARCHAR2 WITH JSON": RustWrenEngineColumnType.JSON,
            "BLOB WITH JSON": RustWrenEngineColumnType.JSON,
            "CLOB WITH JSON": RustWrenEngineColumnType.JSON,
        }
        return switcher.get(data_type.upper(), RustWrenEngineColumnType.UNKNOWN)
3 changes: 3 additions & 0 deletions ibis-server/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def default(obj):
return _date_offset_to_str(obj)
if isinstance(obj, datetime.timedelta):
return str(obj)
# Add handling for any remaining LOB objects
if hasattr(obj, "read"): # Check if object is LOB-like
return str(obj)
raise TypeError

json_obj = orjson.loads(
Expand Down
9 changes: 9 additions & 0 deletions ibis-server/docs/how-to-add-data-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class PostgresConnectionInfo(BaseModel):
We use the base model of [Pydantic](https://docs.pydantic.dev/latest/api/base_model/) to support our class definitions.
Pydantic provides a convenient field type called [Secret Types](https://docs.pydantic.dev/2.0/usage/types/secrets/) that can protect the sensitive information.

Add your `XxxConnectionInfo` class to the `ConnectionInfo` union type:
```python
ConnectionInfo = (
...
| PostgresConnectionInfo
...
)
```

Return to the `DataSourceExtension` enum class to implement the `get_{data_source}_connection` function.
This function should be specific to your new data source. For example, if you've added a PostgreSQL data source, you might implement a `get_postgres_connection` function.
```python
Expand Down
3 changes: 3 additions & 0 deletions ibis-server/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ dev:
# Run the tests selected by the given pytest marker expression.
test MARKER:
    poetry run pytest -m '{{ MARKER }}'

# Same as `test`, but verbose (-v) and with stdout/stderr capture disabled (-s).
test-verbose MARKER:
    poetry run pytest -s -v -m '{{ MARKER }}'

image-name := "ghcr.io/canner/wren-engine-ibis:latest"

docker-build:
Expand Down
Loading
Loading