diff --git a/dlt/common/normalizers/naming/sql_upper.py b/dlt/common/normalizers/naming/sql_upper.py
new file mode 100644
index 0000000000..992940d9a2
--- /dev/null
+++ b/dlt/common/normalizers/naming/sql_upper.py
@@ -0,0 +1,20 @@
+from typing import Any, Sequence
+
+from dlt.common.normalizers.naming.naming import NamingConvention as BaseNamingConvention
+
+
+class NamingConvention(BaseNamingConvention):
+    PATH_SEPARATOR = "__"
+
+    _CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")
+
+    def normalize_identifier(self, identifier: str) -> str:
+        identifier = super().normalize_identifier(identifier)
+        norm_identifier = identifier.translate(self._CLEANUP_TABLE).upper()
+        return self.shorten_identifier(norm_identifier, identifier, self.max_length)
+
+    def make_path(self, *identifiers: Any) -> str:
+        return self.PATH_SEPARATOR.join(filter(lambda x: x.strip(), identifiers))
+
+    def break_path(self, path: str) -> Sequence[str]:
+        return [ident for ident in path.split(self.PATH_SEPARATOR) if ident.strip()]
diff --git a/tests/load/test_job_client.py b/tests/load/test_job_client.py
index 63f9d3c28d..3db2acb11d 100644
--- a/tests/load/test_job_client.py
+++ b/tests/load/test_job_client.py
@@ -29,7 +29,7 @@
 from dlt.common.destination.reference import WithStagingDataset
 
 from tests.cases import table_update_and_row, assert_all_data_types_row
-from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage
+from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage, preserve_environ
 from tests.common.utils import load_json_case
 from tests.load.utils import (
     TABLE_UPDATE,
@@ -45,6 +45,10 @@
 from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
 
 
+@pytest.fixture(autouse=True)
+def set_environ():
+    os.environ["SCHEMA__NAMING"] = "sql_upper"
+
 @pytest.fixture
 def file_storage() -> FileStorage:
     return FileStorage(TEST_STORAGE_ROOT, file_type="b", makedirs=True)
@@ -374,6 +378,7 @@ def test_get_storage_table_with_all_types(client: SqlJobClientBase) -> None:
     # now get the actual schema from the db
     exists, storage_table = client.get_storage_table(table_name)
     assert exists is True
+    print(storage_table)
     # column order must match TABLE_UPDATE
     storage_columns = list(storage_table.values())
     for c, expected_c in zip(TABLE_UPDATE, storage_columns):
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 50dca88248..877b32fd2f 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -488,6 +488,8 @@ def yield_client(
     )
     schema_storage = SchemaStorage(storage_config)
     schema = schema_storage.load_schema(schema_name)
+    schema.update_normalizers()
+    schema.bump_version()
     # create client and dataset
     client: SqlJobClientBase = None
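For illustration, here is a minimal standalone sketch of what the new `sql_upper` convention produces. It reuses only the translation table and path logic visible in the hunk above and deliberately omits the base class's `normalize_identifier` pass and tag-based shortening (applied when `max_length` is exceeded), so the outputs are indicative rather than exact:

```python
# Sketch: mirrors the cleanup/uppercase and path handling from sql_upper.py.
# The real class additionally delegates to the base convention and shortens
# identifiers that exceed max_length.
_CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")
PATH_SEPARATOR = "__"


def normalize_identifier(identifier: str) -> str:
    # characters unsafe in SQL identifiers become "_", then uppercase
    return identifier.translate(_CLEANUP_TABLE).upper()


def make_path(*identifiers: str) -> str:
    # whitespace-only segments are dropped, as in the diff
    return PATH_SEPARATOR.join(i for i in identifiers if i.strip())


assert normalize_identifier("issues.reactions") == "ISSUES_REACTIONS"
assert make_path("ISSUES", " ", "REACTIONS") == "ISSUES__REACTIONS"
```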
diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py
index 8906958e0c..1fecc0eeaa 100644
--- a/tests/pipeline/test_dlt_versions.py
+++ b/tests/pipeline/test_dlt_versions.py
@@ -14,7 +14,7 @@
 from dlt.common.storages import FileStorage
 from dlt.common.schema.typing import (
     LOADS_TABLE_NAME,
-    STATE_TABLE_NAME,
+    PIPELINE_STATE_TABLE_NAME,
     VERSION_TABLE_NAME,
     TStoredSchema,
 )
@@ -66,7 +66,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
         )
         # check the dlt state table
-        assert {
-            "version_hash" not in github_schema["tables"][STATE_TABLE_NAME]["columns"]
-        }
+        assert (
+            "version_hash" not in github_schema["tables"][PIPELINE_STATE_TABLE_NAME]["columns"]
+        )
         # check loads table without attaching to pipeline
         duckdb_cfg = resolve_configuration(
@@ -79,7 +79,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
             assert len(rows[0]) == 4
             rows = client.execute_sql("SELECT * FROM issues")
             assert len(rows) == 20
-            rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+            rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
             # only 5 columns + 2 dlt columns
             assert len(rows[0]) == 5 + 2
             # inspect old state
@@ -131,7 +131,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
             # two schema versions
             rows = client.execute_sql(f"SELECT * FROM {VERSION_TABLE_NAME}")
             assert len(rows) == 2
-            rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME} ORDER BY version")
+            rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME} ORDER BY version")
             # we have hash columns
             assert len(rows[0]) == 6 + 2
             assert len(rows) == 2
@@ -217,7 +217,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
         assert pipeline.state["_version_hash"] is not None
         # but in db there's no hash - we loaded an old package with backward compatible schema
         with pipeline.sql_client() as client:
-            rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+            rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
             # no hash
             assert len(rows[0]) == 5 + 2
             assert len(rows) == 1
@@ -227,7 +227,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
             # this will sync schema to destination
             pipeline.sync_schema()
             # we have hash now
-            rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+            rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
             assert len(rows[0]) == 6 + 2
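Usage-wise, the tests opt into the convention through dlt's environment-based configuration, exactly as the autouse `set_environ` fixture in `test_job_client.py` does. A hedged end-to-end sketch follows; the pipeline name and sample data are hypothetical:

```python
import os

import dlt

# select the new convention for all schemas, as the set_environ
# fixture above does
os.environ["SCHEMA__NAMING"] = "sql_upper"

# hypothetical pipeline: destination identifiers now flow through
# NamingConvention.normalize_identifier / make_path, so the nested
# "payload" list lands in an uppercased, "__"-separated child table
pipeline = dlt.pipeline(pipeline_name="naming_demo", destination="duckdb")
pipeline.run([{"id": 1, "payload": [{"value": 2}]}], table_name="events")
```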