From fe92e4567c19dc9ce6e94b2d92b2557c619c89e9 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 13 Dec 2023 02:08:25 +0100 Subject: [PATCH 1/9] chore: Add `.idea` to `.gitignore` --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 56313116..4f8c62d2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ smoke-test .meltano/** .tox/** .secrets/** +.idea .vscode/** output/** .env From c0d4a8901c2d71fbeb2bd8cd888113df409f74ca Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 13 Dec 2023 02:07:57 +0100 Subject: [PATCH 2/9] test: Add test cases for arrays and objects In PostgreSQL, everything boils down to the `jsonb[]` type, but arrays are reflected as `sqlalchemy.dialects.postgresql.ARRAY` instead of `sqlalchemy.dialects.postgresql.JSONB`. In order to prepare for more advanced type mangling & validation, and to better support databases pretending to be compatible with PostgreSQL, the new test cases exercise arrays with different kinds of inner values, because, on other databases, ARRAYs may need to have uniform content. Along the way, it adds a `verify_schema` utility function in the spirit of the `verify_data` function, refactored and generalized from the `test_anyof` test case. --- .../tests/data_files/array_boolean.singer | 5 + .../tests/data_files/array_data.singer | 6 - .../tests/data_files/array_number.singer | 5 + .../tests/data_files/array_string.singer | 6 + .../tests/data_files/array_timestamp.singer | 5 + .../tests/data_files/object_mixed.singer | 3 + target_postgres/tests/test_target_postgres.py | 167 +++++++++++++++--- 7 files changed, 162 insertions(+), 35 deletions(-) create mode 100644 target_postgres/tests/data_files/array_boolean.singer delete mode 100644 target_postgres/tests/data_files/array_data.singer create mode 100644 target_postgres/tests/data_files/array_number.singer create mode 100644 target_postgres/tests/data_files/array_string.singer create mode 100644 target_postgres/tests/data_files/array_timestamp.singer create mode 100644 target_postgres/tests/data_files/object_mixed.singer diff --git a/target_postgres/tests/data_files/array_boolean.singer b/target_postgres/tests/data_files/array_boolean.singer new file mode 100644 index 00000000..268a64a0 --- /dev/null +++ b/target_postgres/tests/data_files/array_boolean.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}} +{"type": "STATE", "value": {"array_boolean": 3}} diff --git a/target_postgres/tests/data_files/array_data.singer b/target_postgres/tests/data_files/array_data.singer deleted file mode 100644 index 0d132ac6..00000000 --- a/target_postgres/tests/data_files/array_data.singer +++ /dev/null @@ -1,6 +0,0 @@ -{"type": "SCHEMA", "stream": "test_carts", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "fruits": {"type": "array","items": {"type": "string"}}}}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 1, "fruits": [ "apple", "orange", "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 
2, "fruits": [ "banana", "apple" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 3, "fruits": [ "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 4, "fruits": [ "orange", "banana", "apple", "pear" ]}} -{"type": "STATE", "value": {"test_carts": 4}} diff --git a/target_postgres/tests/data_files/array_number.singer b/target_postgres/tests/data_files/array_number.singer new file mode 100644 index 00000000..4eac276e --- /dev/null +++ b/target_postgres/tests/data_files/array_number.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}} +{"type": "STATE", "value": {"array_number": 3}} diff --git a/target_postgres/tests/data_files/array_string.singer b/target_postgres/tests/data_files/array_string.singer new file mode 100644 index 00000000..f14e7870 --- /dev/null +++ b/target_postgres/tests/data_files/array_string.singer @@ -0,0 +1,6 @@ +{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}} +{"type": "STATE", "value": {"array_string": 4}} diff --git a/target_postgres/tests/data_files/array_timestamp.singer b/target_postgres/tests/data_files/array_timestamp.singer new file mode 100644 index 00000000..e5192cec --- /dev/null +++ b/target_postgres/tests/data_files/array_timestamp.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}} +{"type": "STATE", "value": {"array_timestamp": 3}} diff --git a/target_postgres/tests/data_files/object_mixed.singer b/target_postgres/tests/data_files/object_mixed.singer new file mode 100644 index 00000000..2ed86261 --- /dev/null +++ b/target_postgres/tests/data_files/object_mixed.singer @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}} +{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, 
"float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}} +{"type": "STATE", "value": {"object_mixed": 1}} diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 1eaa9978..ab4cd11d 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -11,8 +11,8 @@ import sqlalchemy from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end -from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy.types import TEXT, TIMESTAMP +from sqlalchemy.dialects.postgresql import ARRAY, JSONB +from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -94,7 +94,7 @@ def verify_data( Args: target: The target to obtain a database connection from. - full_table_name: The schema and table name of the table to check data for. + table_name: The schema and table name of the table to check data for. primary_key: The primary key of the table. number_of_rows: The expected number of rows that should be in the table. check_data: A dictionary representing the full contents of the first row in the @@ -134,6 +134,43 @@ def verify_data( assert result.first()[0] == number_of_rows +def verify_schema( + target: TargetPostgres, + table_name: str, + check_columns: dict = None, +): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + """ + engine = create_engine(target) + schema = target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection + ) + for column in table.c: + # Ignore `_sdc` columns for now. 
+ if column.name.startswith("_sdc"): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + + def test_sqlalchemy_url_config(postgres_config_no_ssl): """Be sure that passing a sqlalchemy_url works @@ -406,11 +443,92 @@ def test_duplicate_records(postgres_target): verify_data(postgres_target, "test_duplicate_records", 2, "id", row) -def test_array_data(postgres_target): - file_name = "array_data.singer" +def test_array_boolean(postgres_target): + file_name = "array_boolean.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [True, False]} + verify_data(postgres_target, "array_boolean", 3, "id", row) + verify_schema( + postgres_target, + "array_boolean", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_number(postgres_target): + file_name = "array_number.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} + verify_data(postgres_target, "array_number", 3, "id", row) + verify_schema( + postgres_target, + "array_number", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_string(postgres_target): + file_name = "array_string.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["apple", "orange", "pear"]} + verify_data(postgres_target, "array_string", 4, "id", row) + verify_schema( + postgres_target, + "array_string", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_timestamp(postgres_target): + file_name = "array_timestamp.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} + verify_data(postgres_target, "array_timestamp", 3, "id", row) + verify_schema( + postgres_target, + "array_timestamp", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_object_mixed(postgres_target): + file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) - row = {"id": 1, "fruits": ["apple", "orange", "pear"]} - verify_data(postgres_target, "test_carts", 4, "id", row) + row = { + "id": 1, + "value": { + "string": "foo", + "integer": 42, + "float": Decimal("42.42"), + "timestamp": "2023-12-13T01:15:02", + "array_boolean": [True, False], + "array_float": [Decimal("42.42"), Decimal("84.84")], + "array_integer": [42, 84], + "array_string": ["foo", "bar"], + "nested_object": {"foo": "bar"}, + }, + } + verify_data(postgres_target, "object_mixed", 1, "id", row) + verify_schema( + postgres_target, + "object_mixed", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": JSONB}, + }, + ) def test_encoded_string_data(postgres_target): @@ -456,41 +574,32 @@ def test_large_int(postgres_target): def test_anyof(postgres_target): """Test that anyOf is handled correctly""" - engine = create_engine(postgres_target) table_name = "commits" file_name = f"{table_name}.singer" - schema = postgres_target.config["default_target_schema"] singer_file_to_target(file_name, postgres_target) - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - 
table = sqlalchemy.Table( - "commits", meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # {"type":"string"} - if column.name == "id": - assert isinstance(column.type, TEXT) + verify_schema( + postgres_target, + table_name, + check_columns={ + # {"type":"string"} + "id": {"type": TEXT}, # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} - if column.name in {"authored_date", "committed_date"}: - assert isinstance(column.type, TIMESTAMP) - + "authored_date": {"type": TIMESTAMP}, + "committed_date": {"type": TIMESTAMP}, # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} - if column.name == "parent_ids": - assert isinstance(column.type, ARRAY) - + "parent_ids": {"type": ARRAY}, # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} - if column.name == "commit_message": - assert isinstance(column.type, TEXT) - + "commit_message": {"type": TEXT}, # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - if column.name == "legacy_id": - assert isinstance(column.type, TEXT) + "legacy_id": {"type": TEXT}, + }, + ) def test_new_array_column(postgres_target): From 8b3ea4f55c3d3e69df5f7db7c9d95fce95317308 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 19:05:45 +0100 Subject: [PATCH 3/9] test: Fix `FATAL: sorry, too many clients already` Dispose the SQLAlchemy engine object after use within test utility functions. --- target_postgres/tests/test_target_postgres.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index ab4cd11d..f676f8f1 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -132,6 +132,7 @@ def verify_data( sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") ) assert result.first()[0] == number_of_rows + engine.dispose() def verify_schema( @@ -169,6 +170,7 @@ def verify_schema( f"Column '{column.name}' (with type '{column.type}') " f"does not match expected type: {column_type_expected}" ) + engine.dispose() def test_sqlalchemy_url_config(postgres_config_no_ssl): From a9d179694aa8b51290928865da7f267ed27e44cc Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 20:10:36 +0100 Subject: [PATCH 4/9] test: Fix `FATAL: sorry, too many clients already` Within `BasePostgresSDKTests`, new database connections via SQLAlchemy were not being closed, and gradually filled up the connection pool, eventually saturating it.
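As a sketch, the remedy applied in the diff below: yield the connection from within the context manager, so it is closed as soon as the consuming test is done with it, and dispose the engine afterwards to release its connection pool. Note that `create_engine` here is the test module's own helper, which builds an SQLAlchemy engine from the target's configuration.

```python
@pytest.fixture()
def connection(self, runner):
    engine = create_engine(runner)
    # Yielding from inside the context manager closes the
    # connection once the consuming test has finished.
    with engine.connect() as connection:
        yield connection
    # Disposing the engine releases its connection pool.
    engine.dispose()
```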
--- target_postgres/tests/test_sdk.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py index 3f95c393..5d4207ad 100644 --- a/target_postgres/tests/test_sdk.py +++ b/target_postgres/tests/test_sdk.py @@ -61,7 +61,9 @@ class BasePostgresSDKTests: @pytest.fixture() def connection(self, runner): engine = create_engine(runner) - return engine.connect() + with engine.connect() as connection: + yield connection + engine.dispose() SDKTests = get_target_test_class( From 723e1fa699b188a3d8e3ccd5589e6f64cfacdea3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 21:31:43 +0100 Subject: [PATCH 5/9] test: Fix `FATAL: sorry, too many clients already` Dispose the SQLAlchemy engine object after use within `PostgresConnector`. --- target_postgres/connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index d6730539..369eb462 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -180,8 +180,10 @@ def copy_table_structure( @contextmanager def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: - with self._engine.connect().execution_options() as conn: + engine = self._engine + with engine.connect().execution_options() as conn: yield conn + engine.dispose() def drop_table( self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection From 9e2f6dba372c115064f598ad5183a9c062bedcd3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:23:20 +0100 Subject: [PATCH 6/9] chore: Fix parameter names in docstrings --- target_postgres/connector.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 369eb462..59314d60 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -332,7 +332,7 @@ def create_empty_table( # type: ignore[override] """Create an empty target table. Args: - full_table_name: the target table name. + table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. @@ -427,7 +427,7 @@ def _create_empty_column( # type: ignore[override] """Create a new column. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. @@ -491,7 +491,7 @@ def _adapt_column_type( # type: ignore[override] """Adapt table column type to support the new JSON schema type. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. @@ -722,7 +722,7 @@ def _get_column_type( # type: ignore[override] """Get the SQL type of the declared column. Args: - full_table_name: The name of the table. + table_name: The name of the table. column_name: The name of the column. Returns: From 86bb083720bf05dc6b499d3d7ec3830e3eea5a0a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:27:48 +0100 Subject: [PATCH 7/9] test: Refactor utility functions `verify_data` and `verify_schema` Wrapping them into a container class `AssertionHelper` makes it easy to parameterize them, and to provide them to the test functions using a pytest fixture. This way, they are reusable by database adapter implementations which derive from PostgreSQL.
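As a sketch of the resulting usage, mirroring the diff below, test functions receive the helper through a pytest fixture and invoke the assertions as methods on it:

```python
@pytest.fixture
def helper(postgres_target) -> AssertionHelper:
    # Parameterize the helper with the target and the metadata
    # column prefix, which adapters may override (e.g. `__sdc`).
    return AssertionHelper(
        target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX
    )


def test_duplicate_records(postgres_target, helper):
    singer_file_to_target("duplicate_records.singer", postgres_target)
    helper.verify_data("test_duplicate_records", 2, "id", {"id": 1, "metric": 100})
```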
The motivation for this is that the metadata column prefix `_sdc` needs to be adjusted for other database systems, because they reject such column names, reserving them for system purposes. In the specific case of CrateDB, it is enough to rename the prefix to `__sdc`. Sad but true. --- target_postgres/tests/test_target_postgres.py | 274 +++++++++--------- 1 file changed, 141 insertions(+), 133 deletions(-) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index f676f8f1..2f7462a8 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -28,6 +28,8 @@ postgres_config_ssh_tunnel, ) +METADATA_COLUMN_PREFIX = "_sdc" + # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly @pytest.fixture(scope="session", name="postgres_config") @@ -75,102 +77,114 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row - - -def verify_data( - target: TargetPostgres, - table_name: str, - number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, -): - """Checks whether the data in a table matches a provided data sample. - - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - primary_key: The primary key of the table. - number_of_rows: The expected number of rows that should be in the table. - check_data: A dictionary representing the full contents of the first row in the - table, as determined by lowest primary_key value, or else a list of - dictionaries representing every row in the table. - """ - engine = create_engine(target) - full_table_name = f"{target.config['default_target_schema']}.{table_name}" - with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" +class AssertionHelper: + def __init__(self, target: TargetPostgres, metadata_column_prefix: str): + self.target = target + self.metadata_column_prefix = metadata_column_prefix + + def remove_metadata_columns(self, row: dict) -> dict: + new_row = {} + for column in row.keys(): + if not column.startswith(self.metadata_column_prefix): + new_row[column] = row[column] + return new_row + + def verify_data( + self, + table_name: str, + number_of_rows: int = 1, + primary_key: str | None = None, + check_data: dict | list[dict] | None = None, + ): + """Checks whether the data in a table matches a provided data sample. + + Args: + table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table. + number_of_rows: The expected number of rows that should be in the table. + check_data: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value, or else a list of + dictionaries representing every row in the table. 
+ """ + engine = create_engine(self.target) + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + engine.dispose() + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + engine = create_engine(self.target) + schema = self.target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection ) - assert result.first()[0] == number_of_rows - engine.dispose() - + for column in table.c: + # Ignore `_sdc` metadata columns when verifying table schema. + if column.name.startswith(self.metadata_column_prefix): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + engine.dispose() -def verify_schema( - target: TargetPostgres, - table_name: str, - check_columns: dict = None, -): - """Checks whether the schema of a database table matches the provided column definitions. - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - check_columns: A dictionary mapping column names to their definitions. 
Currently, - it is all about the `type` attribute which is compared. - """ - engine = create_engine(target) - schema = target.config["default_target_schema"] - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # Ignore `_sdc` columns for now. - if column.name.startswith("_sdc"): - continue - try: - column_type_expected = check_columns[column.name]["type"] - except KeyError: - raise ValueError( - f"Invalid check_columns - missing definition for column: {column.name}" - ) - if not isinstance(column.type, column_type_expected): - raise TypeError( - f"Column '{column.name}' (with type '{column.type}') " - f"does not match expected type: {column_type_expected}" - ) - engine.dispose() +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -287,11 +301,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -311,7 +325,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -323,16 +337,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -349,7 +363,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -406,12 +420,12 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def 
test_no_primary_keys(postgres_target, helper): """We run both of these tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" @@ -430,7 +444,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -438,20 +452,19 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_boolean(postgres_target): +def test_array_boolean(postgres_target, helper): file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [True, False]} - verify_data(postgres_target, "array_boolean", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( "array_boolean", check_columns={ "id": {"type": BIGINT}, @@ -460,13 +473,12 @@ def test_array_boolean(postgres_target): ) -def test_array_number(postgres_target): +def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} - verify_data(postgres_target, "array_number", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( "array_number", check_columns={ "id": {"type": BIGINT}, @@ -475,13 +487,12 @@ def test_array_number(postgres_target): ) -def test_array_string(postgres_target): +def test_array_string(postgres_target, helper): file_name = "array_string.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["apple", "orange", "pear"]} - verify_data(postgres_target, "array_string", 4, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( "array_string", check_columns={ "id": {"type": BIGINT}, @@ -490,13 +501,12 @@ def test_array_string(postgres_target): ) -def test_array_timestamp(postgres_target): +def test_array_timestamp(postgres_target, helper): file_name = "array_timestamp.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} - verify_data(postgres_target, "array_timestamp", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( "array_timestamp", check_columns={ "id": {"type": BIGINT}, @@ -505,7 +515,7 @@ def test_array_timestamp(postgres_target): ) -def test_object_mixed(postgres_target): +def test_object_mixed(postgres_target, helper): file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -522,9 +532,8 @@ def test_object_mixed(postgres_target): "nested_object": {"foo": "bar"}, }, } - verify_data(postgres_target, "object_mixed", 1, "id", row) - verify_schema( - postgres_target, + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( "object_mixed", 
check_columns={ "id": {"type": BIGINT}, @@ -533,7 +542,7 @@ def test_object_mixed(postgres_target): ) -def test_encoded_string_data(postgres_target): +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -546,11 +555,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -574,14 +583,13 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" table_name = "commits" file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) - verify_schema( - postgres_target, + helper.verify_schema( table_name, check_columns={ # {"type":"string"} @@ -690,7 +698,7 @@ def test_activate_version_soft_delete(postgres_target): result = connection.execute( sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2 From f007377192e712bd770598a35af15e6041036cad Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 13 Dec 2023 03:56:08 +0100 Subject: [PATCH 8/9] feat: Add support for pgvector's `vector` data type --- .github/workflows/ci_workflow.yml | 2 +- README.md | 25 +++++++- docker-compose.yml | 12 +++- poetry.lock | 64 +++++++++++++++++-- pyproject.toml | 9 ++- target_postgres/connector.py | 62 +++++++++++++++++- .../data_files/array_float_vector.singer | 5 ++ target_postgres/tests/init.sql | 1 + target_postgres/tests/test_target_postgres.py | 20 ++++++ tox.ini | 8 +-- 10 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 target_postgres/tests/data_files/array_float_vector.singer create mode 100644 target_postgres/tests/init.sql diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml index 1c44e462..e02f7b99 100644 --- a/.github/workflows/ci_workflow.yml +++ b/.github/workflows/ci_workflow.yml @@ -36,7 +36,7 @@ jobs: pipx install poetry - name: Install dependencies run: | - poetry install + poetry install --all-extras - name: Run pytest run: | poetry run pytest --capture=no diff --git a/README.md b/README.md index a1b48c33..6fdffa7a 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ tap-carbon-intensity | target-postgres --config /path/to/target-postgres-config. ```bash pipx install poetry -poetry install +poetry install --all-extras pipx install pre-commit pre-commit install ``` @@ -152,6 +152,8 @@ develop your own Singer taps and targets. 
## Data Types +### Mapping + The below table shows how this tap will map between jsonschema datatypes and Postgres datatypes. | jsonschema | Postgres | @@ -202,7 +204,20 @@ The below table shows how this tap will map between jsonschema datatypes and Pos Note that while object types are mapped directly to jsonb, array types are mapped to a jsonb array. -If a column has multiple jsonschema types, the following order is using to order Postgres types, from highest priority to lowest priority. +When using [pgvector], the following type mapping applies in addition to the table above. + +| jsonschema | Postgres | +|------------------------------------------------|----------| +| array (with additional SCHEMA annotations [1]) | vector | + +[1] `"additionalProperties": {"storage": {"type": "vector", "dim": 4}}` + +### Resolution Order + +If a column has multiple jsonschema types, there is a priority list for +resolving the best type candidate, from highest to lowest priority. + - ARRAY(JSONB) - JSONB - TEXT @@ -215,3 +230,9 @@ If a column has multiple jsonschema types, the following order is using to order - INTEGER - BOOLEAN - NOTYPE + +When using [pgvector], the `pgvector.sqlalchemy.Vector` type is added to the bottom +of the list. + + +[pgvector]: https://github.com/pgvector/pgvector diff --git a/docker-compose.yml b/docker-compose.yml index f2d453c4..a187f7c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: "2.1" services: postgres: - image: docker.io/postgres:latest + image: ankane/pgvector:latest command: postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key -c ssl_ca_file=/var/lib/postgresql/ca.crt -c hba_file=/var/lib/postgresql/pg_hba.conf environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: postgres POSTGRES_INITDB_ARGS: --auth-host=cert # Not placed in the data directory (/var/lib/postgresql/data) because of https://gist.github.com/mrw34/c97bb03ea1054afb551886ffc8b63c3b?permalink_comment_id=2678568#gistcomment-2678568 volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql - ./ssl/server.crt:/var/lib/postgresql/server.crt # Certificate verifying the server's identity to the client. - ./ssl/server.key:/var/lib/postgresql/server.key # Private key to verify the server's certificate is legitimate. - ./ssl/ca.crt:/var/lib/postgresql/ca.crt # Certificate authority to use when verifying the client's identity to the server. 
@@ -20,9 +21,11 @@ services: ports: - "5432:5432" postgres_no_ssl: # Borrowed from https://github.com/MeltanoLabs/tap-postgres/blob/main/.github/workflows/test.yml#L13-L23 - image: docker.io/postgres:latest + image: ankane/pgvector:latest environment: POSTGRES_PASSWORD: postgres + volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql ports: - 5433:5432 ssh: @@ -37,17 +40,20 @@ services: - PASSWORD_ACCESS=false - USER_NAME=melty volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql - ./ssh_tunnel/ssh-server-config:/config/ssh_host_keys:ro ports: - "127.0.0.1:2223:2222" networks: - inner postgresdb: - image: postgres:13.0 + image: ankane/pgvector:latest environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: main + volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql networks: inner: ipv4_address: 10.5.0.5 diff --git a/poetry.lock b/poetry.lock index 8e082faf..e61b03b1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -774,6 +774,43 @@ files = [ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] +[[package]] +name = "numpy" +version = "1.24.4" +description = "Fundamental package for array computing in Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "numpy-1.24.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c0bfb52d2169d58c1cdb8cc1f16989101639b34c7d3ce60ed70b19c63eba0b64"}, + {file = "numpy-1.24.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ed094d4f0c177b1b8e7aa9cba7d6ceed51c0e569a5318ac0ca9a090680a6a1b1"}, + {file = "numpy-1.24.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:79fc682a374c4a8ed08b331bef9c5f582585d1048fa6d80bc6c35bc384eee9b4"}, + {file = "numpy-1.24.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ffe43c74893dbf38c2b0a1f5428760a1a9c98285553c89e12d70a96a7f3a4d6"}, + {file = "numpy-1.24.4-cp310-cp310-win32.whl", hash = "sha256:4c21decb6ea94057331e111a5bed9a79d335658c27ce2adb580fb4d54f2ad9bc"}, + {file = "numpy-1.24.4-cp310-cp310-win_amd64.whl", hash = "sha256:b4bea75e47d9586d31e892a7401f76e909712a0fd510f58f5337bea9572c571e"}, + {file = "numpy-1.24.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:f136bab9c2cfd8da131132c2cf6cc27331dd6fae65f95f69dcd4ae3c3639c810"}, + {file = "numpy-1.24.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e2926dac25b313635e4d6cf4dc4e51c8c0ebfed60b801c799ffc4c32bf3d1254"}, + {file = "numpy-1.24.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:222e40d0e2548690405b0b3c7b21d1169117391c2e82c378467ef9ab4c8f0da7"}, + {file = "numpy-1.24.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7215847ce88a85ce39baf9e89070cb860c98fdddacbaa6c0da3ffb31b3350bd5"}, + {file = "numpy-1.24.4-cp311-cp311-win32.whl", hash = "sha256:4979217d7de511a8d57f4b4b5b2b965f707768440c17cb70fbf254c4b225238d"}, + {file = "numpy-1.24.4-cp311-cp311-win_amd64.whl", hash = "sha256:b7b1fc9864d7d39e28f41d089bfd6353cb5f27ecd9905348c24187a768c79694"}, + {file = "numpy-1.24.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1452241c290f3e2a312c137a9999cdbf63f78864d63c79039bda65ee86943f61"}, + {file = "numpy-1.24.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:04640dab83f7c6c85abf9cd729c5b65f1ebd0ccf9de90b270cd61935eef0197f"}, + {file = "numpy-1.24.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:a5425b114831d1e77e4b5d812b69d11d962e104095a5b9c3b641a218abcc050e"}, + {file = "numpy-1.24.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dd80e219fd4c71fc3699fc1dadac5dcf4fd882bfc6f7ec53d30fa197b8ee22dc"}, + {file = "numpy-1.24.4-cp38-cp38-win32.whl", hash = "sha256:4602244f345453db537be5314d3983dbf5834a9701b7723ec28923e2889e0bb2"}, + {file = "numpy-1.24.4-cp38-cp38-win_amd64.whl", hash = "sha256:692f2e0f55794943c5bfff12b3f56f99af76f902fc47487bdfe97856de51a706"}, + {file = "numpy-1.24.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2541312fbf09977f3b3ad449c4e5f4bb55d0dbf79226d7724211acc905049400"}, + {file = "numpy-1.24.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9667575fb6d13c95f1b36aca12c5ee3356bf001b714fc354eb5465ce1609e62f"}, + {file = "numpy-1.24.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f3a86ed21e4f87050382c7bc96571755193c4c1392490744ac73d660e8f564a9"}, + {file = "numpy-1.24.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d11efb4dbecbdf22508d55e48d9c8384db795e1b7b51ea735289ff96613ff74d"}, + {file = "numpy-1.24.4-cp39-cp39-win32.whl", hash = "sha256:6620c0acd41dbcb368610bb2f4d83145674040025e5536954782467100aa8835"}, + {file = "numpy-1.24.4-cp39-cp39-win_amd64.whl", hash = "sha256:befe2bf740fd8373cf56149a5c23a0f601e82869598d41f8e188a0e9869926f8"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:31f13e25b4e304632a4619d0e0777662c2ffea99fcae2029556b17d8ff958aef"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:95f7ac6540e95bc440ad77f56e520da5bf877f87dca58bd095288dce8940532a"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:e98f220aa76ca2a977fe435f5b04d7b3470c0a2e6312907b37ba6068f26787f2"}, + {file = "numpy-1.24.4.tar.gz", hash = "sha256:80f5e3a4e498641401868df4208b74581206afbee7cf7b8329daae82676d9463"}, +] + [[package]] name = "packaging" version = "23.2" @@ -787,13 +824,13 @@ files = [ [[package]] name = "paramiko" -version = "3.3.1" +version = "3.4.0" description = "SSH2 protocol library" optional = false python-versions = ">=3.6" files = [ - {file = "paramiko-3.3.1-py3-none-any.whl", hash = "sha256:b7bc5340a43de4287bbe22fe6de728aa2c22468b2a849615498dd944c2f275eb"}, - {file = "paramiko-3.3.1.tar.gz", hash = "sha256:6a3777a961ac86dbef375c5f5b8d50014a1a96d0fd7f054a43bc880134b0ff77"}, + {file = "paramiko-3.4.0-py3-none-any.whl", hash = "sha256:43f0b51115a896f9c00f59618023484cb3a14b98bbceab43394a39c6739b7ee7"}, + {file = "paramiko-3.4.0.tar.gz", hash = "sha256:aac08f26a31dc4dffd92821527d1682d99d52f9ef6851968114a8728f3c274d3"}, ] [package.dependencies] @@ -918,6 +955,19 @@ tzdata = ">=2020.1" [package.extras] test = ["time-machine (>=2.6.0)"] +[[package]] +name = "pgvector" +version = "0.2.4" +description = "pgvector support for Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "pgvector-0.2.4-py2.py3-none-any.whl", hash = "sha256:548e1f88d3c7433020c1c177feddad2f36915c262852d621f9018fcafff6870b"}, +] + +[package.dependencies] +numpy = "*" + [[package]] name = "pkgutil-resolve-name" version = "1.3.10" @@ -1003,7 +1053,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"}, {file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"}, {file = 
"psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"}, @@ -1012,8 +1061,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"}, @@ -1873,7 +1920,10 @@ files = [ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] +[extras] +pgvector = ["pgvector"] + [metadata] lock-version = "2.0" python-versions = "<3.13,>=3.8.1" -content-hash = "94fac1eb94f683deb254b8194a895caa8e120b5b102ddac384858cd1ea5251c4" +content-hash = "94def8eec452286849b1956daf2e1e66aa7377dcdc745adf19db230e1a71c6f7" diff --git a/pyproject.toml b/pyproject.toml index 96c7dfd1..8b91b952 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ packages = [ python = "<3.13,>=3.8.1" requests = "^2.25.1" singer-sdk = ">=0.28,<0.34" +pgvector = { version="^0.2.4", optional = true } psycopg2-binary = "2.9.9" sqlalchemy = ">=2.0,<3.0" sshtunnel = "0.4.0" @@ -51,11 +52,17 @@ types-simplejson = "^3.19.0.2" types-sqlalchemy = "^1.4.53.38" types-jsonschema = "^4.19.0.3" +[tool.poetry.extras] +pgvector = ["pgvector"] + [tool.mypy] exclude = "tests" [[tool.mypy.overrides]] -module = ["sshtunnel"] +module = [ + "pgvector.sqlalchemy", + "sshtunnel", +] ignore_missing_imports = true [tool.isort] diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 
59314d60..3e38ef91 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -115,6 +115,14 @@ def prepare_table( # type: ignore[override] connection=connection, ) return table + # To make table reflection work properly with pgvector, + # the module needs to be imported beforehand. + try: + from pgvector.sqlalchemy import Vector # noqa: F401 + except ImportError: + self.logger.debug( + "Unable to handle pgvector's `Vector` type. Please install `pgvector`." + ) meta.reflect(connection, only=[table_name]) table = meta.tables[ full_table_name @@ -280,6 +288,51 @@ def pick_individual_type(jsonschema_type: dict): if "object" in jsonschema_type["type"]: return JSONB() if "array" in jsonschema_type["type"]: + # Select between different kinds of `ARRAY` data types. + # + # This currently leverages an unspecified definition for the Singer SCHEMA, + # using the `additionalProperties` attribute to convey additional type + # information, agnostic of the target database. + # + # In this case, it is about telling different kinds of `ARRAY` types apart: + # Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or, + # alternatively, it can be a "vector" kind `ARRAY` of floating point + # numbers, effectively what pgvector is storing in its `VECTOR` type. + # + # Still, `type: "vector"` is only a surrogate label here, because other + # database systems may use different types for implementing the same thing, + # and need to translate accordingly. + """ + Schema override rule in `meltano.yml`: + + type: "array" + items: + type: "number" + additionalProperties: + storage: + type: "vector" + dim: 4 + + Produced schema annotation in `catalog.json`: + + {"type": "array", + "items": {"type": "number"}, + "additionalProperties": {"storage": {"type": "vector", "dim": 4}}} + """ + if ( + "additionalProperties" in jsonschema_type + and "storage" in jsonschema_type["additionalProperties"] + ): + storage_properties = jsonschema_type["additionalProperties"]["storage"] + if ( + "type" in storage_properties + and storage_properties["type"] == "vector" + ): + # On PostgreSQL/pgvector, use the corresponding type definition + # from its SQLAlchemy dialect. + from pgvector.sqlalchemy import Vector + + return Vector(storage_properties["dim"]) return ARRAY(JSONB()) if jsonschema_type.get("format") == "date-time": return TIMESTAMP() @@ -313,6 +366,13 @@ def pick_best_sql_type(sql_type_array: list): NOTYPE, ] + try: + from pgvector.sqlalchemy import Vector + + precedence_order.append(Vector) + except ImportError: + pass + for sql_type in precedence_order: for obj in sql_type_array: if isinstance(obj, sql_type): @@ -519,7 +579,7 @@ def _adapt_column_type( # type: ignore[override] return # Not the same type, generic type or compatible types - # calling merge_sql_types for assistnace + # calling merge_sql_types for assistance. 
compatible_sql_type = self.merge_sql_types([current_type, sql_type]) if str(compatible_sql_type) == str(current_type): diff --git a/target_postgres/tests/data_files/array_float_vector.singer b/target_postgres/tests/data_files/array_float_vector.singer new file mode 100644 index 00000000..9f4cd04e --- /dev/null +++ b/target_postgres/tests/data_files/array_float_vector.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_float_vector", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}, "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}}}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 1, "value": [ 1.1, 2.1, 1.1, 1.3 ]}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 2, "value": [ 1.0, 1.0, 1.0, 2.3 ]}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 3, "value": [ 2.0, 1.2, 1.0, 0.9 ]}} +{"type": "STATE", "value": {"array_float_vector": 3}} diff --git a/target_postgres/tests/init.sql b/target_postgres/tests/init.sql new file mode 100644 index 00000000..0aa0fc22 --- /dev/null +++ b/target_postgres/tests/init.sql @@ -0,0 +1 @@ +CREATE EXTENSION IF NOT EXISTS vector; diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 2f7462a8..a31ed6ae 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -473,6 +473,26 @@ def test_array_boolean(postgres_target, helper): ) +def test_array_float_vector(postgres_target, helper): + pgvector_sa = pytest.importorskip("pgvector.sqlalchemy") + + file_name = "array_float_vector.singer" + singer_file_to_target(file_name, postgres_target) + row = { + "id": 1, + "value": "[1.1,2.1,1.1,1.3]", + } + helper.verify_data("array_float_vector", 3, "id", row) + + helper.verify_schema( + "array_float_vector", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": pgvector_sa.Vector}, + }, + ) + + def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) diff --git a/tox.ini b/tox.ini index 0c287e8f..85c03b5a 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,7 @@ isolated_build = true allowlist_externals = poetry commands = - poetry install -v + poetry install --all-extras -v poetry run pytest poetry run black --check target_postgres/ poetry run flake8 target_postgres @@ -21,14 +21,14 @@ commands = # To execute, run `tox -e pytest` envlist = py37, py38, py39 commands = - poetry install -v + poetry install --all-extras -v poetry run pytest [testenv:format] # Attempt to auto-resolve lint errors before they are raised. # To execute, run `tox -e format` commands = - poetry install -v + poetry install --all-extras -v poetry run black target_postgres/ poetry run isort target_postgres @@ -36,7 +36,7 @@ commands = # Raise an error if lint and style standards are not met. 
# To execute, run `tox -e lint` commands = - poetry install -v + poetry install --all-extras -v poetry run black --check --diff target_postgres/ poetry run isort --check target_postgres poetry run flake8 target_postgres From fb320f1511c0a560ef735a66705eb0727af3f94f Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 23:18:00 +0100 Subject: [PATCH 9/9] chore: Re-add Python 3.8 compatibility --- target_postgres/tests/test_target_postgres.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index a31ed6ae..82a71b6e 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -2,6 +2,7 @@ # flake8: noqa import copy import io +import typing as t from contextlib import redirect_stdout from decimal import Decimal from pathlib import Path @@ -93,8 +94,8 @@ def verify_data( self, table_name: str, number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, + primary_key: t.Union[str, None] = None, + check_data: t.Union[t.Dict, t.List[t.Dict], None] = None, ): """Checks whether the data in a table matches a provided data sample.