Add support for container types ARRAY, OBJECT, and FLOAT_VECTOR
amotl committed Dec 16, 2023
1 parent b8e5da3 commit 3ddc350
Showing 10 changed files with 431 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -65,7 +65,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[test,develop]
pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
- name: Run linter and software tests
run: |
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,6 +1,7 @@
# Changelog for Meltano/Singer Target for CrateDB

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
21 changes: 21 additions & 0 deletions README.md
@@ -123,6 +123,27 @@ LIMIT
```


## Vector Store Support

To support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
data type, you will need to install `numpy`. It is provided through the
`vector` extra of the Python package.

When installing the package with pip, use:
```
pip install 'meltano-target-cratedb[vector]'
```

When installing the package through Meltano's project definition, this is
probably the right way to write it down, but it has not been verified yet.
```yaml
- name: target-cratedb
variant: cratedb
pip_url: meltano-target-cratedb[vector]
```
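
To mark a property for storage in a `FLOAT_VECTOR` column, the connector looks
for a surrogate `storage` annotation conveyed through `additionalProperties`.
As a sketch mirroring the connector's inline documentation, the relevant schema
override fragment looks like this; how it nests into a concrete stream
definition is not covered here:
```yaml
# Schema override fragment for a single property; `dim` is the vector dimensionality.
type: "array"
items:
  type: "number"
additionalProperties:
  storage:
    type: "vector"
    dim: 4
```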


## Development

In order to work on this adapter dialect on behalf of a real pipeline definition,
10 changes: 8 additions & 2 deletions pyproject.toml
@@ -94,10 +94,13 @@ dynamic = [
dependencies = [
"crate[sqlalchemy]",
"cratedb-toolkit",
'importlib-resources; python_version < "3.9"',
"meltanolabs-target-postgres==0.0.9",
'importlib-resources; python_version < "3.9"',
# "meltanolabs-target-postgres==0.0.9",
"meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
]
[project.optional-dependencies]
all = [
"meltano-target-cratedb[vector]",
]
develop = [
"black<24",
"mypy==1.7.1",
@@ -115,6 +118,9 @@ test = [
"pytest-cov<5",
"pytest-mock<4",
]
vector = [
"numpy",
]
[project.urls]
changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
2 changes: 1 addition & 1 deletion target_cratedb/__init__.py
@@ -1,4 +1,4 @@
"""Init CrateDB."""
from target_cratedb.patch import patch_sqlalchemy
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()
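
Because the package initializer calls `patch_sqlalchemy()` at import time,
importing `target_cratedb` is enough to activate the dialect and JSON encoder
fixes; a minimal sketch:
```python
# Sketch: importing the package applies the SQLAlchemy/CrateDB patches as a
# side effect; no explicit call to `patch_sqlalchemy()` is needed afterwards.
import target_cratedb  # noqa: F401
```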
80 changes: 72 additions & 8 deletions target_cratedb/connector.py
@@ -6,14 +6,18 @@
from datetime import datetime

import sqlalchemy
import sqlalchemy as sa
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
from sqlalchemy.types import (
ARRAY,
BIGINT,
BOOLEAN,
DATE,
DATETIME,
DECIMAL,
FLOAT,
INTEGER,
TEXT,
TIME,
@@ -22,7 +26,8 @@
)
from target_postgres.connector import NOTYPE, PostgresConnector

from target_cratedb.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.vector import FloatVector


class CrateDBConnector(PostgresConnector):
@@ -111,8 +116,52 @@ def pick_individual_type(jsonschema_type: dict):
if "object" in jsonschema_type["type"]:
return ObjectType
if "array" in jsonschema_type["type"]:
# TODO: Handle other inner-types as well?
# Select between different kinds of `ARRAY` data types.
#
# This currently leverages an unspecified definition for the Singer SCHEMA,
# using the `additionalProperties` attribute to convey additional type
# information, agnostic of the target database.
#
# In this case, it is about telling different kinds of `ARRAY` types apart:
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
# alternatively, it can be a "vector" kind `ARRAY` of floating point
# numbers, effectively what pgvector is storing in its `VECTOR` type.
#
# Still, `type: "vector"` is only a surrogate label here, because other
# database systems may use different types for implementing the same thing,
# and the target needs to translate it accordingly.
"""
Schema override rule in `meltano.yml`:
type: "array"
items:
type: "number"
additionalProperties:
storage:
type: "vector"
dim: 4
Produced schema annotation in `catalog.json`:
{"type": "array",
"items": {"type": "number"},
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
"""
if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
storage_properties = jsonschema_type["additionalProperties"]["storage"]
if "type" in storage_properties and storage_properties["type"] == "vector":
# Use the `FloatVector` type from this package's SQLAlchemy extension,
# the CrateDB counterpart to pgvector's `VECTOR` type.
return FloatVector(storage_properties["dim"])

# Discover/translate inner types.
inner_type = resolve_array_inner_type(jsonschema_type)
if inner_type is not None:
return ARRAY(inner_type)

# When type discovery fails, assume `TEXT`.
return ARRAY(TEXT())

if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
individual_type = th.to_sql_type(jsonschema_type)
Expand All @@ -139,20 +188,18 @@ def pick_best_sql_type(sql_type_array: list):
DATE,
TIME,
DECIMAL,
FLOAT,
BIGINT,
INTEGER,
BOOLEAN,
NOTYPE,
ARRAY,
ObjectType,
FloatVector,
ObjectTypeImpl,
]

for sql_type in precedence_order:
for obj in sql_type_array:
# FIXME: Workaround. Currently, ObjectType can not be resolved back to a type?
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
if isinstance(sql_type, ObjectTypeImpl):
return ObjectType
if isinstance(obj, sql_type):
return obj
return TEXT()
@@ -188,6 +235,8 @@ def _get_type_sort_key(

if isinstance(sql_type, _ObjectArray):
return 0, _len
if isinstance(sql_type, FloatVector):
return 0, _len
if isinstance(sql_type, NOTYPE):
return 0, _len

@@ -245,3 +294,18 @@ def prepare_schema(self, schema_name: str) -> None:
Don't emit `CREATE SCHEMA` statements to CrateDB.
"""
pass


def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]:
if "items" in jsonschema_type:
if is_boolean_type(jsonschema_type["items"]):
return BOOLEAN()
if is_number_type(jsonschema_type["items"]):
return FLOAT()
if is_integer_type(jsonschema_type["items"]):
return BIGINT()

if is_object_type(jsonschema_type["items"]):
return ObjectType()

if is_array_type(jsonschema_type["items"]):
return resolve_array_inner_type(jsonschema_type["items"]["type"])

return None
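
Not part of the commit, but a minimal sketch of how the type mapping is
expected to behave, assuming `pick_individual_type()` can be invoked as a plain
helper on the connector class:
```python
from target_cratedb.connector import CrateDBConnector

# A vanilla array of numbers resolves its inner type and maps to `ARRAY(FLOAT)`.
plain_array = {"type": ["array"], "items": {"type": "number"}}

# The same array carrying the surrogate `storage` annotation maps to
# `FloatVector(4)`, CrateDB's counterpart to pgvector's `VECTOR` type.
vector_array = {
    "type": ["array"],
    "items": {"type": "number"},
    "additionalProperties": {"storage": {"type": "vector", "dim": 4}},
}

print(CrateDBConnector.pick_individual_type(plain_array))   # e.g. ARRAY(FLOAT())
print(CrateDBConnector.pick_individual_type(vector_array))  # e.g. FloatVector(dim=4)
```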
38 changes: 35 additions & 3 deletions target_cratedb/patch.py → target_cratedb/sqlalchemy/patch.py
@@ -1,21 +1,36 @@
from datetime import datetime

import sqlalchemy as sa
from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
from _decimal import Decimal
from crate.client.http import CrateJsonEncoder
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
from crate.client.sqlalchemy.types import _ObjectArray
from sqlalchemy.sql import sqltypes


def patch_sqlalchemy():
patch_types()
patch_json_encoder()


def patch_types():
"""
Register missing timestamp data type.
Register missing data types, and fix erroneous ones.
TODO: Upstream to crate-python.
"""
# TODO: Submit patch to `crate-python`.
TYPES_MAP["bigint"] = sqltypes.BIGINT
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["long"] = sqltypes.BIGINT
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["real"] = sqltypes.DOUBLE
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP

# TODO: Can `ARRAY` be inherited from PostgreSQL's
# `ARRAY`, to make type checking work?

def as_generic(self):
return sqltypes.ARRAY

@@ -36,6 +51,23 @@ def process(value):
DateTime.bind_processor = bind_processor


def patch_json_encoder():
"""
Fix serialization of `Decimal` values, which previously have been rendered as strings.
TODO: Upstream to crate-python.
"""

json_encoder_default = CrateJsonEncoder.default

def default(self, o):
if isinstance(o, Decimal):
return float(o)
return json_encoder_default(o)


CrateJsonEncoder.default = default


def polyfill_refresh_after_dml_engine(engine: sa.Engine):
def receive_after_execute(
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
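
As a usage sketch, not part of the commit, and assuming `CrateJsonEncoder` is a
regular `json.JSONEncoder` subclass, the effect of the encoder patch can be
observed like this:
```python
import json
from decimal import Decimal

from crate.client.http import CrateJsonEncoder
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()

# Without the patch, `Decimal` values are rendered as strings; with it,
# they serialize as plain floating point numbers.
print(json.dumps({"price": Decimal("42.5")}, cls=CrateJsonEncoder))
# -> {"price": 42.5}
```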