fix linter after merge
sh-rp committed Oct 2, 2023
1 parent f2abadf commit 2ae36e3
Showing 15 changed files with 84 additions and 81 deletions.
2 changes: 1 addition & 1 deletion dlt/common/schema/__init__.py
@@ -1,4 +1,4 @@
from dlt.common.schema.typing import TSchemaUpdate, TSchemaTables, TTableSchema, TStoredSchema, TTableSchemaColumns, TColumnHint, TColumnSchema, TColumnSchemaBase # noqa: F401
from dlt.common.schema.typing import TSchemaContractDict, TSchemaUpdate, TSchemaTables, TTableSchema, TStoredSchema, TTableSchemaColumns, TColumnHint, TColumnSchema, TColumnSchemaBase # noqa: F401
from dlt.common.schema.typing import COLUMN_HINTS # noqa: F401
from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE # noqa: F401
from dlt.common.schema.utils import verify_schema_hash # noqa: F401
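For illustration only (not part of the diff): the re-export above lets callers pull the contract dict type from the package root alongside the default mode, using the same cast pattern the updated tests in this commit rely on.

from typing import cast

from dlt.common.schema import DEFAULT_SCHEMA_CONTRACT_MODE, TSchemaContractDict

# keep the default modes but freeze column evolution, mirroring the cast pattern used in the tests
frozen_columns = cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, "columns": "freeze"})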
10 changes: 5 additions & 5 deletions dlt/common/schema/schema.py
@@ -236,7 +236,7 @@ def apply_schema_contract(self, contract_modes: TSchemaContractDict, table_name:
return row, partial_table

# if evolve once is set, allow all column changes
evolve_once = (table_name in self.tables) and self.tables[table_name].get("x-normalizer", {}).get("evolve_once", False)
evolve_once = (table_name in self.tables) and self.tables[table_name].get("x-normalizer", {}).get("evolve_once", False) # type: ignore[attr-defined]
if evolve_once:
return row, partial_table

@@ -650,18 +650,18 @@ def _compile_settings(self) -> None:
def __repr__(self) -> str:
return f"Schema {self.name} at {id(self)}"

def resolve_contract_settings_for_table(parent_table: str, table_name: str, current_schema: Schema, incoming_schema: Schema = None, incoming_table: TTableSchema = None) -> Tuple[bool, TSchemaContractDict]:
def resolve_contract_settings_for_table(parent_table: str, table_name: str, current_schema: Schema, incoming_schema: Schema = None, incoming_table: TTableSchema = None) -> TSchemaContractDict:
"""Resolve the exact applicable schema contract settings for the table during the normalization stage."""

def resolve_single(settings: TSchemaContract) -> TSchemaContractDict:
settings = settings or {}
if isinstance(settings, str):
settings = TSchemaContractDict(tables=settings, columns=settings, data_type=settings)
return {**DEFAULT_SCHEMA_CONTRACT_MODE, **settings} if settings else {}
return cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **settings} if settings else {})

if incoming_table and (incoming_table_contract_mode := resolve_single(incoming_table.get("schema_contract", {}))):
return incoming_table_contract_mode

# find table settings
table = parent_table or table_name
if table in current_schema.tables:
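For illustration only (not part of the diff): the tightened return type means resolve_contract_settings_for_table now always hands back a fully populated TSchemaContractDict. A minimal standalone sketch of the resolve_single merge shown above, with plain-dict stand-ins for dlt's types and an assumed value for DEFAULT_SCHEMA_CONTRACT_MODE:

from typing import Dict, Union, cast

# stand-ins, assumed for this sketch only
TSchemaContractDict = Dict[str, str]
TSchemaContract = Union[str, TSchemaContractDict]
DEFAULT_SCHEMA_CONTRACT_MODE: TSchemaContractDict = {"tables": "evolve", "columns": "evolve", "data_type": "evolve"}

def resolve_single(settings: TSchemaContract) -> TSchemaContractDict:
    # a shorthand string such as "freeze" applies to tables, columns and data_type alike
    settings = settings or {}
    if isinstance(settings, str):
        settings = {"tables": settings, "columns": settings, "data_type": settings}
    # merge on top of the defaults so every key is present in the result
    return cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **settings} if settings else {})

print(resolve_single("freeze"))                     # every entity frozen
print(resolve_single({"columns": "discard_row"}))   # defaults plus the single override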
23 changes: 12 additions & 11 deletions dlt/common/schema/typing.py
@@ -88,17 +88,18 @@ class NormalizerInfo(TypedDict, total=True):
new_table: bool

# TypedDict that defines properties of a table
TTableSchema = TypedDict("TTableSchema", {
"name": Optional[str],
"description": Optional[str],
"write_disposition": Optional[TWriteDisposition],
"schema_contract": Optional[TSchemaContract],
"parent": Optional[str],
"filters": Optional[TRowFilters],
"columns": TTableSchemaColumns,
"resource": Optional[str],
"x-normalizer": Optional[NormalizerInfo],
})

class TTableSchema(TypedDict, total=False):
"""TypedDict that defines properties of a table"""
name: Optional[str]
description: Optional[str]
write_disposition: Optional[TWriteDisposition]
schema_contract: Optional[TSchemaContract]
table_sealed: Optional[bool]
parent: Optional[str]
filters: Optional[TRowFilters]
columns: TTableSchemaColumns
resource: Optional[str]

class TPartialTableSchema(TTableSchema):
pass
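For illustration only (not part of the diff): because the class-based definition is total=False and no longer declares keys like "x-normalizer", call sites elsewhere in this commit that set or pop that key need typeddict ignore comments, and partial table dicts are built with cast, as the updated tests do. A minimal sketch, assuming the dlt package at this commit is importable; the column values are illustrative:

from typing import cast

from dlt.common.schema.typing import TTableSchema

# total=False makes every key optional, so a partial dict is acceptable;
# cast() keeps the type checker happy when building the literal inline
table = cast(TTableSchema, {
    "name": "tables",
    "columns": {
        "column_1": {"name": "column_1", "data_type": "text"},
    },
})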
4 changes: 2 additions & 2 deletions dlt/extract/extract.py
@@ -133,8 +133,8 @@ def _add_dynamic_table(resource: DltResource, data_item: TDataItem = None, table
# TODO: is this the correct check for a new table, should a table with only incomplete columns be new too?
is_new_table = (table["name"] not in pipeline_schema.tables) or (not pipeline_schema.tables[table["name"]]["columns"])
if is_new_table:
table["x-normalizer"] = {"evolve_once": True}
table["x-normalizer"] = {"evolve_once": True} # type: ignore[typeddict-unknown-key]

# apply schema contract and apply on pipeline schema
# here we only check that table may be created
schema_contract = resolve_contract_settings_for_table(None, table["name"], pipeline_schema, source.schema, table)
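For illustration only (not part of the diff): the "evolve_once" marker set here is the same flag apply_schema_contract reads in schema.py above; since the key is not declared on the TypedDict, both sides carry type: ignore comments. A standalone sketch of that guard, with plain dicts standing in for Schema.tables:

# plain-dict stand-in for Schema.tables; the real code works on TTableSchema values
tables = {
    "new_table": {"name": "new_table", "x-normalizer": {"evolve_once": True}},
    "old_table": {"name": "old_table"},
}

def evolves_once(table_name: str) -> bool:
    # mirrors: (table_name in self.tables) and self.tables[table_name].get("x-normalizer", {}).get("evolve_once", False)
    return table_name in tables and tables[table_name].get("x-normalizer", {}).get("evolve_once", False)

assert evolves_once("new_table")
assert not evolves_once("old_table")
assert not evolves_once("missing_table")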
2 changes: 1 addition & 1 deletion dlt/extract/schema.py
@@ -224,7 +224,7 @@ def new_table_template(
columns = ensure_table_schema_columns_hint(columns)
if not callable(columns):
columns = columns.values() # type: ignore
is_table_complete = len([c for c in columns if c.get("name") and c.get("data_type")])
is_table_complete = len([c for c in columns if c.get("name") and c.get("data_type")]) # type: ignore
else:
validator = None
is_table_complete = False
2 changes: 1 addition & 1 deletion dlt/normalize/normalize.py
@@ -281,7 +281,7 @@ def spool_files(self, schema_name: str, load_id: str, map_f: TMapFuncType, files
needs_schema_save = len(schema_updates) > 0
# remove normalizer specific info
for table in schema.tables.values():
if table.pop("x-normalizer", None):
if table.pop("x-normalizer", None): # type: ignore[typeddict-item]
needs_schema_save = True
# logger.metrics("Normalize metrics", extra=get_logging_extras([self.schema_version_gauge.labels(schema_name)]))
if needs_schema_save:
2 changes: 1 addition & 1 deletion dlt/pipeline/pipeline.py
@@ -893,7 +893,7 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para
table
)
pipeline_schema.set_schema_contract(source_schema._settings.get("schema_contract", {}))

# globally apply contract override again for all merged tables
if global_contract is not None:
pipeline_schema.set_schema_contract(global_contract, True)
82 changes: 42 additions & 40 deletions tests/common/schema/test_contract_mode_functions.py
@@ -1,22 +1,24 @@
from typing import cast

import pytest
import copy

from dlt.common.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE
from dlt.common.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE, TSchemaContractDict
from dlt.common.schema.schema import resolve_contract_settings_for_table
from dlt.common.schema.exceptions import SchemaFrozenException

from dlt.common.schema.typing import TTableSchema

def get_schema() -> Schema:
s = Schema("event")

columns = {
"column_1": {
"name": "column_1",
"data_type": "string"
"data_type": "text"
},
"column_2": {
"name": "column_2",
"data_type": "number",
"data_type": "bigint",
"is_variant": True
}
}
@@ -32,26 +34,26 @@ def get_schema() -> Schema:


# add some tables
s.update_table({
s.update_table(cast(TTableSchema, {
"name": "tables",
"columns": columns
})
}))

s.update_table({
s.update_table(cast(TTableSchema, {
"name": "child_table",
"parent": "tables",
"columns": columns
})
}))

s.update_table({
s.update_table(cast(TTableSchema, {
"name": "incomplete_table",
"columns": incomplete_columns
})
}))

s.update_table({
s.update_table(cast(TTableSchema, {
"name": "mixed_table",
"columns": {**incomplete_columns, **columns}
})
}))

return s

@@ -148,7 +150,7 @@ def test_resolve_contract_settings() -> None:
current_schema._settings["schema_contract"] = "discard_value"
incoming_schema = get_schema()
incoming_schema._settings["schema_contract"] = "discard_row"
incoming_table = {"name": "incomplete_table", "schema_contract": "freeze"}
incoming_table: TTableSchema = {"name": "incomplete_table", "schema_contract": "freeze"}


# incoming schema overrides
@@ -223,12 +225,12 @@ def test_check_adding_table(base_settings) -> None:
#
# check adding new table
#
assert schema.apply_schema_contract({**base_settings, **{"tables": "evolve"}}, "new_table", data, new_table) == (data, new_table)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_row"}}, "new_table", data, new_table) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_value"}}, "new_table", data, new_table) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"tables": "evolve"}}), "new_table", data, new_table) == (data, new_table)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"tables": "discard_row"}}), "new_table", data, new_table) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"tables": "discard_value"}}), "new_table", data, new_table) == (None, None)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"tables": "freeze"}}, "new_table", data, new_table)
schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"tables": "freeze"}}), "new_table", data, new_table)


@pytest.mark.parametrize("base_settings", base_settings)
@@ -243,27 +245,27 @@ def test_check_adding_new_columns(base_settings) -> None:
"column_2": 123
}
data_with_new_row = {
**data,
**data, # type: ignore
"new_column": "some string"
}
table_update = {
table_update: TTableSchema = {
"name": "tables",
"columns": {
"new_column": {
"name": "new_column",
"data_type": "string"
"data_type": "text"
}
}
}
popped_table_update = copy.deepcopy(table_update)
popped_table_update["columns"].pop("new_column")

assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "evolve"}}), "tables", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "discard_row"}}), "tables", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "discard_value"}}), "tables", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), table_update)
schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "freeze"}}), "tables", copy.deepcopy(data_with_new_row), table_update)


#
@@ -274,28 +276,28 @@ def test_check_adding_new_columns(base_settings) -> None:
"column_2": 123,
}
data_with_new_row = {
**data,
**data, # type: ignore
"incomplete_column_1": "some other string",
}
table_update = {
"name": "mixed_table",
"columns": {
"incomplete_column_1": {
"name": "incomplete_column_1",
"data_type": "string"
"data_type": "text"
}
}
}
popped_table_update = copy.deepcopy(table_update)
popped_table_update["columns"].pop("incomplete_column_1")

# incomplete columns should be treated like new columns
assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "evolve"}}), "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "discard_row"}}), "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "discard_value"}}), "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update)
schema.apply_schema_contract(cast(TSchemaContractDict, {**base_settings, **{"columns": "freeze"}}), "mixed_table", copy.deepcopy(data_with_new_row), table_update)



@@ -310,32 +312,32 @@ def test_check_adding_new_variant() -> None:
"column_2": 123
}
data_with_new_row = {
**data,
**data, # type: ignore
"column_2_variant": 345345
}
table_update = {
table_update: TTableSchema = {
"name": "tables",
"columns": {
"column_2_variant": {
"name": "column_2_variant",
"data_type": "number",
"data_type": "bigint",
"variant": True
}
}
}
popped_table_update = copy.deepcopy(table_update)
popped_table_update["columns"].pop("column_2_variant")

assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update))
schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update))

# check interaction with new columns settings, variants are new columns..
with pytest.raises(SchemaFrozenException):
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)

assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract(cast(TSchemaContractDict, {**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_value"}}), "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)
2 changes: 1 addition & 1 deletion tests/common/schema/test_filtering.py
@@ -120,5 +120,5 @@ def _add_excludes(schema: Schema) -> None:
bot_table["filters"]["includes"] = [
TSimpleRegex("re:^data__custom$"), TSimpleRegex("re:^custom_data__included_object__"), TSimpleRegex("re:^metadata__elvl1__elvl2__")
]
schema.update_schema(bot_table)
schema.update_table(bot_table)
schema._compile_settings()
2 changes: 1 addition & 1 deletion tests/common/schema/test_versioning.py
@@ -83,7 +83,7 @@ def test_infer_column_bumps_version() -> None:


def test_preserve_version_on_load() -> None:
eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
version = eth_v7["version"]
version_hash = eth_v7["version_hash"]
schema = Schema.from_dict(eth_v7) # type: ignore[arg-type]
(diffs for the remaining 5 changed files are not shown here)
