From 3ddc35046cb4363ddee07d1e43257d5c1dfb2e14 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 14 Dec 2023 23:11:00 +0100
Subject: [PATCH] Add support for container types ARRAY, OBJECT, and
 FLOAT_VECTOR

---
 .github/workflows/main.yml                   |   2 +-
 CHANGES.md                                   |   1 +
 README.md                                    |  31 ++++
 pyproject.toml                               |  10 +-
 target_cratedb/__init__.py                   |   2 +-
 target_cratedb/connector.py                  |  86 ++++++++++-
 target_cratedb/sqlalchemy/__init__.py        |   0
 target_cratedb/{ => sqlalchemy}/patch.py     |  38 ++++-
 target_cratedb/sqlalchemy/vector.py          | 171 +++++++++++++++++++
 target_cratedb/tests/test_standard_target.py | 170 +++++++++++++++---
 10 files changed, 475 insertions(+), 36 deletions(-)
 create mode 100644 target_cratedb/sqlalchemy/__init__.py
 rename target_cratedb/{ => sqlalchemy}/patch.py (61%)
 create mode 100644 target_cratedb/sqlalchemy/vector.py

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 7ddc331..e525c9c 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -65,7 +65,7 @@ jobs:
           pip install "setuptools>=64" --upgrade

           # Install package in editable mode.
-          pip install --use-pep517 --prefer-binary --editable=.[test,develop]
+          pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]

       - name: Run linter and software tests
         run: |
diff --git a/CHANGES.md b/CHANGES.md
index fcec18c..a868a72 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,7 @@
 # Changelog for Meltano/Singer Target for CrateDB

 ## In progress
+- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.

 ## 2023-12-08 v0.0.1
 - Make it work. It can run the canonical Meltano GitHub -> DB example.
diff --git a/README.md b/README.md
index b51da2f..ba78ce8 100644
--- a/README.md
+++ b/README.md
@@ -123,6 +123,37 @@
 LIMIT
 ```

+## Vector Store Support
+
+In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
+data type, you will need to install `numpy`. It has been added to an "extra"
+of the Python package, called `vector`.
+
+When installing the package using pip, run:
+```
+pip install 'meltano-target-cratedb[vector]'
+```
+
+When installing the package using Meltano's project definition, this is
+probably the right way to write it down, but it has not been verified
+yet.
+```yaml
+- name: target-cratedb
+  variant: cratedb
+  pip_url: meltano-target-cratedb[vector]
+```
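+
+Once loaded, such vectors can be queried using CrateDB's `knn_match`
+function. A sketch, assuming the `melty.array_float_vector` table from the
+test suite, which stores 4-dimensional vectors in its `value` column:
+```sql
+SELECT id, value
+FROM melty.array_float_vector
+WHERE knn_match(value, [1.1, 2.1, 1.1, 1.3], 2)
+ORDER BY _score DESC;
+```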
+
+
 ## Development

 In order to work on this adapter dialect on behalf of a real pipeline definition,
diff --git a/pyproject.toml b/pyproject.toml
index f4930b0..d24675e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -94,10 +94,13 @@ dynamic = [
 dependencies = [
     "crate[sqlalchemy]",
     "cratedb-toolkit",
-    'importlib-resources; python_version < "3.9"',
-    "meltanolabs-target-postgres==0.0.9",
+    'importlib-resources; python_version < "3.9"', # "meltanolabs-target-postgres==0.0.9",
+    "meltanolabs-target-postgres@ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
 ]
 [project.optional-dependencies]
+all = [
+    "meltano-target-cratedb[vector]",
+]
 develop = [
     "black<24",
     "mypy==1.7.1",
@@ -115,6 +118,9 @@ test = [
     "pytest-cov<5",
     "pytest-mock<4",
 ]
+vector = [
+    "numpy",
+]
 [project.urls]
 changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
 documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
diff --git a/target_cratedb/__init__.py b/target_cratedb/__init__.py
index 95e24c3..8ff48b8 100644
--- a/target_cratedb/__init__.py
+++ b/target_cratedb/__init__.py
@@ -1,4 +1,4 @@
 """Init CrateDB."""
-from target_cratedb.patch import patch_sqlalchemy
+from target_cratedb.sqlalchemy.patch import patch_sqlalchemy

 patch_sqlalchemy()
diff --git a/target_cratedb/connector.py b/target_cratedb/connector.py
index 63a0745..fca4181 100644
--- a/target_cratedb/connector.py
+++ b/target_cratedb/connector.py
@@ -6,14 +6,18 @@
 from datetime import datetime

 import sqlalchemy
+import sqlalchemy as sa
 from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
 from singer_sdk import typing as th
-from sqlalchemy.dialects.postgresql import ARRAY, BIGINT
+from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
 from sqlalchemy.types import (
+    ARRAY,
+    BIGINT,
     BOOLEAN,
     DATE,
     DATETIME,
     DECIMAL,
+    FLOAT,
     INTEGER,
     TEXT,
     TIME,
@@ -22,7 +26,8 @@
 )
 from target_postgres.connector import NOTYPE, PostgresConnector

-from target_cratedb.patch import polyfill_refresh_after_dml_engine
+from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
+from target_cratedb.sqlalchemy.vector import FloatVector


 class CrateDBConnector(PostgresConnector):
@@ -111,8 +116,52 @@ def pick_individual_type(jsonschema_type: dict):
         if "object" in jsonschema_type["type"]:
             return ObjectType
         if "array" in jsonschema_type["type"]:
-            # TODO: Handle other inner-types as well?
+            # Select between different kinds of `ARRAY` data types.
+            #
+            # This currently leverages an unspecified definition for the Singer SCHEMA,
+            # using the `additionalProperties` attribute to convey additional type
+            # information, agnostic of the target database.
+            #
+            # In this case, it is about telling different kinds of `ARRAY` types apart:
+            # Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or,
+            # alternatively, it can be a "vector" kind `ARRAY` of floating point
+            # numbers, effectively what pgvector stores in its `VECTOR` type.
+            #
+            # Still, `type: "vector"` is only a surrogate label here, because other
+            # database systems may use different types to implement the same thing,
+            # and will need to translate the label accordingly.
+ """ + Schema override rule in `meltano.yml`: + + type: "array" + items: + type: "number" + additionalProperties: + storage: + type: "vector" + dim: 4 + + Produced schema annotation in `catalog.json`: + + {"type": "array", + "items": {"type": "number"}, + "additionalProperties": {"storage": {"type": "vector", "dim": 4}}} + """ + if "additionalProperties" in jsonschema_type and "storage" in jsonschema_type["additionalProperties"]: + storage_properties = jsonschema_type["additionalProperties"]["storage"] + if "type" in storage_properties and storage_properties["type"] == "vector": + # On PostgreSQL/pgvector, use the corresponding type definition + # from its SQLAlchemy dialect. + return FloatVector(storage_properties["dim"]) + + # Discover/translate inner types. + inner_type = resolve_array_inner_type(jsonschema_type) + if inner_type is not None: + return ARRAY(inner_type) + + # When type discovery fails, assume `TEXT`. return ARRAY(TEXT()) + if jsonschema_type.get("format") == "date-time": return TIMESTAMP() individual_type = th.to_sql_type(jsonschema_type) @@ -139,20 +188,18 @@ def pick_best_sql_type(sql_type_array: list): DATE, TIME, DECIMAL, + FLOAT, BIGINT, INTEGER, BOOLEAN, NOTYPE, ARRAY, - ObjectType, + FloatVector, + ObjectTypeImpl, ] for sql_type in precedence_order: for obj in sql_type_array: - # FIXME: Workaround. Currently, ObjectType can not be resolved back to a type? - # TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union - if isinstance(sql_type, ObjectTypeImpl): - return ObjectType if isinstance(obj, sql_type): return obj return TEXT() @@ -188,6 +235,8 @@ def _get_type_sort_key( if isinstance(sql_type, _ObjectArray): return 0, _len + if isinstance(sql_type, FloatVector): + return 0, _len if isinstance(sql_type, NOTYPE): return 0, _len @@ -245,3 +294,18 @@ def prepare_schema(self, schema_name: str) -> None: Don't emit `CREATE SCHEMA` statements to CrateDB. """ pass + + +def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]: + if "items" in jsonschema_type: + if is_boolean_type(jsonschema_type["items"]): + return BOOLEAN() + if is_number_type(jsonschema_type["items"]): + return FLOAT() + if is_integer_type(jsonschema_type["items"]): + return BIGINT() + if is_object_type(jsonschema_type["items"]): + return ObjectType() + if is_array_type(jsonschema_type["items"]): + return resolve_array_inner_type(jsonschema_type["items"]["type"]) + return None diff --git a/target_cratedb/sqlalchemy/__init__.py b/target_cratedb/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/target_cratedb/patch.py b/target_cratedb/sqlalchemy/patch.py similarity index 61% rename from target_cratedb/patch.py rename to target_cratedb/sqlalchemy/patch.py index b42ff3b..3dbba6c 100644 --- a/target_cratedb/patch.py +++ b/target_cratedb/sqlalchemy/patch.py @@ -1,21 +1,36 @@ from datetime import datetime import sqlalchemy as sa -from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime +from _decimal import Decimal +from crate.client.http import CrateJsonEncoder +from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime from crate.client.sqlalchemy.types import _ObjectArray from sqlalchemy.sql import sqltypes def patch_sqlalchemy(): + patch_types() + patch_json_encoder() + + +def patch_types(): """ - Register missing timestamp data type. + Register missing data types, and fix erroneous ones. TODO: Upstream to crate-python. """ - # TODO: Submit patch to `crate-python`. 
diff --git a/target_cratedb/sqlalchemy/__init__.py b/target_cratedb/sqlalchemy/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/target_cratedb/patch.py b/target_cratedb/sqlalchemy/patch.py
similarity index 61%
rename from target_cratedb/patch.py
rename to target_cratedb/sqlalchemy/patch.py
index b42ff3b..3dbba6c 100644
--- a/target_cratedb/patch.py
+++ b/target_cratedb/sqlalchemy/patch.py
@@ -1,21 +1,36 @@
 from datetime import datetime

 import sqlalchemy as sa
-from crate.client.sqlalchemy.dialect import TYPES_MAP, DateTime
+from _decimal import Decimal
+from crate.client.http import CrateJsonEncoder
+from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
 from crate.client.sqlalchemy.types import _ObjectArray
 from sqlalchemy.sql import sqltypes


 def patch_sqlalchemy():
+    patch_types()
+    patch_json_encoder()
+
+
+def patch_types():
     """
-    Register missing timestamp data type.
+    Register missing data types, and fix erroneous ones.

     TODO: Upstream to crate-python.
     """
-    # TODO: Submit patch to `crate-python`.
+    TYPES_MAP["bigint"] = sqltypes.BIGINT
+    TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
+    TYPES_MAP["long"] = sqltypes.BIGINT
+    TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
+    TYPES_MAP["real"] = sqltypes.DOUBLE
+    TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
     TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
     TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP

+    # TODO: Can `ARRAY` be inherited from PostgreSQL's
+    #       `ARRAY`, to make type checking work?
+
     def as_generic(self):
         return sqltypes.ARRAY
@@ -36,6 +51,23 @@ def process(value):

     DateTime.bind_processor = bind_processor


+def patch_json_encoder():
+    """
+    Fix JSON encoding of `Decimal` values, which have been rendered as strings.
+
+    TODO: Upstream to crate-python.
+    """
+
+    json_encoder_default = CrateJsonEncoder.default
+
+    def default(self, o):
+        if isinstance(o, Decimal):
+            return float(o)
+        return json_encoder_default(o)
+
+    CrateJsonEncoder.default = default
+
+
 def polyfill_refresh_after_dml_engine(engine: sa.Engine):
     def receive_after_execute(
         conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result
diff --git a/target_cratedb/sqlalchemy/vector.py b/target_cratedb/sqlalchemy/vector.py
new file mode 100644
index 0000000..e47f8df
--- /dev/null
+++ b/target_cratedb/sqlalchemy/vector.py
@@ -0,0 +1,171 @@
+# TODO: Refactor to CrateDB SQLAlchemy dialect.
+import typing as t
+
+import numpy as np
+import numpy.typing as npt
+import sqlalchemy as sa
+from crate.client.sqlalchemy.compiler import CrateTypeCompiler
+from crate.client.sqlalchemy.dialect import TYPES_MAP
+from sqlalchemy.sql import sqltypes
+from sqlalchemy.sql.type_api import TypeEngine
+
+__all__ = ["FloatVector"]
+
+
+def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]:
+    # from `pgvector.utils`
+    # could be ndarray if already cast by lower-level driver
+    if value is None or isinstance(value, np.ndarray):
+        return value
+
+    return np.array(value, dtype=np.float32)
+
+
+def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]:
+    # from `pgvector.utils`
+    if value is None:
+        return value
+
+    if isinstance(value, np.ndarray):
+        if value.ndim != 1:
+            raise ValueError("expected ndim to be 1")
+
+        if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating):
+            raise ValueError("dtype must be numeric")
+
+        value = value.tolist()
+
+    if dim is not None and len(value) != dim:
+        raise ValueError("expected %d dimensions, not %d" % (dim, len(value)))
+
+    return value
+
+
+class FloatVector(TypeEngine[t.Sequence[t.Any]]):
+    """
+    An improved implementation of the `FloatVector` data type for CrateDB,
+    compared to the previous implementation within the LangChain adapter.
+
+    https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector
+    https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match
+
+    The previous implementation, based on SQLAlchemy's `UserDefinedType`, did not
+    respect the `python_type` property on backward/reverse resolution of types.
+    This was observed on Meltano's database connector machinery doing a
+    type cast, which led to a `NotImplementedError`.
+
+        typing.cast(type, sql_type.python_type) => NotImplementedError
+
+    The `UserDefinedType` approach is easier to implement, because it doesn't
+    need compiler support.
+
+    To get full SQLAlchemy type support, including support for forward and
+    backward resolution / type casting, the custom data type should derive
+    from SQLAlchemy's `TypeEngine` base class instead.
+
+    When deriving from `TypeEngine`, you will need to set the `__visit_name__`
+    attribute, and add a corresponding visitor method to the `CrateTypeCompiler`,
+    in this case, `visit_FLOAT_VECTOR`.
+
+    Now, rendering a DDL succeeds. However, when reflecting the DDL schema back,
+    it does not work until you establish a corresponding reverse type mapping.
+
+    By invoking `SELECT DISTINCT(data_type) FROM information_schema.columns;`,
+    you will find out that the internal type name is `float_vector`, so you
+    announce it to the dialect using `TYPES_MAP["float_vector"] = FloatVector`.
+
+    Still not there: `NotImplementedError: Default TypeEngine.as_generic() heuristic
+    method was unsuccessful for target_cratedb.sqlalchemy.vector.FloatVector. A
+    custom as_generic() method must be implemented for this type class.`
+
+    So, because it signals that the type implementation also needs an `as_generic`
+    method, let's supply one, returning `sqltypes.ARRAY`.
+
+    It looks like, in exchange for those improvements, the `get_col_spec`
+    method is no longer needed.
+
+    TODO: Would it be a good idea to derive from SQLAlchemy's
+          `ARRAY` right away, to get a few of the features without
+          the need to redefine them?
+
+    Please note that this analysis and the corresponding implementation have been
+    derived from empirical observations, and from the impression that support for
+    the other special data types of CrateDB (ARRAY and OBJECT) is similarly
+    incomplete within the SQLAlchemy dialect, i.e. "that something must be wrong
+    or incomplete". In this spirit, it is advisable to review and improve their
+    implementations correspondingly.
+    """
+
+    cache_ok = True
+
+    __visit_name__ = "FLOAT_VECTOR"
+
+    _is_array = True
+
+    zero_indexes = False
+
+    def __init__(self, dim: t.Optional[int] = None, as_tuple: bool = False) -> None:
+        self.dim = dim
+        self.as_tuple = as_tuple
+
+    @property
+    def hashable(self):
+        return self.as_tuple
+
+    @property
+    def python_type(self):
+        return list
+
+    def as_generic(self):
+        return sqltypes.ARRAY
+
+    def bind_processor(self, dialect: sa.Dialect) -> t.Callable:
+        def process(value: t.Iterable) -> t.Optional[t.List]:
+            return to_db(value, self.dim)
+
+        return process
+
+    def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable:
+        def process(value: t.Any) -> t.Optional[npt.ArrayLike]:
+            return from_db(value)
+
+        return process
+
+    """
+    CrateDB currently only supports the similarity function `VectorSimilarityFunction.EUCLIDEAN`.
+    -- https://github.com/crate/crate/blob/1ca5c6dbb2/server/src/main/java/io/crate/types/FloatVectorType.java#L55
+
+    On the other hand, pgvector uses a comparator to apply different similarity functions as operators,
+    see `pgvector.sqlalchemy.Vector.comparator_factory`.
+
+    <->: l2/euclidean_distance
+    <#>: max_inner_product
+    <=>: cosine_distance
+
+    TODO: Discuss.
+    """  # noqa: E501
+
+
+# Accompanies the type definition for reverse type lookups.
+TYPES_MAP["float_vector"] = FloatVector
+
+
+def visit_FLOAT_VECTOR(self, type_, **kw):
+    return f"FLOAT_VECTOR({type_.dim})"
+
+
+CrateTypeCompiler.visit_FLOAT_VECTOR = visit_FLOAT_VECTOR
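+
+
+# A minimal usage sketch (illustrative only: table and column names are made
+# up, and DDL rendering through the CrateDB SQLAlchemy dialect is assumed):
+#
+#   import sqlalchemy as sa
+#
+#   table = sa.Table(
+#       "demo_vectors",
+#       sa.MetaData(),
+#       sa.Column("embedding", FloatVector(4)),
+#   )
+#
+#   # Rendering the DDL for this column yields: "embedding FLOAT_VECTOR(4)"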
diff --git a/target_cratedb/tests/test_standard_target.py b/target_cratedb/tests/test_standard_target.py
index 9a6f040..372927f 100644
--- a/target_cratedb/tests/test_standard_target.py
+++ b/target_cratedb/tests/test_standard_target.py
@@ -1,4 +1,6 @@
 """ Attempt at making some standard Target Tests. """
+from __future__ import annotations
+
 import copy
 import io
 from contextlib import redirect_stdout
@@ -6,17 +8,18 @@
 import jsonschema
 import pytest
 import sqlalchemy
+import sqlalchemy as sa
+from crate.client.sqlalchemy.types import ObjectTypeImpl
 from singer_sdk.exceptions import MissingKeyPropertiesError
 from singer_sdk.testing import sync_end_to_end
-from sqlalchemy.dialects.postgresql import ARRAY
-from sqlalchemy.types import TEXT, TIMESTAMP
 from target_postgres.tests.samples.aapl.aapl import Fundamentals
 from target_postgres.tests.samples.sample_tap_countries.countries_tap import (
     SampleTapCountries,
 )
+from target_postgres.tests.test_target_postgres import AssertionHelper
 from target_cratedb.connector import CrateDBConnector
-from target_cratedb.patch import polyfill_refresh_after_dml_engine
+from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
 from target_cratedb.target import TargetCrateDB

 try:
@@ -25,6 +28,9 @@
     from importlib_resources import files as resource_files  # type: ignore[no-redef]


+METADATA_COLUMN_PREFIX = "__sdc"
+
+
 @pytest.fixture(scope="session")
 def cratedb_config_with_ssl():
     return {
@@ -89,6 +95,14 @@ def create_engine(target_cratedb: TargetCrateDB) -> sqlalchemy.engine.Engine:
 @pytest.fixture(scope="session", autouse=True)
 def initialize_database(cratedb_config):
     delete_table_names = [
+        "melty.array_boolean",
+        "melty.array_float",
+        "melty.array_float_vector",
+        "melty.array_number",
+        "melty.array_string",
+        "melty.array_timestamp",
+        "melty.foo",
+        "melty.object_mixed",
         "melty.test_new_array_column",
         "melty.test_schema_updates",
     ]
@@ -100,6 +114,11 @@
         conn.exec_driver_sql("CREATE TABLE IF NOT EXISTS melty.foo (a INT);")


+@pytest.fixture
+def helper(cratedb_target) -> AssertionHelper:
+    return AssertionHelper(target=cratedb_target, metadata_column_prefix=METADATA_COLUMN_PREFIX)
+
+
 def singer_file_to_target(file_name, target) -> None:
     """Singer file to Target, emulates a tap run

@@ -126,14 +145,6 @@
     # TODO should set schemas for each tap individually so we don't collide


-def remove_metadata_columns(row: dict) -> dict:
-    new_row = {}
-    for column in row.keys():
-        if not column.startswith("_SDC"):
-            new_row[column] = row[column]
-    return new_row
-
-
 def test_sqlalchemy_url_config(cratedb_config):
     """Be sure that passing a sqlalchemy_url works

@@ -272,7 +283,7 @@ def test_multiple_state_messages(cratedb_target):


 @pytest.mark.skip("Upserts do not work yet")
-def test_relational_data(cratedb_target):
+def test_relational_data(cratedb_target, helper):
     engine = create_engine(cratedb_target)
     file_name = "user_location_data.singer"
     singer_file_to_target(file_name, cratedb_target)
@@ -296,7 +307,7 @@

         full_table_name = f"{schema_name}.test_users"
         result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id"))
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
+        result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()]
         assert result_dict == expected_test_users

     expected_test_locations = [
@@ -309,7 +320,7 @@

         full_table_name = f"{schema_name}.test_locations"
         result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id"))
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
+        result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()]
         assert result_dict == expected_test_locations

     expected_test_user_in_location = [
@@ -347,7 +358,7 @@

         full_table_name = f"{schema_name}.test_user_in_location"
         result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name} ORDER BY id"))
-        result_dict = [remove_metadata_columns(row._asdict()) for row in result.all()]
+        result_dict = [helper.remove_metadata_columns(row._asdict()) for row in result.all()]
         assert result_dict == expected_test_user_in_location


@@ -394,6 +405,123 @@ def test_array_data(cratedb_target):
     singer_file_to_target(file_name, cratedb_target)


+def test_array_boolean(cratedb_target, helper):
+    file_name = "array_boolean.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": [True, False]}
+    helper.verify_data("array_boolean", 3, "id", row)
+    helper.verify_schema(
+        "array_boolean",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_float_vector(cratedb_target, helper):
+    file_name = "array_float_vector.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {
+        "id": 1,
+        "value": [1.1, 2.1, 1.1, 1.3],
+    }
+    helper.verify_data("array_float_vector", 3, "id", row)
+
+    from target_cratedb.sqlalchemy.vector import FloatVector
+
+    helper.verify_schema(
+        "array_float_vector",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": FloatVector},
+        },
+    )
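+
+
+# For reference, a sketch of the SCHEMA message driving the test above. The
+# `array_float_vector.singer` fixture itself is not part of this diff; the
+# shape is inferred from the storage annotation handling in `connector.py`:
+#
+#   {"type": "SCHEMA",
+#    "stream": "array_float_vector",
+#    "key_properties": ["id"],
+#    "schema": {"properties": {
+#      "id": {"type": "integer"},
+#      "value": {"type": "array",
+#                "items": {"type": "number"},
+#                "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}}}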
+
+
+def test_array_number(cratedb_target, helper):
+    file_name = "array_number.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": [42.42, 84.84, 23]}
+    helper.verify_data("array_number", 3, "id", row)
+    helper.verify_schema(
+        "array_number",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_string(cratedb_target, helper):
+    file_name = "array_string.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": ["apple", "orange", "pear"]}
+    helper.verify_data("array_string", 4, "id", row)
+    helper.verify_schema(
+        "array_string",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_array_timestamp(cratedb_target, helper):
+    file_name = "array_timestamp.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
+    helper.verify_data("array_timestamp", 3, "id", row)
+    helper.verify_schema(
+        "array_timestamp",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": sqlalchemy.types.ARRAY},
+        },
+    )
+
+
+def test_object_mixed(cratedb_target, helper):
+    file_name = "object_mixed.singer"
+    singer_file_to_target(file_name, cratedb_target)
+    row = {
+        "id": 1,
+        "value": {
+            "string": "foo",
+            "integer": 42,
+            "float": 42.42,
+            "timestamp": "2023-12-13T01:15:02",
+            "array_boolean": [True, False],
+            "array_float": [42.42, 84.84],
+            "array_integer": [42, 84],
+            "array_string": ["foo", "bar"],
+            "nested_object": {"foo": "bar"},
+        },
+    }
+    helper.verify_data("object_mixed", 1, "id", row)
+    helper.verify_schema(
+        "object_mixed",
+        check_columns={
+            "id": {"type": sa.BIGINT},
+            "value": {"type": ObjectTypeImpl},
+        },
+    )
"array_string": ["foo", "bar"], + "nested_object": {"foo": "bar"}, + }, + } + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( + "object_mixed", + check_columns={ + "id": {"type": sa.BIGINT}, + "value": {"type": ObjectTypeImpl}, + }, + ) + + # TODO test that data is correct def test_encoded_string_data(cratedb_target): """ @@ -453,28 +567,28 @@ def test_anyof(cratedb_target): for column in table.c: # {"type":"string"} if column.name == "id": - assert isinstance(column.type, TEXT) + assert isinstance(column.type, sa.TEXT) # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} if column.name in {"authored_date", "committed_date"}: - assert isinstance(column.type, TIMESTAMP) + assert isinstance(column.type, sa.TIMESTAMP) # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} if column.name == "parent_ids": - assert isinstance(column.type, ARRAY) + assert isinstance(column.type, sa.ARRAY) # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} if column.name == "commit_message": - assert isinstance(column.type, TEXT) + assert isinstance(column.type, sa.TEXT) # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} if column.name == "legacy_id": - assert isinstance(column.type, TEXT) + assert isinstance(column.type, sa.TEXT) def test_new_array_column(cratedb_target): @@ -556,7 +670,7 @@ def test_activate_version_soft_delete(cratedb_target): assert result.rowcount == 9 result = connection.execute( - sqlalchemy.text(f'SELECT * FROM {full_table_name} where "__sdc_deleted_at" is NOT NULL') + sqlalchemy.text(f'SELECT * FROM {full_table_name} where "{METADATA_COLUMN_PREFIX}_deleted_at" is NOT NULL') ) assert result.rowcount == 2