From c446d94c161c09d9bd8b18de9ed5040b2949f93c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:27:48 +0100 Subject: [PATCH] test: Refactor utility functions `verify_data` and `verify_schema` By wrapping them into a container class `AssertionHelper`, it is easy to parameterize them, and to provide them to the test functions using a pytest fixture. This way, they are reusable from database adapter implementations which derive from PostgreSQL. The motivation for this is because the metadata column prefix `_sdc` needs to be adjusted for other database systems, as they reject such columns, being reserved for system purposes. In the specific case of CrateDB, it is enough to rename it like `__sdc`. Sad but true. --- target_postgres/tests/test_target_postgres.py | 270 +++++++++--------- 1 file changed, 139 insertions(+), 131 deletions(-) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index ab4cd11d..d675dc90 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -28,6 +28,8 @@ postgres_config_ssh_tunnel, ) +METADATA_COLUMN_PREFIX = "_sdc" + # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly @pytest.fixture(scope="session", name="postgres_config") @@ -75,100 +77,112 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row - - -def verify_data( - target: TargetPostgres, - table_name: str, - number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, -): - """Checks whether the data in a table matches a provided data sample. - - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - primary_key: The primary key of the table. - number_of_rows: The expected number of rows that should be in the table. - check_data: A dictionary representing the full contents of the first row in the - table, as determined by lowest primary_key value, or else a list of - dictionaries representing every row in the table. - """ - engine = create_engine(target) - full_table_name = f"{target.config['default_target_schema']}.{table_name}" - with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" +class AssertionHelper: + def __init__(self, target: TargetPostgres, metadata_column_prefix: str): + self.target = target + self.metadata_column_prefix = metadata_column_prefix + + def remove_metadata_columns(self, row: dict) -> dict: + new_row = {} + for column in row.keys(): + if not column.startswith(self.metadata_column_prefix): + new_row[column] = row[column] + return new_row + + def verify_data( + self, + table_name: str, + number_of_rows: int = 1, + primary_key: str | None = None, + check_data: dict | list[dict] | None = None, + ): + """Checks whether the data in a table matches a provided data sample. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table. + number_of_rows: The expected number of rows that should be in the table. + check_data: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value, or else a list of + dictionaries representing every row in the table. + """ + engine = create_engine(self.target) + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + engine = create_engine(self.target) + schema = self.target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection ) - assert result.first()[0] == number_of_rows - + for column in table.c: + # Ignore `_sdc` metadata columns when veriying table schema. + if column.name.startswith(self.metadata_column_prefix): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) -def verify_schema( - target: TargetPostgres, - table_name: str, - check_columns: dict = None, -): - """Checks whether the schema of a database table matches the provided column definitions. - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - check_columns: A dictionary mapping column names to their definitions. Currently, - it is all about the `type` attribute which is compared. - """ - engine = create_engine(target) - schema = target.config["default_target_schema"] - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # Ignore `_sdc` columns for now. - if column.name.startswith("_sdc"): - continue - try: - column_type_expected = check_columns[column.name]["type"] - except KeyError: - raise ValueError( - f"Invalid check_columns - missing definition for column: {column.name}" - ) - if not isinstance(column.type, column_type_expected): - raise TypeError( - f"Column '{column.name}' (with type '{column.type}') " - f"does not match expected type: {column_type_expected}" - ) +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -285,11 +299,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -309,7 +323,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -321,16 +335,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -347,7 +361,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -404,12 +418,12 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def test_no_primary_keys(postgres_target, helper): """We run both of these tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" @@ -428,7 +442,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -436,20 +450,19 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_boolean(postgres_target): +def test_array_boolean(postgres_target, helper): file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [True, False]} - verify_data(postgres_target, "array_boolean", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( "array_boolean", check_columns={ "id": {"type": BIGINT}, @@ -458,13 +471,12 @@ def test_array_boolean(postgres_target): ) -def test_array_number(postgres_target): +def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} - verify_data(postgres_target, "array_number", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( "array_number", check_columns={ "id": {"type": BIGINT}, @@ -473,13 +485,12 @@ def test_array_number(postgres_target): ) -def test_array_string(postgres_target): +def test_array_string(postgres_target, helper): file_name = "array_string.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["apple", "orange", "pear"]} - verify_data(postgres_target, "array_string", 4, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( "array_string", check_columns={ "id": {"type": BIGINT}, @@ -488,13 +499,12 @@ def test_array_string(postgres_target): ) -def test_array_timestamp(postgres_target): +def test_array_timestamp(postgres_target, helper): file_name = "array_timestamp.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} - verify_data(postgres_target, "array_timestamp", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( "array_timestamp", check_columns={ "id": {"type": BIGINT}, @@ -503,7 +513,7 @@ def test_array_timestamp(postgres_target): ) -def test_object_mixed(postgres_target): +def test_object_mixed(postgres_target, helper): file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -520,9 +530,8 @@ def test_object_mixed(postgres_target): "nested_object": {"foo": "bar"}, }, } - verify_data(postgres_target, "object_mixed", 1, "id", row) - verify_schema( - postgres_target, + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( "object_mixed", check_columns={ "id": {"type": BIGINT}, @@ -531,7 +540,7 @@ def test_object_mixed(postgres_target): ) -def test_encoded_string_data(postgres_target): +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -544,11 +553,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -572,14 +581,13 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" table_name = "commits" file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) - verify_schema( - postgres_target, + helper.verify_schema( table_name, check_columns={ # {"type":"string"} @@ -688,7 +696,7 @@ def test_activate_version_soft_delete(postgres_target): result = connection.execute( sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2