From 4e28026b8fa5eef59827852aa3532bcc84a53c2e Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 14 Dec 2023 23:11:00 +0100
Subject: [PATCH] Add support for container types ARRAY, OBJECT, and
 FLOAT_VECTOR

---
 CHANGES.md                                   |   1 +
 pyproject.toml                               |   4 +-
 target_cratedb/__init__.py                   |   2 +-
 target_cratedb/connector.py                  |  74 ++++++++-
 target_cratedb/sqlalchemy/__init__.py        |   0
 target_cratedb/{ => sqlalchemy}/patch.py     |  38 ++++-
 target_cratedb/sqlalchemy/vector.py          |  82 ++++++++++
 target_cratedb/tests/test_standard_target.py | 152 ++++++++++++++++---
 8 files changed, 322 insertions(+), 31 deletions(-)
 create mode 100644 target_cratedb/sqlalchemy/__init__.py
 rename target_cratedb/{ => sqlalchemy}/patch.py (61%)
 create mode 100644 target_cratedb/sqlalchemy/vector.py

diff --git a/CHANGES.md b/CHANGES.md
index fcec18c..a868a72 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,7 @@
 # Changelog for Meltano/Singer Target for CrateDB
 
 ## In progress
+- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
 
 ## 2023-12-08 v0.0.1
 - Make it work. It can run the canonical Meltano GitHub -> DB example.

diff --git a/pyproject.toml b/pyproject.toml
index f4930b0..5ece4d9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -94,8 +94,8 @@ dynamic = [
 dependencies = [
     "crate[sqlalchemy]",
     "cratedb-toolkit",
-    'importlib-resources; python_version < "3.9"',
-    "meltanolabs-target-postgres==0.0.9",
+    'importlib-resources; python_version < "3.9"',  # "meltanolabs-target-postgres==0.0.9",
+    "meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
 ]
 [project.optional-dependencies]
 develop = [
diff --git a/target_cratedb/__init__.py b/target_cratedb/__init__.py
index 95e24c3..8ff48b8 100644
--- a/target_cratedb/__init__.py
+++ b/target_cratedb/__init__.py
@@ -1,4 +1,4 @@
 """Init CrateDB."""
-from target_cratedb.patch import patch_sqlalchemy
+from target_cratedb.sqlalchemy.patch import patch_sqlalchemy
 
 patch_sqlalchemy()
diff --git a/target_cratedb/connector.py b/target_cratedb/connector.py
index 63a0745..eb40e97 100644
--- a/target_cratedb/connector.py
+++ b/target_cratedb/connector.py
@@ -6,14 +6,18 @@
 from datetime import datetime
 
 import sqlalchemy
+import sqlalchemy as sa
 from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
 from singer_sdk import typing as th
-from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
+from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
 from sqlalchemy.types import (
+    ARRAY,
+    BIGINT,
     BOOLEAN,
     DATE,
     DATETIME,
     DECIMAL,
+    FLOAT,
     INTEGER,
     TEXT,
     TIME,
@@ -22,7 +26,7 @@
 )
 from target_postgres.connector import NOTYPE, PostgresConnector
 
-from target_cratedb.patch import polyfill_refresh_after_dml_engine
+from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
 
 
 class CrateDBConnector(PostgresConnector):
@@ -111,8 +115,54 @@ def pick_individual_type(jsonschema_type: dict):
         if "object" in jsonschema_type["type"]:
             return ObjectType
         if "array" in jsonschema_type["type"]:
-            # TODO: Handle other inner-types as well?
+            # Select between different kinds of `ARRAY` data types.
+            #
+            # This currently leverages an unspecified extension to the Singer SCHEMA,
+            # using the `additionalProperties` attribute to convey additional type
+            # information, agnostic of the target database.
+            #
+            # In this case, it is about telling different kinds of `ARRAY` types apart:
+            # either a vanilla `ARRAY`, to be stored into a `jsonb[]` column, or a
+            # "vector" kind of `ARRAY` of floating-point numbers, effectively what
+            # pgvector stores in its `VECTOR` type.
+            #
+            # Still, `type: "vector"` is only a surrogate label here, because other
+            # database systems may use different types for implementing the same thing,
+            # and will need to translate accordingly.
+            """
+            Schema override rule in `meltano.yml`:
+
+                type: "array"
+                items:
+                  type: "number"
+                additionalProperties:
+                  storage:
+                    type: "vector"
+                    dim: 4
+
+            Produced schema annotation in `catalog.json`:
+
+                {"type": "array",
+                 "items": {"type": "number"},
+                 "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}
+            """
+            if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]:
+                storage_properties = jsonschema_type["additionalProperties"]["storage"]
+                if "type" in storage_properties and storage_properties["type"] == "vector":
+                    # Use the `FloatVector` type, the CrateDB counterpart to the
+                    # `VECTOR` type of PostgreSQL's pgvector extension.
+                    from target_cratedb.sqlalchemy.vector import FloatVector
+
+                    return FloatVector(storage_properties["dim"])
+
+            # Discover/translate inner types.
+            inner_type = resolve_array_inner_type(jsonschema_type)
+            if inner_type is not None:
+                return ARRAY(inner_type)
+
+            # When type discovery fails, fall back to `TEXT`.
             return ARRAY(TEXT())
+
         if jsonschema_type.get("format") == "date-time":
             return TIMESTAMP()
         individual_type = th.to_sql_type(jsonschema_type)
@@ -139,6 +189,7 @@ def pick_best_sql_type(sql_type_array: list):
             DATE,
             TIME,
             DECIMAL,
+            FLOAT,
             BIGINT,
             INTEGER,
             BOOLEAN,
@@ -151,7 +202,7 @@
             for obj in sql_type_array:
                 # FIXME: Workaround. Currently, `ObjectType` can not be resolved back to a type?
                 # TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
-                if isinstance(sql_type, ObjectTypeImpl):
+                if isinstance(obj, ObjectTypeImpl):
                     return ObjectType
                 if isinstance(obj, sql_type):
                     return obj
@@ -245,3 +296,18 @@ def prepare_schema(self, schema_name: str) -> None:
         Don't emit `CREATE SCHEMA` statements to CrateDB.
""" pass + + +def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]: + if "items" in jsonschema_type: + if is_boolean_type(jsonschema_type["items"]): + return BOOLEAN() + if is_number_type(jsonschema_type["items"]): + return FLOAT() + if is_integer_type(jsonschema_type["items"]): + return BIGINT() + if is_object_type(jsonschema_type["items"]): + return ObjectType() + if is_array_type(jsonschema_type["items"]): + return resolve_array_inner_type(jsonschema_type["items"]["type"]) + return None diff --git a/target_cratedb/sqlalchemy/__init__.py b/target_cratedb/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/target_cratedb/patch.py b/target_cratedb/sqlalchemy/patch.py similarity index 61% rename from target_cratedb/patch.py rename to target_cratedb/sqlalchemy/patch.py index b42ff3b..3dbba6c 100644 --- a/target_cratedb/patch.py +++ b/target_cratedb/sqlalchemy/patch.py @@ -1,21 +1,36 @@ from datetime import datetime import sqlalchemy as sa -from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime +from _decimal import Decimal +from crate.client.http import CrateJsonEncoder +from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime from crate.client.sqlalchemy.types import _ObjectArray from sqlalchemy.sql import sqltypes def patch_sqlalchemy(): + patch_types() + patch_json_encoder() + + +def patch_types(): """ - Register missing timestamp data type. + Register missing data types, and fix erroneous ones. TODO: Upstream to crate-python. """ - # TODO: Submit patch to `crate-python`. + TYPES_MAP["bigint"] = sqltypes.BIGINT + TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT) + TYPES_MAP["long"] = sqltypes.BIGINT + TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT) + TYPES_MAP["real"] = sqltypes.DOUBLE + TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE) TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP + # TODO: Can `ARRAY` be inherited from PostgreSQL's + # `ARRAY`, to make type checking work? + def as_generic(self): return sqltypes.ARRAY @@ -36,6 +51,23 @@ def process(value): DateTime.bind_processor = bind_processor +def patch_json_encoder(): + """ + `Decimal` types have been rendered as strings. + + TODO: Upstream to crate-python. + """ + + json_encoder_default = CrateJsonEncoder.default + + def default(self, o): + if isinstance(o, Decimal): + return float(o) + return json_encoder_default(o) + + CrateJsonEncoder.default = default + + def polyfill_refresh_after_dml_engine(engine: sa.Engine): def receive_after_execute( conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result diff --git a/target_cratedb/sqlalchemy/vector.py b/target_cratedb/sqlalchemy/vector.py new file mode 100644 index 0000000..d339b38 --- /dev/null +++ b/target_cratedb/sqlalchemy/vector.py @@ -0,0 +1,82 @@ +# TODO: Refactor to CrateDB SQLAlchemy dialect. 
+import typing as t
+
+import numpy as np
+import numpy.typing as npt
+import sqlalchemy as sa
+from sqlalchemy.types import UserDefinedType
+
+__all__ = ["FloatVector"]
+
+
+def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]:
+    # from `pgvector.utils`
+    # could be ndarray if already cast by lower-level driver
+    if value is None or isinstance(value, np.ndarray):
+        return value
+
+    return np.array(value, dtype=np.float32)
+
+
+def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]:
+    # from `pgvector.utils`
+    if value is None:
+        return value
+
+    if isinstance(value, np.ndarray):
+        if value.ndim != 1:
+            raise ValueError("expected ndim to be 1")
+
+        if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating):
+            raise ValueError("dtype must be numeric")
+
+        value = value.tolist()
+
+    if dim is not None and len(value) != dim:
+        raise ValueError("expected %d dimensions, not %d" % (dim, len(value)))
+
+    return value
+
+
+class FloatVector(UserDefinedType):
+    """
+    https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector
+    https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match
+    """
+
+    cache_ok = True
+
+    def __init__(self, dim: t.Optional[int] = None) -> None:
+        super(UserDefinedType, self).__init__()
+        self.dim = dim
+
+    def get_col_spec(self, **kw: t.Any) -> str:
+        if self.dim is None:
+            return "FLOAT_VECTOR"
+        return "FLOAT_VECTOR(%d)" % self.dim
+
+    def bind_processor(self, dialect: sa.Dialect) -> t.Callable:
+        def process(value: t.Iterable) -> t.Optional[t.List]:
+            return to_db(value, self.dim)
+
+        return process
+
+    def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable:
+        def process(value: t.Any) -> t.Optional[npt.ArrayLike]:
+            return from_db(value)
+
+        return process
+
+    """
+    CrateDB currently only supports the similarity function `VectorSimilarityFunction.EUCLIDEAN`.
+    -- https://github.com/crate/crate/blob/1ca5c6dbb2/server/src/main/java/io/crate/types/FloatVectorType.java#L55
+
+    On the other hand, pgvector uses a comparator to apply different similarity functions as operators,
+    see `pgvector.sqlalchemy.Vector.comparator_factory`.
+
+    <->: l2/euclidean_distance
+    <#>: max_inner_product
+    <=>: cosine_distance
+
+    TODO: Discuss.
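+
+    An illustrative query sketch, assuming the `KNN_MATCH` scalar function from the
+    CrateDB documentation linked above; table and column names are made up:
+
+        SELECT id FROM items WHERE knn_match(embedding, [1.2, 3.4, 5.6, 7.8], 10);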
+ """ # noqa: E501 diff --git a/target_cratedb/tests/test_standard_target.py b/target_cratedb/tests/test_standard_target.py index 9a6f040..0275dc4 100644 --- a/target_cratedb/tests/test_standard_target.py +++ b/target_cratedb/tests/test_standard_target.py @@ -6,17 +6,18 @@ import jsonschema import pytest import sqlalchemy +import sqlalchemy as sa +from crate.client.sqlalchemy.types import ObjectTypeImpl from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import sync_end_to_end -from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy.types import TEXT, TIMESTAMP from target_postgres.tests.samples.aapl.aapl import Fundamentals from target_postgres.tests.samples.sample_tap_countries.countries_tap import ( SampleTapCountries, ) +from target_postgres.tests.test_target_postgres import AssertionHelper from target_cratedb.connector import CrateDBConnector -from target_cratedb.patch import polyfill_refresh_after_dml_engine +from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine from target_cratedb.target import TargetCrateDB try: @@ -25,6 +26,9 @@ from importlib_resources import files as resource_files # type: ignore[no-redef] +METADATA_COLUMN_PREFIX = "__sdc" + + @pytest.fixture(scope="session") def cratedb_config_with_ssl(): return { @@ -89,6 +93,12 @@ def create_engine(target_cratedb: TargetCrateDB) -> sqlalchemy.engine.Engine: @pytest.fixture(scope="session", autouse=True) def initialize_database(cratedb_config): delete_table_names = [ + "melty.array_boolean", + "melty.array_number", + "melty.array_string", + "melty.array_timestamp", + "melty.foo", + "melty.object_mixed", "melty.test_new_array_column", "melty.test_schema_updates", ] @@ -100,6 +110,11 @@ def initialize_database(cratedb_config): conn.exec_driver_sql("CREATE TABLE IF NOT EXISTS melty.foo (a INT);") +@pytest.fixture +def helper(cratedb_target) -> AssertionHelper: + return AssertionHelper(target=cratedb_target, metadata_column_prefix=METADATA_COLUMN_PREFIX) + + def singer_file_to_target(file_name, target) -> None: """Singer file to Target, emulates a tap run @@ -126,14 +141,6 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_SDC"): - new_row[column] = row[column] - return new_row - - def test_sqlalchemy_url_config(cratedb_config): """Be sure that passing a sqlalchemy_url works @@ -272,7 +279,7 @@ def test_multiple_state_messages(cratedb_target): @pytest.mark.skip("Upserts do not work yet") -def test_relational_data(cratedb_target): +def test_relational_data(cratedb_target, helper): engine = create_engine(cratedb_target) file_name = "user_location_data.singer" singer_file_to_target(file_name, cratedb_target) @@ -296,7 +303,7 @@ def test_relational_data(cratedb_target): full_table_name = f"{schema_name}.test_users" result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")) - result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()] + result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()] assert result_dict == expected_test_users expected_test_locations = [ @@ -309,7 +316,7 @@ def test_relational_data(cratedb_target): full_table_name = f"{schema_name}.test_locations" result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id")) - result_dict = 
+        result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()]
         assert result_dict == expected_test_locations
 
         expected_test_user_in_location = [
@@ -347,7 +354,7 @@
         full_table_name = f"{schema_name}.test_user_in_location"
         result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id"))
 
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
+        result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()]
         assert result_dict == expected_test_user_in_location
 
 
@@ -394,6 +401,109 @@ def test_array_data(cratedb_target):
     singer_file_to_target(file_name, cratedb_target)
 
 
+def test_array_boolean(cratedb_target, helper):
+    file_name = "array_boolean.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": [True, False]}
+    helper.verify_data("array_boolean", 3, "id", row)
+    helper.verify_schema(
+        "array_boolean",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_float_vector(cratedb_target, helper):
+    pgvector_sa = pytest.importorskip("pgvector.sqlalchemy")
+
+    file_name = "array_float_vector.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {
+        "id": 1,
+        "value": "[1.1,2.1,1.1,1.3]",
+    }
+    helper.verify_data("array_float_vector", 3, "id", row)
+
+    helper.verify_schema(
+        "array_float_vector",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": pgvector_sa.Vector},
+        },
+    )
+
+
+def test_array_number(cratedb_target, helper):
+    file_name = "array_number.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": [42.42, 84.84, 23]}
+    helper.verify_data("array_number", 3, "id", row)
+    helper.verify_schema(
+        "array_number",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_string(cratedb_target, helper):
+    file_name = "array_string.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": ["apple", "orange", "pear"]}
+    helper.verify_data("array_string", 4, "id", row)
+    helper.verify_schema(
+        "array_string",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_timestamp(cratedb_target, helper):
+    file_name = "array_timestamp.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
+    helper.verify_data("array_timestamp", 3, "id", row)
+    helper.verify_schema(
+        "array_timestamp",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_object_mixed(cratedb_target, helper):
+    file_name = "object_mixed.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {
+        "id": 1,
+        "value": {
+            "string": "foo",
+            "integer": 42,
+            "float": 42.42,
+            "timestamp": "2023-12-13T01:15:02",
+            "array_boolean": [True, False],
+            "array_float": [42.42, 84.84],
+            "array_integer": [42, 84],
+            "array_string": ["foo", "bar"],
+            "nested_object": {"foo": "bar"},
+        },
+    }
+    helper.verify_data("object_mixed", 1, "id", row)
+    helper.verify_schema(
+        "object_mixed",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": ObjectTypeImpl},
+        },
+    )
+
+
 # TODO test that data is correct
 def test_encoded_string_data(cratedb_target):
     """
@@ -453,28 +563,28 @@ def test_anyof(cratedb_target):
     for column in table.c:
         # {"type":"string"}
         if column.name == "id":
-            assert isinstance(column.type, TEXT)
+            assert isinstance(column.type, sa.TEXT)
 
         # Any of nullable date-time.
         # Note that postgres timestamp is equivalent to jsonschema date-time.
         # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]}
         if column.name in {"authored_date", "committed_date"}:
-            assert isinstance(column.type, TIMESTAMP)
+            assert isinstance(column.type, sa.TIMESTAMP)
 
         # Any of nullable array of strings or single string.
         # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
         if column.name == "parent_ids":
-            assert isinstance(column.type, ARRAY)
+            assert isinstance(column.type, sa.ARRAY)
 
         # Any of nullable string.
         # {"anyOf":[{"type":"string"},{"type":"null"}]}
         if column.name == "commit_message":
-            assert isinstance(column.type, TEXT)
+            assert isinstance(column.type, sa.TEXT)
 
         # Any of nullable string or integer.
         # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
         if column.name == "legacy_id":
-            assert isinstance(column.type, TEXT)
+            assert isinstance(column.type, sa.TEXT)
 
 
 def test_new_array_column(cratedb_target):
@@ -556,7 +666,7 @@ def test_activate_version_soft_delete(cratedb_target):
         assert result.rowcount == 9
 
         result = connection.execute(
-            sqlalchemy.text(f'SELECT * FROM {full_table_name} where "__sdc_deleted_at" is NOT NULL')
+            sqlalchemy.text(f'SELECT * FROM {full_table_name} where "{METADATA_COLUMN_PREFIX}_deleted_at" is NOT NULL')
         )
         assert result.rowcount == 2
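
For reference, a minimal sketch of the `meltano.yml` schema override that would
activate the `FLOAT_VECTOR` handling introduced above; it follows the convention
documented in `pick_individual_type`, and the extractor, stream, and property
names are illustrative:

    plugins:
      extractors:
        - name: tap-example
          schema:
            my_stream:
              embedding:
                type: "array"
                items:
                  type: "number"
                additionalProperties:
                  storage:
                    type: "vector"
                    dim: 4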