Skip to content

Commit

Permalink
fix most tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Oct 2, 2023
1 parent 041da6d commit b72a1a9
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 49 deletions.
5 changes: 3 additions & 2 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def _add_dynamic_table(resource: DltResource, data_item: TDataItem = None, table
"""
Computes new table and does contract checks
"""
# TODO: We have to normalize table identifiers here
table = resource.compute_table_schema(data_item)
if table_name:
table["name"] = table_name
Expand All @@ -142,8 +143,8 @@ def _add_dynamic_table(resource: DltResource, data_item: TDataItem = None, table
if not checked_table:
disallowed_tables.add(table["name"])
return False
dynamic_tables[table_name] = [checked_table]

dynamic_tables[checked_table["name"]] = [checked_table]
return True

# yield from all selected pipes
Expand Down
9 changes: 7 additions & 2 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,8 +863,11 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para

# if source schema does not exist in the pipeline
if source_schema.name not in self._schema_storage:
# save new schema into the pipeline
self._schema_storage.save_schema(Schema(source_schema.name))
# TODO: here we should create a new schema but copy hints and possibly other settings
# over from the schema table. Is this the right way?
new_schema = Schema(source_schema.name)
new_schema._settings = source_schema._settings
self._schema_storage.save_schema(new_schema)

# and set as default if this is first schema in pipeline
if not self.default_schema_name:
Expand All @@ -879,7 +882,9 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para

extract_id = extract_with_schema(storage, source, pipeline_schema, self.collector, max_parallel_items, workers)


# initialize import with fully discovered schema
# TODO: is this the right location for this?
self._schema_storage.save_import_schema_if_not_exists(source_schema)

# update the pipeline schema with all tables and contract settings
Expand Down
4 changes: 2 additions & 2 deletions tests/common/cases/schemas/eth/ethereum_schema_v7.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
version: 14
version_hash: VuzNqiLOk7XuPxYLndMFMPHTDVItKU5ayiy70nQLdus=
version: 15
version_hash: yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE=
engine_version: 7
name: ethereum
tables:
Expand Down
111 changes: 78 additions & 33 deletions tests/common/schema/test_contract_mode_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy

from dlt.common.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE
from dlt.common.schema.schema import resolve_contract_settings_for_table
from dlt.common.schema.exceptions import SchemaFrozenException


Expand Down Expand Up @@ -59,18 +60,18 @@ def test_resolve_contract_settings() -> None:

# defaults
schema = get_schema()
assert schema.resolve_contract_settings_for_table(None, "tables") == DEFAULT_SCHEMA_CONTRACT_MODE
assert schema.resolve_contract_settings_for_table("tables", "child_table") == DEFAULT_SCHEMA_CONTRACT_MODE
assert resolve_contract_settings_for_table(None, "tables", schema) == DEFAULT_SCHEMA_CONTRACT_MODE
assert resolve_contract_settings_for_table("tables", "child_table", schema) == DEFAULT_SCHEMA_CONTRACT_MODE

# table specific full setting
schema = get_schema()
schema.tables["tables"]["schema_contract"] = "freeze"
assert schema.resolve_contract_settings_for_table(None, "tables") == {
assert resolve_contract_settings_for_table(None, "tables", schema) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
}
assert schema.resolve_contract_settings_for_table("tables", "child_table") == {
assert resolve_contract_settings_for_table("tables", "child_table", schema) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
Expand All @@ -82,12 +83,12 @@ def test_resolve_contract_settings() -> None:
"tables": "freeze",
"columns": "discard_value",
}
assert schema.resolve_contract_settings_for_table(None, "tables") == {
assert resolve_contract_settings_for_table(None, "tables", schema) == {
"tables": "freeze",
"columns": "discard_value",
"data_type": "evolve"
}
assert schema.resolve_contract_settings_for_table("tables", "child_table") == {
assert resolve_contract_settings_for_table("tables", "child_table", schema) == {
"tables": "freeze",
"columns": "discard_value",
"data_type": "evolve"
Expand All @@ -96,12 +97,12 @@ def test_resolve_contract_settings() -> None:
# schema specific full setting
schema = get_schema()
schema._settings["schema_contract"] = "freeze"
assert schema.resolve_contract_settings_for_table(None, "tables") == {
assert resolve_contract_settings_for_table(None, "tables", schema) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
}
assert schema.resolve_contract_settings_for_table("tables", "child_table") == {
assert resolve_contract_settings_for_table("tables", "child_table", schema) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
Expand All @@ -113,35 +114,79 @@ def test_resolve_contract_settings() -> None:
"tables": "freeze",
"columns": "discard_value",
}
assert schema.resolve_contract_settings_for_table(None, "tables") == {
assert resolve_contract_settings_for_table(None, "tables", schema) == {
"tables": "freeze",
"columns": "discard_value",
"data_type": "evolve"
}
assert schema.resolve_contract_settings_for_table("tables", "child_table") == {
assert resolve_contract_settings_for_table("tables", "child_table", schema) == {
"tables": "freeze",
"columns": "discard_value",
"data_type": "evolve"
}

# mixed settings
# mixed settings: table setting always prevails
schema = get_schema()
schema._settings["schema_contract"] = "freeze"
schema.tables["tables"]["schema_contract"] = {
"tables": "evolve",
"columns": "discard_value",
}
assert schema.resolve_contract_settings_for_table(None, "tables") == {
assert resolve_contract_settings_for_table(None, "tables", schema) == {
"tables": "evolve",
"columns": "discard_value",
"data_type": "freeze"
"data_type": "evolve"
}
assert schema.resolve_contract_settings_for_table("tables", "child_table") == {
assert resolve_contract_settings_for_table("tables", "child_table", schema) == {
"tables": "evolve",
"columns": "discard_value",
"data_type": "evolve"
}

# current and incoming schema
current_schema = get_schema()
current_schema._settings["schema_contract"] = "discard_value"
incoming_schema = get_schema()
incoming_schema._settings["schema_contract"] = "discard_row"
incoming_table = {"name": "incomplete_table", "schema_contract": "freeze"}


# incoming schema overrides
assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == {
"tables": "discard_row",
"columns": "discard_row",
"data_type": "discard_row"
}

# direct incoming table overrides
assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema, incoming_table) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
}

# table defined on existing schema overrides incoming schema setting
current_schema.tables["tables"]["schema_contract"] = "discard_value"
assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == {
"tables": "discard_value",
"columns": "discard_value",
"data_type": "discard_value"
}

# but table on incoming schema overrides again
incoming_schema.tables["tables"]["schema_contract"] = "discard_row"
assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema) == {
"tables": "discard_row",
"columns": "discard_row",
"data_type": "discard_row"
}

# incoming table still overrides all
assert resolve_contract_settings_for_table(None, "tables", current_schema, incoming_schema, incoming_table) == {
"tables": "freeze",
"columns": "freeze",
"data_type": "freeze"
}

# ensure other settings do not interfere with the main setting we are testing
base_settings = [{
Expand Down Expand Up @@ -178,12 +223,12 @@ def test_check_adding_table(base_settings) -> None:
#
# check adding new table
#
assert schema.apply_schema_contract({**base_settings, **{"tables": "evolve"}}, "new_table", data, new_table, False) == (data, new_table)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_row"}}, "new_table", data, new_table, False) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_value"}}, "new_table", data, new_table, False) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"tables": "evolve"}}, "new_table", data, new_table) == (data, new_table)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_row"}}, "new_table", data, new_table) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"tables": "discard_value"}}, "new_table", data, new_table) == (None, None)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"tables": "freeze"}}, "new_table", data, new_table, False)
schema.apply_schema_contract({**base_settings, **{"tables": "freeze"}}, "new_table", data, new_table)


@pytest.mark.parametrize("base_settings", base_settings)
Expand Down Expand Up @@ -213,12 +258,12 @@ def test_check_adding_new_columns(base_settings) -> None:
popped_table_update = copy.deepcopy(table_update)
popped_table_update["columns"].pop("new_column")

assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True) == (data, popped_table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), table_update, True)
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), table_update)


#
Expand All @@ -245,12 +290,12 @@ def test_check_adding_new_columns(base_settings) -> None:
popped_table_update["columns"].pop("incomplete_column_1")

# incomplete columns should be treated like new columns
assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True) == (data, popped_table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "evolve"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_row"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (None, None)
assert schema.apply_schema_contract({**base_settings, **{"columns": "discard_value"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update, True)
schema.apply_schema_contract({**base_settings, **{"columns": "freeze"}}, "mixed_table", copy.deepcopy(data_with_new_row), table_update)



Expand Down Expand Up @@ -281,16 +326,16 @@ def test_check_adding_new_variant() -> None:
popped_table_update = copy.deepcopy(table_update)
popped_table_update["columns"].pop("column_2_variant")

assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data, popped_table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)

with pytest.raises(SchemaFrozenException):
schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True)
schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update))

# check interaction with new columns settings, variants are new columns..
with pytest.raises(SchemaFrozenException):
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data_with_new_row, table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "freeze"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data_with_new_row, table_update)

assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update), True) == (data, popped_table_update)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_row"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (None, None)
assert schema.apply_schema_contract({**DEFAULT_SCHEMA_CONTRACT_MODE, **{"data_type": "evolve", "columns": "discard_value"}}, "tables", copy.deepcopy(data_with_new_row), copy.deepcopy(table_update)) == (data, popped_table_update)
2 changes: 1 addition & 1 deletion tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

COMMON_TEST_CASES_PATH = "./tests/common/cases/"
# for import schema tests, change when upgrading the schema version
IMPORTED_VERSION_HASH_ETH_V7 = "VuzNqiLOk7XuPxYLndMFMPHTDVItKU5ayiy70nQLdus="
IMPORTED_VERSION_HASH_ETH_V7 = "yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="
# test sentry DSN
TEST_SENTRY_DSN = "https://[email protected]/4504306172821504"
# preserve secrets path to be able to restore it
Expand Down
6 changes: 3 additions & 3 deletions tests/extract/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dlt.common.storages import NormalizeStorageConfiguration
from dlt.extract.extract import ExtractorStorage, extract
from dlt.extract.source import DltResource, DltSource

from dlt.common.schema import Schema
from tests.utils import clean_test_storage
from tests.extract.utils import expect_extracted_file

Expand All @@ -18,7 +18,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema:

storage = ExtractorStorage(NormalizeStorageConfiguration())
extract_id = storage.create_extract_id()
schema_update = extract(extract_id, source, storage)
schema_update = extract(extract_id, source, storage, pipeline_schema=Schema("some_schema"))
# odd and even tables
assert len(schema_update) == 2
assert "odd_table" in schema_update
Expand All @@ -42,7 +42,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema:
source = source.with_resources(resource.name)
source.selected_resources[resource.name].bind(10).select_tables("odd_table")
extract_id = storage.create_extract_id()
schema_update = extract(extract_id, source, storage)
schema_update = extract(extract_id, source, storage, pipeline_schema=Schema("some_schema"))
assert len(schema_update) == 1
assert "odd_table" in schema_update
for partials in schema_update.values():
Expand Down
1 change: 0 additions & 1 deletion tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")):
@dlt.resource
def standalone_some_data(now=None, last_timestamp=dlt.sources.incremental("item.timestamp")):
for i in range(-10, 10):
print(i)
yield {"delta": i, "item": {"timestamp": (now or pendulum.now()).add(days=i).timestamp()}}


Expand Down
12 changes: 7 additions & 5 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,13 @@ def some_source():
p = dlt.pipeline(pipeline_name=uniq_id(), destination='dummy')
p.run(source)

assert source.schema.tables['some_table']['resource'] == 'static_data'
assert source.schema.tables['dynamic_func_table']['resource'] == 'dynamic_func_data'
assert source.schema.tables['dynamic_mark_table']['resource'] == 'dynamic_mark_data'
assert source.schema.tables['parent_table']['resource'] == 'nested_data'
assert 'resource' not in source.schema.tables['parent_table__items']
schema = p.default_schema

assert schema.tables['some_table']['resource'] == 'static_data'
assert schema.tables['dynamic_func_table']['resource'] == 'dynamic_func_data'
assert schema.tables['dynamic_mark_table']['resource'] == 'dynamic_mark_data'
assert schema.tables['parent_table']['resource'] == 'nested_data'
assert 'resource' not in schema.tables['parent_table__items']


def test_preserve_fields_order() -> None:
Expand Down

0 comments on commit b72a1a9

Please sign in to comment.