Skip to content

Commit

Permalink
Add support for container types ARRAY, OBJECT, and FLOAT_VECTOR
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Dec 14, 2023
1 parent b8e5da3 commit e6f32ea
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog for Meltano/Singer Target for CrateDB

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ dynamic = [
dependencies = [
"crate[sqlalchemy]",
"cratedb-toolkit",
'importlib-resources; python_version < "3.9"',
"meltanolabs-target-postgres==0.0.9",
'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9",
"meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
]
[project.optional-dependencies]
develop = [
Expand Down
2 changes: 1 addition & 1 deletion target_cratedb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Init CrateDB."""
from target_cratedb.patch import patch_sqlalchemy
from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

patch_sqlalchemy()
74 changes: 70 additions & 4 deletions target_cratedb/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
from datetime import datetime

import sqlalchemy
import sqlalchemy as sa
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
from sqlalchemy.types import (
ARRAY,
BIGINT,
BOOLEAN,
DATE,
DATETIME,
DECIMAL,
FLOAT,
INTEGER,
TEXT,
TIME,
Expand All @@ -22,7 +26,7 @@
)
from target_postgres.connector import NOTYPE, PostgresConnector

from target_cratedb.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine


class CrateDBConnector(PostgresConnector):
Expand Down Expand Up @@ -111,8 +115,54 @@ def pick_individual_type(jsonschema_type: dict):
if "object" in jsonschema_type["type"]:
return ObjectType
if "array" in jsonschema_type["type"]:
# TODO: Handle other inner-types as well?
# Select between different kinds of `ARRAY` data types.
#
# This currently leverages an unspecified definition for the Singer SCHEMA,
# using the `additionalProperties` attribute to convey additional type
# information, agnostic of the target database.
#
# In this case, it is about telling different kinds of `ARRAY` types apart:
# Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
# alternatively, it can be a "vector" kind `ARRAY` of floating point
# numbers, effectively what pgvector is storing in its `VECTOR` type.
#
# Still, `type: "vector"` is only a surrogate label here, because other
# database systems may use different types for implementing the same thing,
# and need to translate accordingly.
"""
Schema override rule in `meltano.yml`:
type: "array"
items:
type: "number"
additionalProperties:
storage:
type: "vector"
dim: 4
Produced schema annotation in `catalog.json`:
{"type": "array",
"items": {"type": "number"},
"additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
"""
if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
storage_properties = jsonschema_type["additionalProperties"]["storage"]
if "type" in storage_properties and storage_properties["type"] == "vector":

Check warning on line 151 in target_cratedb/connector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/connector.py#L150-L151

Added lines #L150 - L151 were not covered by tests
# On CrateDB, use the `FloatVector` type definition shipped with this
# package, which renders as a `FLOAT_VECTOR(dim)` column.
from target_cratedb.sqlalchemy.vector import FloatVector

Check warning on line 154 in target_cratedb/connector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/connector.py#L154

Added line #L154 was not covered by tests

return FloatVector(storage_properties["dim"])

Check warning on line 156 in target_cratedb/connector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/connector.py#L156

Added line #L156 was not covered by tests

# Discover/translate inner types.
inner_type = resolve_array_inner_type(jsonschema_type)
if inner_type is not None:
return ARRAY(inner_type)

# When type discovery fails, assume `TEXT`.
return ARRAY(TEXT())

if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
individual_type = th.to_sql_type(jsonschema_type)
Expand All @@ -139,6 +189,7 @@ def pick_best_sql_type(sql_type_array: list):
DATE,
TIME,
DECIMAL,
FLOAT,
BIGINT,
INTEGER,
BOOLEAN,
Expand All @@ -151,7 +202,7 @@ def pick_best_sql_type(sql_type_array: list):
for obj in sql_type_array:
# FIXME: Workaround. Currently, ObjectType can not be resolved back to a type?
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
if isinstance(sql_type, ObjectTypeImpl):
if isinstance(obj, ObjectTypeImpl):
return ObjectType
if isinstance(obj, sql_type):
return obj
Expand Down Expand Up @@ -245,3 +296,18 @@ def prepare_schema(self, schema_name: str) -> None:
Don't emit `CREATE SCHEMA` statements to CrateDB.
"""
pass


def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]:
    """
    Resolve the SQLAlchemy element type for a JSON Schema `array` definition.

    The `items` sub-schema is probed, in this order, for boolean, number,
    integer, object, and array types. For a nested array, the function
    recurses into the nested schema and returns that array's element type.

    :param jsonschema_type: JSON Schema fragment describing the array,
                            e.g. `{"type": "array", "items": {"type": "number"}}`.
    :return: An instantiated SQLAlchemy type for the array elements, or `None`
             when there is no `items` entry or its type is not recognized.
    """
    if "items" not in jsonschema_type:
        return None

    items = jsonschema_type["items"]
    if is_boolean_type(items):
        return BOOLEAN()
    if is_number_type(items):
        return FLOAT()
    if is_integer_type(items):
        return BIGINT()
    if is_object_type(items):
        return ObjectType()
    if is_array_type(items):
        # Recurse with the nested *schema*. Passing only its `type` label
        # (a string or list) can never contain an `items` mapping, so the
        # lookup above would always fail and nested arrays would resolve
        # to `None`.
        return resolve_array_inner_type(items)
    return None
Empty file.
38 changes: 35 additions & 3 deletions target_cratedb/patch.py → target_cratedb/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
from datetime import datetime

import sqlalchemy as sa
from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
from _decimal import Decimal
from crate.client.http import CrateJsonEncoder
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
from crate.client.sqlalchemy.types import _ObjectArray
from sqlalchemy.sql import sqltypes


def patch_sqlalchemy():
    """Apply the package's monkeypatches: data type registrations first, then the JSON encoder."""
    for apply_patch in (patch_types, patch_json_encoder):
        apply_patch()


def patch_types():
"""
Register missing timestamp data type.
Register missing data types, and fix erroneous ones.
TODO: Upstream to crate-python.
"""
# TODO: Submit patch to `crate-python`.
TYPES_MAP["bigint"] = sqltypes.BIGINT
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["long"] = sqltypes.BIGINT
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["real"] = sqltypes.DOUBLE
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP

# TODO: Can `ARRAY` be inherited from PostgreSQL's
# `ARRAY`, to make type checking work?

def as_generic(self):
return sqltypes.ARRAY

Expand All @@ -36,6 +51,23 @@ def process(value):
DateTime.bind_processor = bind_processor


def patch_json_encoder():
    """
    Make `CrateJsonEncoder` serialize `Decimal` values as floats.

    `Decimal` types have been rendered as strings.
    Every other object is delegated to the encoder's original `default`.
    TODO: Upstream to crate-python.
    """

    # Keep a reference to the original implementation before replacing it.
    json_encoder_default = CrateJsonEncoder.default

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        # The original was looked up on the class, so it is a plain function:
        # the encoder instance has to be passed along explicitly.
        return json_encoder_default(self, o)

    CrateJsonEncoder.default = default


def polyfill_refresh_after_dml_engine(engine: sa.Engine):
def receive_after_execute(
conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
Expand Down
82 changes: 82 additions & 0 deletions target_cratedb/sqlalchemy/vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# TODO: Refactor to CrateDB SQLAlchemy dialect.
import typing as t

Check warning on line 2 in target_cratedb/sqlalchemy/vector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/sqlalchemy/vector.py#L2

Added line #L2 was not covered by tests

import numpy as np
import numpy.typing as npt
import sqlalchemy as sa
from sqlalchemy.types import UserDefinedType

Check warning on line 7 in target_cratedb/sqlalchemy/vector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/sqlalchemy/vector.py#L4-L7

Added lines #L4 - L7 were not covered by tests

__all__ = ["FloatVector"]

Check warning on line 9 in target_cratedb/sqlalchemy/vector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/sqlalchemy/vector.py#L9

Added line #L9 was not covered by tests


def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]:
    """
    Convert a vector value read from the database into a NumPy array.

    Adapted from `pgvector.utils`.

    :param value: The raw column value; may be `None` or already an `ndarray`.
    :return: `None` and `ndarray` inputs are handed back untouched, anything
             else is converted to a `float32` array.
    """
    needs_conversion = value is not None and not isinstance(value, np.ndarray)
    if needs_conversion:
        return np.array(value, dtype=np.float32)

    # Either NULL, or an ndarray that was already cast by a lower-level driver.
    return value

Check warning on line 18 in target_cratedb/sqlalchemy/vector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/sqlalchemy/vector.py#L18

Added line #L18 was not covered by tests


def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]:
    """
    Convert a Python-side vector into the value handed to the database driver.

    Adapted from `pgvector.utils`.

    :param value: `None`, a one-dimensional numeric `ndarray`, or another
                  iterable of numbers.
    :param dim: Expected number of elements; not checked when `None`.
    :return: `None` for `None`; a list for `ndarray` and unsized iterables;
             other sized inputs (e.g. lists) are returned unchanged.
    :raises ValueError: When an `ndarray` is not one-dimensional or not
                        numeric, or when the length does not match `dim`.
    """
    if value is None:
        return value

    if isinstance(value, np.ndarray):
        if value.ndim != 1:
            raise ValueError("expected ndim to be 1")

        if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating):
            raise ValueError("dtype must be numeric")

        value = value.tolist()
    elif hasattr(value, "__iter__") and not hasattr(value, "__len__"):
        # Generators and other unsized iterables have no `len()`, so the
        # dimension check below would raise `TypeError`: materialise them.
        value = list(value)

    if dim is not None and len(value) != dim:
        raise ValueError("expected %d dimensions, not %d" % (dim, len(value)))

    return value

Check warning on line 38 in target_cratedb/sqlalchemy/vector.py

View check run for this annotation

Codecov / codecov/patch

target_cratedb/sqlalchemy/vector.py#L38

Added line #L38 was not covered by tests


class FloatVector(UserDefinedType):
    """
    SQLAlchemy column type rendering as CrateDB's `FLOAT_VECTOR`.

    https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector
    https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match
    """

    cache_ok = True

    def __init__(self, dim: t.Optional[int] = None) -> None:
        """
        :param dim: Number of vector elements; `None` omits the dimension
                    from the column specification and skips length checks.
        """
        # Zero-argument `super()` follows the regular MRO, so the direct
        # base class is not skipped.
        super().__init__()
        self.dim = dim

    def get_col_spec(self, **kw: t.Any) -> str:
        """Return the DDL type string, with the dimension when one is set."""
        if self.dim is None:
            return "FLOAT_VECTOR"
        return "FLOAT_VECTOR(%d)" % self.dim

    def bind_processor(self, dialect: sa.Dialect) -> t.Callable:
        """Return a converter for outgoing values that delegates to `to_db`."""

        def process(value: t.Iterable) -> t.Optional[t.List]:
            return to_db(value, self.dim)

        return process

    def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable:
        """Return a converter for fetched values that delegates to `from_db`."""

        def process(value: t.Any) -> t.Optional[npt.ArrayLike]:
            return from_db(value)

        return process

    """
    CrateDB currently only supports similarity function `VectorSimilarityFunction.EUCLIDEAN`.
    -- https://github.com/crate/crate/blob/1ca5c6dbb2/server/src/main/java/io/crate/types/FloatVectorType.java#L55
    On the other hand, pgvector use a comparator to apply different similarity functions as operators,
    see `pgvector.sqlalchemy.Vector.comparator_factory`.
    <->: l2/euclidean_distance
    <#>: max_inner_product
    <=>: cosine_distance
    TODO: Discuss.
    """  # noqa: E501
Loading

0 comments on commit e6f32ea

Please sign in to comment.