
Commit

initial sql uppercase naming convention
rudolfix committed Feb 25, 2024
1 parent c4e9f35 commit 6345377
Showing 4 changed files with 34 additions and 7 deletions.
20 changes: 20 additions & 0 deletions dlt/common/normalizers/naming/sql_upper.py
@@ -0,0 +1,20 @@
+from typing import Any, Sequence
+
+from dlt.common.normalizers.naming.naming import NamingConvention as BaseNamingConvention
+
+
+class NamingConvention(BaseNamingConvention):
+    PATH_SEPARATOR = "__"
+
+    _CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")
+
+    def normalize_identifier(self, identifier: str) -> str:
+        identifier = super().normalize_identifier(identifier)
+        norm_identifier = identifier.translate(self._CLEANUP_TABLE).upper()
+        return self.shorten_identifier(norm_identifier, identifier, self.max_length)
+
+    def make_path(self, *identifiers: Any) -> str:
+        return self.PATH_SEPARATOR.join(filter(lambda x: x.strip(), identifiers))
+
+    def break_path(self, path: str) -> Sequence[str]:
+        return [ident for ident in path.split(self.PATH_SEPARATOR) if ident.strip()]
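
For orientation (not part of the commit), a minimal usage sketch of the new convention. The exact result of normalize_identifier also depends on what the base NamingConvention does first, so the normalized values below are assumptions; make_path and break_path follow directly from the code above:

from dlt.common.normalizers.naming.sql_upper import NamingConvention

naming = NamingConvention()  # assumes default construction is allowed (max_length unset)
# "." (and quote/newline characters) map to "_" via _CLEANUP_TABLE, then upper-case
print(naming.normalize_identifier("order.id"))  # e.g. "ORDER_ID"
# the path helpers join and split on "__", dropping empty segments
print(naming.make_path("EVENT", "PAYLOAD"))     # "EVENT__PAYLOAD"
print(naming.break_path("EVENT__PAYLOAD"))      # ["EVENT", "PAYLOAD"]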
7 changes: 6 additions & 1 deletion tests/load/test_job_client.py
@@ -29,7 +29,7 @@
 from dlt.common.destination.reference import WithStagingDataset

 from tests.cases import table_update_and_row, assert_all_data_types_row
-from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage
+from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage, preserve_environ
 from tests.common.utils import load_json_case
 from tests.load.utils import (
     TABLE_UPDATE,
@@ -45,6 +45,10 @@
 from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration


+@pytest.fixture(autouse=True)
+def set_environ():
+    os.environ["SCHEMA__NAMING"] = "sql_upper"
+
 @pytest.fixture
 def file_storage() -> FileStorage:
     return FileStorage(TEST_STORAGE_ROOT, file_type="b", makedirs=True)
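
Aside on the set_environ fixture above (my reading; not stated in the commit): dlt resolves configuration from environment variables using "__" as the section separator, so SCHEMA__NAMING=sql_upper selects the naming module by its importable name for every schema created during the test session. The newly imported preserve_environ fixture should restore the variable afterwards. A hypothetical sketch of how such a value resolves to the class added in this commit:

from importlib import import_module

# assumption: the value names a module inside dlt.common.normalizers.naming,
# whose NamingConvention class becomes the active convention
naming_module = import_module("dlt.common.normalizers.naming.sql_upper")
convention = naming_module.NamingConvention()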
@@ -374,6 +378,7 @@ def test_get_storage_table_with_all_types(client: SqlJobClientBase) -> None:
     # now get the actual schema from the db
     exists, storage_table = client.get_storage_table(table_name)
     assert exists is True
+    print(storage_table)
     # column order must match TABLE_UPDATE
     storage_columns = list(storage_table.values())
     for c, expected_c in zip(TABLE_UPDATE, storage_columns):
2 changes: 2 additions & 0 deletions tests/load/utils.py
@@ -488,6 +488,8 @@ def yield_client(
     )
     schema_storage = SchemaStorage(storage_config)
     schema = schema_storage.load_schema(schema_name)
+    schema.update_normalizers()
+    schema.bump_version()
     # create client and dataset
     client: SqlJobClientBase = None

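On the two lines added to yield_client above (as I read them): update_normalizers re-applies the naming convention now configured through SCHEMA__NAMING to the identifiers of the loaded schema, and bump_version recomputes the schema version and content hash so the change is visible to clients. A minimal sketch, using only the methods shown in the diff (the schema name is hypothetical):

from dlt.common.schema import Schema

schema = Schema("example")   # hypothetical, stands in for schema_storage.load_schema(...)
schema.update_normalizers()  # re-resolve naming config and apply it to stored identifiers
schema.bump_version()        # recompute version and content hash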
12 changes: 6 additions & 6 deletions tests/pipeline/test_dlt_versions.py
@@ -14,7 +14,7 @@
 from dlt.common.storages import FileStorage
 from dlt.common.schema.typing import (
     LOADS_TABLE_NAME,
-    STATE_TABLE_NAME,
+    PIPELINE_STATE_TABLE_NAME,
     VERSION_TABLE_NAME,
     TStoredSchema,
 )
@@ -66,7 +66,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
     )
     # check the dlt state table
     assert {
-        "version_hash" not in github_schema["tables"][STATE_TABLE_NAME]["columns"]
+        "version_hash" not in github_schema["tables"][PIPELINE_STATE_TABLE_NAME]["columns"]
     }
     # check loads table without attaching to pipeline
     duckdb_cfg = resolve_configuration(
@@ -79,7 +79,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
         assert len(rows[0]) == 4
         rows = client.execute_sql("SELECT * FROM issues")
         assert len(rows) == 20
-        rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+        rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
         # only 5 columns + 2 dlt columns
         assert len(rows[0]) == 5 + 2
     # inspect old state
@@ -131,7 +131,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
         # two schema versions
         rows = client.execute_sql(f"SELECT * FROM {VERSION_TABLE_NAME}")
         assert len(rows) == 2
-        rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME} ORDER BY version")
+        rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME} ORDER BY version")
         # we have hash columns
         assert len(rows[0]) == 6 + 2
         assert len(rows) == 2
@@ -217,7 +217,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
     assert pipeline.state["_version_hash"] is not None
     # but in db there's no hash - we loaded an old package with backward compatible schema
     with pipeline.sql_client() as client:
-        rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+        rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
         # no hash
         assert len(rows[0]) == 5 + 2
         assert len(rows) == 1
@@ -227,7 +227,7 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
         # this will sync schema to destination
         pipeline.sync_schema()
         # we have hash now
-        rows = client.execute_sql(f"SELECT * FROM {STATE_TABLE_NAME}")
+        rows = client.execute_sql(f"SELECT * FROM {PIPELINE_STATE_TABLE_NAME}")
         assert len(rows[0]) == 6 + 2


