From 33a64fbccffc914d6759b4c6a02dbbf0eb2fd214 Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 19 Nov 2023 18:48:26 +0100 Subject: [PATCH 1/7] add schema ancestors --- dlt/common/schema/schema.py | 16 +- dlt/common/schema/typing.py | 3 +- dlt/common/schema/utils.py | 22 +- dlt/common/validation.py | 5 +- .../cases/schemas/eth/ethereum_schema_v7.yml | 459 ++++++++++++++++++ tests/common/schema/test_schema.py | 12 +- tests/common/schema/test_versioning.py | 33 +- tests/common/storages/test_schema_storage.py | 19 +- tests/common/test_validation.py | 4 +- tests/common/utils.py | 2 +- tests/load/pipeline/test_restore_state.py | 4 +- tests/load/weaviate/test_naming.py | 4 +- tests/pipeline/test_dlt_versions.py | 4 +- 13 files changed, 560 insertions(+), 27 deletions(-) create mode 100644 tests/common/cases/schemas/eth/ethereum_schema_v7.yml diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 77a5ae8e8e..1878cf63d6 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -37,6 +37,7 @@ class Schema: _dlt_tables_prefix: str _stored_version: int # version at load/creation time _stored_version_hash: str # version hash at load/creation time + _stored_ancestors: Optional[List[str]] # list of ancestor hashes of the schema _imported_version_hash: str # version hash of recently imported schema _schema_description: str # optional schema description _schema_tables: TSchemaTables @@ -61,6 +62,7 @@ def __init__(self, name: str, normalizers: TNormalizersConfig = None) -> None: @classmethod def from_dict(cls, d: DictStrAny) -> "Schema": + # upgrade engine if needed stored_schema = utils.migrate_schema(d, d["engine_version"], cls.ENGINE_VERSION) # verify schema @@ -91,7 +93,8 @@ def to_dict(self, remove_defaults: bool = False) -> TStoredSchema: "name": self._schema_name, "tables": self._schema_tables, "settings": self._settings, - "normalizers": self._normalizers_config + "normalizers": self._normalizers_config, + "ancestors": self._stored_ancestors } if self._imported_version_hash and not remove_defaults: stored_schema["imported_version_hash"] = self._imported_version_hash @@ -223,7 +226,7 @@ def update_schema(self, schema: "Schema") -> None: self._compile_settings() - def bump_version(self) -> Tuple[int, str]: + def bump_version(self) -> Tuple[int, str, List[str]]: """Computes schema hash in order to check if schema content was modified. In such case the schema ``stored_version`` and ``stored_version_hash`` are updated. Should not be used in production code. The method ``to_dict`` will generate TStoredSchema with correct value, only once before persisting schema to storage. 
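Review note: the ancestry bookkeeping this patch adds (see the `bump_version_if_modified` hunks in `dlt/common/schema/utils.py` below) pushes the outgoing version hash to the front of `ancestors` and caps the history at 10 entries. A minimal standalone sketch of that behavior, not code from the patch:

    from typing import List

    def push_ancestor(ancestors: List[str], previous_hash: str) -> List[str]:
        # newest hash goes to the front; the history is capped at 10 entries
        if previous_hash not in ancestors:
            ancestors.insert(0, previous_hash)
        return ancestors[:10]

    hashes: List[str] = []
    for i in range(15):
        hashes = push_ancestor(hashes, f"hash-{i}")
    assert hashes[0] == "hash-14" and len(hashes) == 10  # newest first, capped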
@@ -232,7 +235,7 @@ def bump_version(self) -> Tuple[int, str]: Returns: - Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple + Tuple[int, str, List[str]]: Current (``stored_version``, ``stored_version_hash``, ``stored_ancestors``) tuple """ version = utils.bump_version_if_modified(self.to_dict()) - self._stored_version, self._stored_version_hash = version + self._stored_version, self._stored_version_hash, self._stored_ancestors = version return version def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny: @@ -350,6 +353,11 @@ def version_hash(self) -> str: """Current version hash of the schema, recomputed from the actual content""" return utils.bump_version_if_modified(self.to_dict())[1] + @property + def ancestors(self) -> List[str]: + """List of ancestor hashes of the schema, recomputed from the actual content""" + return utils.bump_version_if_modified(self.to_dict())[2] + @property def stored_version_hash(self) -> str: - """Version hash of the schema content form the time of schema loading/creation.""" + """Version hash of the schema content from the time of schema loading/creation.""" @@ -532,6 +540,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No self._stored_version_hash: str = None self._imported_version_hash: str = None self._schema_description: str = None + self._stored_ancestors: List[str] = [] self._settings: TSchemaSettings = {} self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = [] @@ -570,6 +579,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None: self._imported_version_hash = stored_schema.get("imported_version_hash") self._schema_description = stored_schema.get("description") self._settings = stored_schema.get("settings") or {} + self._stored_ancestors = stored_schema.get("ancestors") self._compile_settings() def _set_schema_name(self, name: str) -> None: diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ac17f0ae9f..7d53f4e8a8 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -11,7 +11,7 @@ # current version of schema engine -SCHEMA_ENGINE_VERSION = 6 +SCHEMA_ENGINE_VERSION = 7 # dlt tables VERSION_TABLE_NAME = "_dlt_version" @@ -108,6 +108,7 @@ class TStoredSchema(TypedDict, total=False): - """TypeDict defining the schema representation in storage""" + """TypedDict defining the schema representation in storage""" version: int version_hash: str + ancestors: List[str] imported_version_hash: Optional[str] engine_version: int name: str diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index f2075ce85d..75b7d0dd31 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema: # return copy(column) # type: ignore -def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]: +def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, List[str]]: # if any change to schema document is detected then bump version and write new hash hash_ = generate_version_hash(stored_schema) previous_hash = stored_schema.get("version_hash") @@ -143,8 +143,13 @@ pass elif hash_ != previous_hash: stored_schema["version"] += 1 + # insert the previous hash at the front of ancestors and limit the list to 10 entries + if previous_hash not in stored_schema["ancestors"]: + stored_schema["ancestors"].insert(0, previous_hash) + stored_schema["ancestors"] = stored_schema["ancestors"][:10] + stored_schema["version_hash"] = hash_ - return stored_schema["version"], hash_ + return stored_schema["version"], hash_, stored_schema["ancestors"] def generate_version_hash(stored_schema:
TStoredSchema) -> str: @@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: schema_copy.pop("version") schema_copy.pop("version_hash", None) schema_copy.pop("imported_version_hash", None) + schema_copy.pop("ancestors", None) # ignore order of elements when computing the hash content = json.dumps(schema_copy, sort_keys=True) h = hashlib.sha3_256(content.encode("utf-8")) @@ -240,12 +246,18 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern: def validate_stored_schema(stored_schema: TStoredSchema) -> None: + # exclude validation of keys added later + ignored_keys = [] + if stored_schema["engine_version"] < 7: + ignored_keys.append("ancestors") + # use lambda to verify only non extra fields validate_dict_ignoring_xkeys( spec=TStoredSchema, doc=stored_schema, path=".", - validator_f=simple_regex_validator + validator_f=simple_regex_validator, + filter_required=lambda k: k not in ignored_keys ) # check child parent relationships for table_name, table in stored_schema["tables"].items(): @@ -256,6 +268,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None: def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema: + if from_engine == to_engine: return cast(TStoredSchema, schema_dict) @@ -340,6 +353,9 @@ def migrate_filters(group: str, filters: List[str]) -> None: # replace loads table schema_dict["tables"][LOADS_TABLE_NAME] = load_table() from_engine = 6 + if from_engine == 6 and to_engine > 6: + schema_dict["ancestors"] = [] + from_engine = 7 schema_dict["engine_version"] = from_engine if from_engine != to_engine: diff --git a/dlt/common/validation.py b/dlt/common/validation.py index f1900c1b0e..7a313b1b29 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -9,7 +9,7 @@ TCustomValidator = Callable[[str, str, Any, Any], bool] -def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None: +def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None, filter_required: TFilterFunc = None) -> None: """Validate the `doc` dictionary based on the given typed dictionary specification `spec`. 
Args: @@ -32,11 +32,12 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil """ # pass through filter filter_f = filter_f or (lambda _: True) + filter_required = filter_required or (lambda _: True) # cannot validate anything validator_f = validator_f or (lambda p, pk, pv, t: False) allowed_props = get_type_hints(spec) - required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)} + required_props = {k: v for k, v in allowed_props.items() if (not is_optional_type(v) and filter_required(k))} # remove optional props props = {k: v for k, v in doc.items() if filter_f(k)} # check missing props diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v7.yml b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml new file mode 100644 index 0000000000..fd612df987 --- /dev/null +++ b/tests/common/cases/schemas/eth/ethereum_schema_v7.yml @@ -0,0 +1,459 @@ +version: 14 +version_hash: ZbDv9+tdJK7P/4QIB0qqHzqNSsVynVx90GL4giV8/p0= +engine_version: 7 +name: ethereum +tables: + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + name: load_id + schema_name: + nullable: true + data_type: text + name: schema_name + status: + nullable: false + data_type: bigint + name: status + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_version_hash: + nullable: true + data_type: text + name: schema_version_hash + write_disposition: skip + description: Created by DLT. Tracks completed loads + name: _dlt_loads + resource: _dlt_loads + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + name: version + engine_version: + nullable: false + data_type: bigint + name: engine_version + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_name: + nullable: false + data_type: text + name: schema_name + version_hash: + nullable: false + data_type: text + name: version_hash + schema: + nullable: false + data_type: text + name: schema + write_disposition: skip + description: Created by DLT. 
Tracks schema updates + name: _dlt_version + resource: _dlt_version + blocks: + description: Ethereum blocks + x-annotation: this will be preserved on save + write_disposition: append + table_sealed: true + filters: + includes: [] + excludes: [] + columns: + _dlt_load_id: + nullable: false + description: load id coming from the extractor + data_type: text + name: _dlt_load_id + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + number: + nullable: false + primary_key: true + data_type: bigint + name: number + parent_hash: + nullable: true + data_type: text + name: parent_hash + hash: + nullable: false + cluster: true + unique: true + data_type: text + name: hash + base_fee_per_gas: + nullable: false + data_type: wei + name: base_fee_per_gas + difficulty: + nullable: false + data_type: wei + name: difficulty + extra_data: + nullable: true + data_type: text + name: extra_data + gas_limit: + nullable: false + data_type: bigint + name: gas_limit + gas_used: + nullable: false + data_type: bigint + name: gas_used + logs_bloom: + nullable: true + data_type: binary + name: logs_bloom + miner: + nullable: true + data_type: text + name: miner + mix_hash: + nullable: true + data_type: text + name: mix_hash + nonce: + nullable: true + data_type: text + name: nonce + receipts_root: + nullable: true + data_type: text + name: receipts_root + sha3_uncles: + nullable: true + data_type: text + name: sha3_uncles + size: + nullable: true + data_type: bigint + name: size + state_root: + nullable: false + data_type: text + name: state_root + timestamp: + nullable: false + unique: true + sort: true + data_type: timestamp + name: timestamp + total_difficulty: + nullable: true + data_type: wei + name: total_difficulty + transactions_root: + nullable: false + data_type: text + name: transactions_root + name: blocks + resource: blocks + blocks__transactions: + parent: blocks + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + data_type: bigint + name: transaction_index + hash: + nullable: false + unique: true + data_type: text + name: hash + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + chain_id: + nullable: true + data_type: text + name: chain_id + from: + nullable: true + data_type: text + name: from + gas: + nullable: true + data_type: bigint + name: gas + gas_price: + nullable: true + data_type: bigint + name: gas_price + input: + nullable: true + data_type: text + name: input + max_fee_per_gas: + nullable: true + data_type: wei + name: max_fee_per_gas + max_priority_fee_per_gas: + nullable: true + data_type: wei + name: max_priority_fee_per_gas + nonce: + nullable: true + data_type: bigint + name: nonce + r: + nullable: true + data_type: text + name: r + s: + nullable: true + data_type: text + name: s + status: + nullable: true + data_type: bigint + name: status + to: + nullable: true + data_type: text + name: to + type: + nullable: true + data_type: text + name: type + v: + nullable: true + data_type: bigint + name: v + value: + nullable: false + data_type: wei + name: value + eth_value: + nullable: true + data_type: decimal + name: eth_value + name: blocks__transactions + blocks__transactions__logs: + parent: blocks__transactions + columns: 
+ _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + address: + nullable: false + data_type: text + name: address + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: transaction_index + log_index: + nullable: false + primary_key: true + data_type: bigint + name: log_index + data: + nullable: true + data_type: text + name: data + removed: + nullable: true + data_type: bool + name: removed + transaction_hash: + nullable: false + data_type: text + name: transaction_hash + name: blocks__transactions__logs + blocks__transactions__logs__topics: + parent: blocks__transactions__logs + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__logs__topics + blocks__transactions__access_list: + parent: blocks__transactions + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + address: + nullable: true + data_type: text + name: address + name: blocks__transactions__access_list + blocks__transactions__access_list__storage_keys: + parent: blocks__transactions__access_list + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__access_list__storage_keys + blocks__uncles: + parent: blocks + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__uncles +settings: + schema_sealed: true + default_hints: + foreign_key: + - _dlt_parent_id + not_null: + - re:^_dlt_id$ + - _dlt_root_id + - _dlt_parent_id + - _dlt_list_idx + unique: + - _dlt_id + cluster: + - block_hash + partition: + - block_timestamp + root_key: + - _dlt_root_id + preferred_types: + timestamp: timestamp + block_timestamp: timestamp +normalizers: + names: dlt.common.normalizers.names.snake_case + json: + module: dlt.common.normalizers.json.relational + config: + generate_dlt_id: true + propagation: + root: + _dlt_id: _dlt_root_id + tables: + blocks: + timestamp: 
block_timestamp + hash: block_hash +ancestors: +- Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ= + diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 8b465d796e..c84c25574f 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -295,6 +295,15 @@ def test_upgrade_engine_v1_schema() -> None: assert upgraded["engine_version"] == 6 utils.validate_stored_schema(upgraded) + # upgrade 1 -> 7 + schema_dict = load_json_case("schemas/ev1/event.schema") + assert schema_dict["engine_version"] == 1 + upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7) + assert upgraded["engine_version"] == 7 + utils.validate_stored_schema(upgraded) + # we should have an empty ancestors list after upgrade to 7 + assert upgraded["ancestors"] == [] + def test_unknown_engine_upgrade() -> None: schema_dict: TStoredSchema = load_json_case("schemas/ev1/event.schema") @@ -573,7 +582,8 @@ def assert_new_schema_values(schema: Schema) -> None: assert schema.stored_version == 1 assert schema.stored_version_hash is not None assert schema.version_hash is not None - assert schema.ENGINE_VERSION == 6 + assert schema.ENGINE_VERSION == 7 + assert schema._stored_ancestors == [] assert len(schema.settings["default_hints"]) > 0 # check settings assert utils.standard_type_detections() == schema.settings["detections"] == schema._type_detections diff --git a/tests/common/schema/test_versioning.py b/tests/common/schema/test_versioning.py index 1bfaaa5da2..7d0074e934 100644 --- a/tests/common/schema/test_versioning.py +++ b/tests/common/schema/test_versioning.py @@ -1,5 +1,6 @@ import pytest import yaml +from copy import deepcopy from dlt.common import json from dlt.common.schema import utils @@ -83,7 +84,7 @@ def test_infer_column_bumps_version() -> None: def test_preserve_version_on_load() -> None: - eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6") + eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") version = eth_v6["version"] version_hash = eth_v6["version_hash"] schema = Schema.from_dict(eth_v6) # type: ignore[arg-type] @@ -95,7 +96,7 @@ def test_preserve_version_on_load() -> None: @pytest.mark.parametrize("remove_defaults", [True, False]) def test_version_preserve_on_reload(remove_defaults: bool) -> None: - eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6") + eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") schema = Schema.from_dict(eth_v6) # type: ignore[arg-type] to_save_dict = schema.to_dict(remove_defaults=remove_defaults) @@ -122,3 +123,31 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None: saved_rasa_schema = Schema.from_dict(yaml.safe_load(rasa_yml)) assert saved_rasa_schema.stored_version == rasa_schema.stored_version assert saved_rasa_schema.stored_version_hash == rasa_schema.stored_version_hash + + +def test_create_ancestry() -> None: + eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") + schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + assert schema._stored_ancestors == ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="] + version = schema._stored_version + + # modify, save and load the schema 14 times and check the ancestry + expected_ancestors = ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="] + for i in range(1, 15): + # keep expected ancestors + expected_ancestors.insert(0, schema._stored_version_hash) + + # update schema + row = {f"float{i}": 78172.128} + _, new_table = schema.coerce_row("event_user",
None, row) + schema.update_table(new_table) + schema_dict = schema.to_dict() + schema = Schema.from_stored_schema(schema_dict) + + assert schema._stored_ancestors == expected_ancestors[:10] + assert schema._stored_version == version + i + + # we never have more than 10 ancestors + assert len(schema._stored_ancestors) == min(i + 1, 10) + + assert len(schema._stored_ancestors) == 10 \ No newline at end of file diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index f45773e4f5..a577729e5d 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -11,7 +11,7 @@ from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT -from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V6 +from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V7 @pytest.fixture @@ -87,6 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch assert storage_schema.version == reloaded_schema.stored_version assert storage_schema.version_hash == reloaded_schema.stored_version_hash assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash + assert storage_schema.ancestors == reloaded_schema.ancestors # the import schema gets modified storage_schema.tables["_dlt_loads"]["write_disposition"] = "append" storage_schema.tables.pop("event_user") @@ -96,7 +97,11 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch # we have overwritten storage schema assert reloaded_schema.tables["_dlt_loads"]["write_disposition"] == "append" assert "event_user" not in reloaded_schema.tables + + # hash and ancestry stay the same assert reloaded_schema._imported_version_hash == storage_schema.version_hash + assert storage_schema.ancestors == reloaded_schema.ancestors + # but original version has increased assert reloaded_schema.stored_version == storage_schema.version + 1 @@ -194,12 +199,13 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None: ie_storage.save_schema(schema) assert schema.version_hash == schema_hash # we linked schema to import schema - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 # load schema and make sure our new schema is here schema = ie_storage.load_schema("ethereum") - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 assert schema._stored_version_hash == schema_hash assert schema.version_hash == schema_hash + assert schema.ancestors == [] # we have simple schema in export folder fs = FileStorage(ie_storage.config.export_schema_path) exported_name = ie_storage._file_name_in_store("ethereum", "yaml") @@ -213,12 +219,13 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No schema = Schema("ethereum") schema_hash = schema.version_hash synced_storage.save_schema(schema) - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 # import schema is overwritten fs = FileStorage(synced_storage.config.import_schema_path) exported_name = synced_storage._file_name_in_store("ethereum",
"yaml") exported_schema = yaml.safe_load(fs.load(exported_name)) assert schema.version_hash == exported_schema["version_hash"] == schema_hash + assert schema.ancestors == [] # when it is loaded we will import schema again which is identical to the current one but the import link # will be set to itself schema = synced_storage.load_schema("ethereum") @@ -269,12 +276,12 @@ def test_schema_from_file() -> None: def prepare_import_folder(storage: SchemaStorage) -> None: - shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v6"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) + shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v7"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) def assert_schema_imported(synced_storage: SchemaStorage, storage: SchemaStorage) -> Schema: prepare_import_folder(synced_storage) - eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v6") + eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") schema = synced_storage.load_schema("ethereum") # is linked to imported schema schema._imported_version_hash = eth_v6["version_hash"] diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 4583da3a1e..bbda683717 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -83,14 +83,14 @@ def test_doc() -> TTestRecord: def test_validate_schema_cases() -> None: - with open("tests/common/cases/schemas/eth/ethereum_schema_v4.yml", mode="r", encoding="utf-8") as f: + with open("tests/common/cases/schemas/eth/ethereum_schema_v7.yml", mode="r", encoding="utf-8") as f: schema_dict: TStoredSchema = yaml.safe_load(f) validate_dict_ignoring_xkeys( spec=TStoredSchema, doc=schema_dict, path=".", - validator_f=simple_regex_validator + validator_f=simple_regex_validator, ) # with open("tests/common/cases/schemas/rasa/event.schema.json") as f: diff --git a/tests/common/utils.py b/tests/common/utils.py index 54a48825af..8e0d5351e6 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -16,7 +16,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema version -IMPORTED_VERSION_HASH_ETH_V6 = "Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ=" +IMPORTED_VERSION_HASH_ETH_V7 = "ZbDv9+tdJK7P/4QIB0qqHzqNSsVynVx90GL4giV8/p0=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index f80dbbd7e6..c9c6c4c437 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -18,7 +18,7 @@ from tests.utils import TEST_STORAGE_ROOT from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V6, yml_case_path as common_yml_case_path +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7, yml_case_path as common_yml_case_path from tests.common.configuration.utils import environment from tests.load.pipeline.utils import assert_query_data, drop_active_pipeline_data from tests.load.utils import destinations_configs, DestinationTestConfiguration, get_normalized_dataset_name @@ -404,7 +404,7 @@ def test_restore_schemas_while_import_schemas_exist(destination_config: Destinat assert normalized_annotations in schema.tables # check if attached to import schema - assert 
schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V6 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 # extract some data with restored pipeline p.run(["C", "D", "E"], table_name="blacklist") assert normalized_labels in schema.tables diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py index a965201425..488d66b725 100644 --- a/tests/load/weaviate/test_naming.py +++ b/tests/load/weaviate/test_naming.py @@ -87,7 +87,7 @@ def test_reserved_property_names() -> None: # print(schema_2.name) # print(schema_2.naming) -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v6") +# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") # eth_v6_schema = dlt.Schema.from_dict(eth_v6) # pipeline.extract(s, schema=eth_v6_schema) @@ -101,7 +101,7 @@ def test_reserved_property_names() -> None: # print(pipeline.dataset_name) # s = small() -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v6") +# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") # eth_v6_schema = dlt.Schema.from_dict(eth_v6) # pipeline.extract(s, schema=eth_v6_schema) diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index 09d8e98d82..ebd2ddb515 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -81,7 +81,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: pipeline.sync_destination() # print(pipeline.working_dir) # we have updated schema - assert pipeline.default_schema.ENGINE_VERSION == 6 + assert pipeline.default_schema.ENGINE_VERSION == 7 # make sure that schema hash retrieved from the destination is exactly the same as the schema hash that was in storage before the schema was wiped assert pipeline.default_schema.stored_version_hash == github_schema["version_hash"] @@ -114,6 +114,6 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None: github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) pipeline = pipeline.drop() pipeline.sync_destination() - assert pipeline.default_schema.ENGINE_VERSION == 6 + assert pipeline.default_schema.ENGINE_VERSION == 7 # schema version does not match `dlt.attach` does not update to the right schema by itself assert pipeline.default_schema.stored_version_hash != github_schema["version_hash"] From be2d64efbffeefffc4813032119b8fc43a8f3e9f Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 19 Nov 2023 19:04:16 +0100 Subject: [PATCH 2/7] remove name attribute and init arg from dltsource --- dlt/extract/decorators.py | 5 +---- dlt/extract/source.py | 17 ++++++++--------- dlt/pipeline/pipeline.py | 6 +++--- tests/extract/test_decorators.py | 4 ++-- tests/extract/test_extract.py | 8 ++++---- tests/extract/test_incremental.py | 6 +++--- tests/extract/test_sources.py | 18 +++++++++--------- tests/pipeline/test_pipeline.py | 14 +++++++------- tests/pipeline/test_pipeline_trace.py | 8 ++++---- 9 files changed, 41 insertions(+), 45 deletions(-) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index dbc5f2fa82..17afd1f1c6 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -145,9 +145,6 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams, if name and name != schema.name: raise ExplicitSourceNameInvalid(name, schema.name) - # the name of the source must be identical to the name of the schema - name = schema.name - # wrap source extraction function in configuration with section func_module = 
inspect.getmodule(f) source_section = section or _get_source_section_name(func_module) @@ -171,7 +168,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl: rv = list(rv) # convert to source - s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv) + s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv) # apply hints if max_table_nesting is not None: s.max_table_nesting = max_table_nesting diff --git a/dlt/extract/source.py b/dlt/extract/source.py index d36cb4b121..3174d8a0f0 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -642,22 +642,17 @@ class DltSource(Iterable[TDataItem]): * You can use a `run` method to load the data with a default instance of dlt pipeline. * You can get source read only state for the currently active Pipeline instance """ - def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None: - self.name = name + def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None: self.section = section """Tells if iterator associated with a source is exhausted""" self._schema = schema self._resources: DltResourceDict = DltResourceDict(self.name, self.section) - if self.name != schema.name: - # raise ValueError(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.") - warnings.warn(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.") - if resources: self.resources.add(*resources) @classmethod - def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self: + def from_data(cls, section: str, schema: Schema, data: Any) -> Self: """Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema.""" # creates source from various forms of data if isinstance(data, DltSource): @@ -669,10 +664,14 @@ def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self: else: resources = [DltResource.from_data(data)] - return cls(name, section, schema, resources) + return cls(section, schema, resources) # TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer. + @property + def name(self) -> str: + return self._schema.name + @property def max_table_nesting(self) -> int: """A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.""" @@ -795,7 +794,7 @@ def state(self) -> StrAny: def clone(self) -> "DltSource": """Creates a deep copy of the source where copies of schema, resources and pipes are created""" # mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor - return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values())) + return DltSource(self.section, self.schema.clone(), list(self._resources.values())) def __iter__(self) -> Iterator[TDataItem]: """Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class. 
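Review note: after this commit a source name is no longer passed explicitly; it is always derived from the schema and exposed through the new read-only `name` property. A minimal sketch distilled from the tests in this patch (the generator body is a made-up example; a later commit in this series swaps the constructor arguments to `(schema, section, resources)`):

    from dlt.common.schema import Schema
    from dlt.extract.source import DltResource, DltSource

    def numbers():
        yield from [1, 2, 3]

    resource = DltResource.from_data(numbers)
    source = DltSource("module", Schema("numbers"), [resource])  # no name argument
    assert source.name == "numbers"  # the name comes from the schema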
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index e6e27afec7..a719c0ce18 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -826,7 +826,7 @@ def append_data(data_item: Any) -> None: # do not set section to prevent source that represent a standalone resource # to overwrite other standalone resources (ie. parents) in that source sources.append( - DltSource(effective_schema.name, "", effective_schema, [data_item]) + DltSource("", effective_schema, [data_item]) ) else: # iterator/iterable/generator @@ -848,7 +848,7 @@ def append_data(data_item: Any) -> None: if resources: # add all the appended resources in one source - sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources)) + sources.append(DltSource( self.pipeline_name, effective_schema, resources)) return sources @@ -1252,7 +1252,7 @@ def _save_state(self, state: TPipelineState) -> None: def _extract_state(self, state: TPipelineState) -> TPipelineState: # this will extract the state into current load package and update the schema with the _dlt_pipeline_state table # note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract - state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)]) + state_source = DltSource(self.pipeline_name, self.default_schema, [state_resource(state)]) storage = ExtractorStorage(self._normalize_storage_config) extract_id = extract_with_schema(storage, state_source, self.default_schema, _NULL_COLLECTOR, 1, 1) storage.commit_extract_files(extract_id) diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 05e3a2fbf3..b8a6b80cfa 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -25,7 +25,7 @@ from dlt.common.schema.exceptions import InvalidSchemaName from dlt.extract.typing import TableNameMeta -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V6 +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7 def test_none_returning_source() -> None: @@ -72,7 +72,7 @@ def test_load_schema_for_callable() -> None: schema = s.schema assert schema.name == "ethereum" == s.name # the schema in the associated file has this hash - assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V6 + assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V7 def test_unbound_parametrized_transformer() -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index c487d19aa1..1557a64315 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -13,7 +13,7 @@ def test_extract_select_tables() -> None: def expect_tables(resource: DltResource) -> dlt.Schema: # delete files clean_test_storage() - source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource(10)]) + source = DltSource("module", dlt.Schema("selectables"), [resource(10)]) schema = source.discover_schema() storage = ExtractorStorage(NormalizeStorageConfiguration()) @@ -38,7 +38,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: clean_test_storage() storage = ExtractorStorage(NormalizeStorageConfiguration()) # same thing but select only odd - source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource]) + source = DltSource("module", dlt.Schema("selectables"), [resource]) source = source.with_resources(resource.name) source.selected_resources[resource.name].bind(10).select_tables("odd_table") 
extract_id = storage.create_extract_id() @@ -83,7 +83,7 @@ def input_gen(): yield from [1, 2, 3] input_r = DltResource.from_data(input_gen) - source = DltSource("selectables", "module", dlt.Schema("selectables"), [input_r, input_r.with_name("gen_clone")]) + source = DltSource("module", dlt.Schema("selectables"), [input_r, input_r.with_name("gen_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() schema_update = extract(extract_id, source, storage) @@ -102,7 +102,7 @@ def tx_step(item): input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("selectables", "module", dlt.Schema("selectables"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource("module", dlt.Schema("selectables"), [input_r, (input_r | input_tx).with_name("tx_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() schema_update = extract(extract_id, source, storage) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 9d5b37f472..fb72eee6f4 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -692,14 +692,14 @@ def child(item): assert child.write_disposition == "replace" # create a source where we place only child - s = DltSource("comp", "section", Schema("comp"), [child]) + s = DltSource("section", Schema("comp"), [child]) # but extracted resources will include its parent where it derives write disposition from child extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" assert extracted[child._pipe.parent.name].write_disposition == "replace" # create a source where we place parent explicitly - s = DltSource("comp", "section", Schema("comp"), [parent_r, child]) + s = DltSource("section", Schema("comp"), [parent_r, child]) extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition @@ -720,7 +720,7 @@ def child(item): # now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly # so we have a resource with the same name as child parent but the pipe instance is different - s = DltSource("comp", "section", Schema("comp"), [standalone_some_data(now), child]) + s = DltSource("section", Schema("comp"), [standalone_some_data(now), child]) assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition - because we search by name to identify matching resource assert extracted[child._pipe.parent.name].write_disposition == "append" diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index d8223f2ee8..965392f271 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -51,7 +51,7 @@ def parametrized(p1, /, p2, *, p3 = None): # as part of the source r = DltResource.from_data(parametrized) - s = DltSource("source", "module", Schema("source"), [r]) + s = DltSource("module", Schema("source"), [r]) with pytest.raises(ParametrizedResourceUnbound) as py_ex: list(s) @@ -1010,7 +1010,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("source", "module", Schema("source"), [dlt.resource(some_data())]) + s = DltSource("module", Schema("source"), [dlt.resource(some_data())]) assert s.exhausted is False assert list(s) == [1, 2, 3, 1, 2, 3] assert 
s.exhausted is True @@ -1024,19 +1024,19 @@ def test_exhausted_property() -> None: # this example will be exhausted after iteration def open_generator_data(): yield from [1, 2, 3, 4] - s = DltSource("source", "module", Schema("source"), [dlt.resource(open_generator_data())]) + s = DltSource("module", Schema("source"), [dlt.resource(open_generator_data())]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is True # lists will not exhaust - s = DltSource("source", "module", Schema("source"), [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) + s = DltSource("module", Schema("source"), [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False # iterators will not exhaust - s = DltSource("source", "module", Schema("source"), [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) + s = DltSource("module", Schema("source"), [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False @@ -1044,7 +1044,7 @@ def open_generator_data(): # having on exhausted generator resource will make the whole source exhausted def open_generator_data(): # type: ignore[no-redef] yield from [1, 2, 3, 4] - s = DltSource("source", "module", Schema("source"), [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), dlt.resource(open_generator_data())]) + s = DltSource("module", Schema("source"), [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), dlt.resource(open_generator_data())]) assert s.exhausted is False # execute the whole source @@ -1234,7 +1234,7 @@ def tx_step(item): input_r_clone = input_r.with_name("input_gen_2") # separate resources have separate pipe instances - source = DltSource("dupes", "module", Schema("dupes"), [input_r, input_r_clone]) + source = DltSource("module", Schema("dupes"), [input_r, input_r_clone]) pipes = source.resources.pipes assert len(pipes) == 2 assert pipes[0].name == "input_gen" @@ -1245,13 +1245,13 @@ def tx_step(item): assert list(source) == [1, 2, 3, 1, 2, 3] # cloned from fresh resource - source = DltSource("dupes", "module", Schema("dupes"), [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) + source = DltSource("module", Schema("dupes"), [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) assert list(source) == [1, 2, 3, 1, 2, 3] # clone transformer input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("dupes", "module", Schema("dupes"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource("module", Schema("dupes"), [input_r, (input_r | input_tx).with_name("tx_clone")]) pipes = source.resources.pipes assert len(pipes) == 2 assert source.resources[pipes[0].name] == source.input_gen diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 560a683709..16cce0c7f6 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -242,7 +242,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("source", "module", dlt.Schema("source"), [dlt.resource(some_data())]) + s = DltSource("module", dlt.Schema("source"), [dlt.resource(some_data())]) dlt.pipeline().extract(s) with pytest.raises(PipelineStepFailed) as py_ex: dlt.pipeline().extract(s) @@ -257,7 +257,7 @@ def 
test_disable_enable_state_sync(environment: Any) -> None: def some_data(): yield [1, 2, 3] - s = DltSource("default", "module", dlt.Schema("default"), [dlt.resource(some_data())]) + s = DltSource("module", dlt.Schema("default"), [dlt.resource(some_data())]) dlt.pipeline().extract(s) storage = ExtractorStorage(p._normalize_storage_config) assert len(storage.list_files_to_normalize_sorted()) == 1 @@ -267,14 +267,14 @@ def some_data(): p.config.restore_from_destination = True # extract to different schema, state must go to default schema - s = DltSource("default_2", "module", dlt.Schema("default_2"), [dlt.resource(some_data())]) + s = DltSource("module", dlt.Schema("default_2"), [dlt.resource(some_data())]) dlt.pipeline().extract(s) expect_extracted_file(storage, "default", s.schema.state_table_name, "***") def test_extract_multiple_sources() -> None: - s1 = DltSource("default", "module", dlt.Schema("default"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s2 = DltSource("default_2", "module", dlt.Schema("default_2"), [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) + s1 = DltSource("module", dlt.Schema("default"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s2 = DltSource("module", dlt.Schema("default_2"), [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) p = dlt.pipeline(destination="dummy") p.config.restore_from_destination = False @@ -293,8 +293,8 @@ def test_extract_multiple_sources() -> None: def i_fail(): raise NotImplementedError() - s3 = DltSource("default_3", "module", dlt.Schema("default_3"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s4 = DltSource("default_4", "module", dlt.Schema("default_4"), [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) + s3 = DltSource("module", dlt.Schema("default_3"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s4 = DltSource("module", dlt.Schema("default_4"), [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) with pytest.raises(PipelineStepFailed): p.extract([s3, s4]) diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index 706644b60e..67fb62af3a 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -298,12 +298,12 @@ def data(): def test_extract_data_describe() -> None: schema = Schema("test") - assert describe_extract_data(DltSource("sss_extract", "sect", schema)) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data(DltSource("sect", schema)) == [{"name": "sss_extract", "data_type": "source"}] assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}] - assert describe_extract_data([DltSource("sss_extract", "sect", schema)]) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data([DltSource("sect", schema)]) == [{"name": "sss_extract", "data_type": "source"}] assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}] assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DltSource("sss_extract", "sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DltSource("sect", schema)] ) == [ {"name": "rrr_extract", "data_type": "resource"}, {"name": "sss_extract", 
"data_type": "source"} ] @@ -313,7 +313,7 @@ def test_extract_data_describe() -> None: assert describe_extract_data([DataFrame(), {"a": "b"}]) == [{"name": "", "data_type": "DataFrame"}] # first unnamed element in the list breaks checking info assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sss_extract", "sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sect", schema)] ) == [ {"name": "rrr_extract", "data_type": "resource"}, {"name": "", "data_type": "DataFrame"} ] From 951b27a838aacfb5762a8d7d22db13e1ef1e1ef0 Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 19 Nov 2023 19:29:03 +0100 Subject: [PATCH 3/7] fix 2 tests --- dlt/extract/source.py | 4 ++-- tests/pipeline/test_dlt_versions.py | 2 +- tests/pipeline/test_pipeline_trace.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 3174d8a0f0..db4f4db454 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -666,12 +666,12 @@ def from_data(cls, section: str, schema: Schema, data: Any) -> Self: return cls(section, schema, resources) - # TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer. - @property def name(self) -> str: return self._schema.name + + # TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer. @property def max_table_nesting(self) -> int: """A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.""" diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index ebd2ddb515..3d79c22b14 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -55,7 +55,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: print(venv.run_script("../tests/pipeline/cases/github_pipeline/github_pipeline.py")) # hash hash in schema github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) - assert github_schema["engine_version"] == 6 + assert github_schema["engine_version"] == 7 assert "schema_version_hash" in github_schema["tables"][LOADS_TABLE_NAME]["columns"] with DuckDbSqlClient(GITHUB_DATASET, duckdb_cfg.credentials) as client: rows = client.execute_sql(f"SELECT * FROM {LOADS_TABLE_NAME} ORDER BY inserted_at") diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index 67fb62af3a..4e84e14425 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -298,14 +298,14 @@ def data(): def test_extract_data_describe() -> None: schema = Schema("test") - assert describe_extract_data(DltSource("sect", schema)) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data(DltSource("sect", schema)) == [{"name": "test", "data_type": "source"}] assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}] - assert describe_extract_data([DltSource("sect", schema)]) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data([DltSource("sect", schema)]) == [{"name": "test", "data_type": "source"}] assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}] assert describe_extract_data( 
[DltResource(Pipe("rrr_extract"), None, False), DltSource("sect", schema)] ) == [ - {"name": "rrr_extract", "data_type": "resource"}, {"name": "sss_extract", "data_type": "source"} + {"name": "rrr_extract", "data_type": "resource"}, {"name": "test", "data_type": "source"} ] assert describe_extract_data([{"a": "b"}]) == [{"name": "", "data_type": "dict"}] from pandas import DataFrame From 7cb566ebbc141bbe5b24e207bb68e3245089262c Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 19 Nov 2023 19:38:11 +0100 Subject: [PATCH 4/7] fix statekey related errors --- dlt/extract/decorators.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 17afd1f1c6..fd06df4d16 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -159,10 +159,10 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl: # configurations will be accessed in this section in the source proxy = Container()[PipelineContext] pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name - with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=name)): + with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=schema.name)): rv = conf_f(*args, **kwargs) if rv is None: - raise SourceDataIsNone(name) + raise SourceDataIsNone(schema.name) # if generator, consume it immediately if inspect.isgenerator(rv): rv = list(rv) From 07991971013c3e4a0d52c23c5986f29692ff7838 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 12:17:54 +0100 Subject: [PATCH 5/7] pr fixes --- dlt/common/schema/schema.py | 12 +- dlt/common/schema/typing.py | 4 +- dlt/common/schema/utils.py | 16 +- dlt/common/validation.py | 2 + dlt/extract/decorators.py | 2 +- dlt/extract/source.py | 8 +- dlt/pipeline/pipeline.py | 6 +- .../cases/schemas/eth/ethereum_schema_v8.yml | 461 ++++++++++++++++++ tests/common/schema/test_schema.py | 16 +- tests/common/schema/test_versioning.py | 32 +- tests/common/storages/test_schema_storage.py | 24 +- tests/common/test_validation.py | 2 +- tests/common/utils.py | 2 +- tests/extract/test_decorators.py | 4 +- tests/extract/test_extract.py | 8 +- tests/extract/test_incremental.py | 6 +- tests/extract/test_sources.py | 18 +- tests/load/pipeline/test_restore_state.py | 4 +- tests/load/weaviate/test_naming.py | 20 +- tests/pipeline/test_dlt_versions.py | 4 +- tests/pipeline/test_pipeline.py | 14 +- tests/pipeline/test_pipeline_trace.py | 8 +- 22 files changed, 570 insertions(+), 103 deletions(-) create mode 100644 tests/common/cases/schemas/eth/ethereum_schema_v8.yml diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index f1c798b7c5..bcfba11c61 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -44,7 +44,7 @@ class Schema: _dlt_tables_prefix: str _stored_version: int # version at load/creation time _stored_version_hash: str # version hash at load/creation time - _stored_ancestors: Optional[List[str]] # list of ancestor hashes of the schema + _stored_previous_hashes: Optional[List[str]] # list of ancestor hashes of the schema _imported_version_hash: str # version hash of recently imported schema _schema_description: str # optional schema description _schema_tables: TSchemaTables @@ -103,7 +103,7 @@ def to_dict(self, remove_defaults: bool = False, bump_version: bool = True) -> T "tables": self._schema_tables, "settings": self._settings, "normalizers": 
self._normalizers_config, - "ancestors": self._stored_ancestors + "previous_hashes": self._stored_previous_hashes } if self._imported_version_hash and not remove_defaults: stored_schema["imported_version_hash"] = self._imported_version_hash @@ -478,9 +478,9 @@ def version_hash(self) -> str: return utils.bump_version_if_modified(self.to_dict())[1] @property - def ancestors(self) -> List[str]: + def previous_hashes(self) -> List[str]: - """List of ancestor hashes of the schema, recomputed from the actual content""" + """List of previous version hashes of the schema, recomputed from the actual content""" - return utils.bump_version_if_modified(self.to_dict())[2] + return utils.bump_version_if_modified(self.to_dict())[3] @property def stored_version_hash(self) -> str: @@ -670,7 +670,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No self._stored_version_hash: str = None self._imported_version_hash: str = None self._schema_description: str = None - self._stored_ancestors: List[str] = [] + self._stored_previous_hashes: List[str] = [] self._settings: TSchemaSettings = {} self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = [] @@ -709,7 +709,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None: self._imported_version_hash = stored_schema.get("imported_version_hash") self._schema_description = stored_schema.get("description") self._settings = stored_schema.get("settings") or {} - self._stored_ancestors = stored_schema.get("ancestors") + self._stored_previous_hashes = stored_schema.get("previous_hashes") self._compile_settings() def _set_schema_name(self, name: str) -> None: diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index e986d951e5..1b6ef31800 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -11,7 +11,7 @@ # current version of schema engine -SCHEMA_ENGINE_VERSION = 7 +SCHEMA_ENGINE_VERSION = 8 # dlt tables VERSION_TABLE_NAME = "_dlt_version" @@ -123,7 +123,7 @@ class TStoredSchema(TypedDict, total=False): """TypedDict defining the schema representation in storage""" version: int version_hash: str - ancestors: List[str] + previous_hashes: List[str] imported_version_hash: Optional[str] engine_version: int name: str diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 41b0010242..899f60dfb1 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -143,13 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st pass elif hash_ != previous_hash: stored_schema["version"] += 1 - # insert the previous hash at the front of ancestors and limit the list to 10 entries - if previous_hash not in stored_schema["ancestors"]: - stored_schema["ancestors"].insert(0, previous_hash) - stored_schema["ancestors"] = stored_schema["ancestors"][:10] + # insert the previous hash at the front of previous_hashes and limit the list to 10 entries + if previous_hash not in stored_schema["previous_hashes"]: + stored_schema["previous_hashes"].insert(0, previous_hash) + stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10] stored_schema["version_hash"] = hash_ - return stored_schema["version"], hash_, previous_hash, stored_schema["ancestors"] + return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"] def generate_version_hash(stored_schema: TStoredSchema) -> str: @@ -158,7 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: schema_copy.pop("version") schema_copy.pop("version_hash", None) schema_copy.pop("imported_version_hash", None) - schema_copy.pop("ancestors", None) +
schema_copy.pop("previous_hashes", None) # ignore order of elements when computing the hash content = json.dumps(schema_copy, sort_keys=True) h = hashlib.sha3_256(content.encode("utf-8")) @@ -249,7 +249,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None: # exclude validation of keys added later ignored_keys = [] if stored_schema["engine_version"] < 7: - ignored_keys.append("ancestors") + ignored_keys.append("previous_hashes") # use lambda to verify only non extra fields validate_dict_ignoring_xkeys( @@ -363,7 +363,7 @@ def migrate_filters(group: str, filters: List[str]) -> None: table["schema_contract"] = {} from_engine = 7 if from_engine == 7 and to_engine > 7: - schema_dict["ancestors"] = [] + schema_dict["previous_hashes"] = [] from_engine = 8 schema_dict["engine_version"] = from_engine diff --git a/dlt/common/validation.py b/dlt/common/validation.py index 791138054a..5e5e18de42 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -22,6 +22,8 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil validator_f (TCustomValidator, optional): A function to perform additional validation for types not covered by this function. It should return `True` if the validation passes. Defaults to a function that rejects all such types. + filter_required (TFilterFunc, optional): A function to filter out required fields, useful + for testing historic versions of dict that might now have certain fields yet. Raises: DictValidationException: If there are missing required fields, unexpected fields, diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index fbe712fae8..1dbfcb4350 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -173,7 +173,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl: rv = list(rv) # convert to source - s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv) + s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv) # apply hints if max_table_nesting is not None: s.max_table_nesting = max_table_nesting diff --git a/dlt/extract/source.py b/dlt/extract/source.py index ace76cee2b..0ff24d1f86 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -167,7 +167,7 @@ class DltSource(Iterable[TDataItem]): * You can use a `run` method to load the data with a default instance of dlt pipeline. 
* You can get source read only state for the currently active Pipeline instance """ - def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None: + def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None: self.section = section """Tells the configuration section where the source is defined""" self._schema = schema @@ -177,7 +177,7 @@ def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource self.resources.add(*resources) @classmethod - def from_data(cls, section: str, schema: Schema, data: Any) -> Self: + def from_data(cls, schema: Schema, section: str, data: Any) -> Self: """Converts any `data` supported by the `dlt` `run` method into a `dlt` source with the given `schema` and `section`.""" # creates source from various forms of data if isinstance(data, DltSource): return data @@ -189,7 +189,7 @@ def from_data(cls, section: str, schema: Schema, data: Any) -> Self: else: resources = [DltResource.from_data(data)] - return cls(section, schema, resources) + return cls(schema, section, resources) @property def name(self) -> str: @@ -327,7 +327,7 @@ def state(self) -> StrAny: def clone(self) -> "DltSource": """Creates a deep copy of the source where copies of schema, resources and pipes are created""" # mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor - return DltSource(self.section, self.schema.clone(), list(self._resources.values())) + return DltSource(self.schema.clone(), self.section, list(self._resources.values())) def __iter__(self) -> Iterator[TDataItem]: """Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b9eb958027..c893fd4e75 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None: # do not set section to prevent source that represents a standalone resource # to overwrite other standalone resources (ie.
parents) in that source sources.append( - DltSource(effective_schema.name, "", effective_schema, [data_item]) + DltSource(effective_schema, "", [data_item]) ) else: # iterator/iterable/generator @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None: # add all the appended resources in one source if resources: - sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources)) + sources.append(DltSource(effective_schema, self.pipeline_name, resources)) # apply hints and settings for source in sources: @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None: def _extract_state(self, state: TPipelineState) -> TPipelineState: # this will extract the state into current load package and update the schema with the _dlt_pipeline_state table # note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract - state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)]) + state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)]) storage = ExtractorStorage(self._normalize_storage_config) extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1) storage.commit_extract_files(extract_id) diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v8.yml b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml new file mode 100644 index 0000000000..928c9a3e54 --- /dev/null +++ b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml @@ -0,0 +1,461 @@ +version: 16 +version_hash: C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE= +engine_version: 8 +name: ethereum +tables: + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + name: load_id + schema_name: + nullable: true + data_type: text + name: schema_name + status: + nullable: false + data_type: bigint + name: status + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_version_hash: + nullable: true + data_type: text + name: schema_version_hash + write_disposition: skip + description: Created by DLT. Tracks completed loads + schema_contract: {} + name: _dlt_loads + resource: _dlt_loads + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + name: version + engine_version: + nullable: false + data_type: bigint + name: engine_version + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_name: + nullable: false + data_type: text + name: schema_name + version_hash: + nullable: false + data_type: text + name: version_hash + schema: + nullable: false + data_type: text + name: schema + write_disposition: skip + description: Created by DLT. 
Tracks schema updates + schema_contract: {} + name: _dlt_version + resource: _dlt_version + blocks: + description: Ethereum blocks + x-annotation: this will be preserved on save + write_disposition: append + filters: + includes: [] + excludes: [] + columns: + _dlt_load_id: + nullable: false + description: load id coming from the extractor + data_type: text + name: _dlt_load_id + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + number: + nullable: false + primary_key: true + data_type: bigint + name: number + parent_hash: + nullable: true + data_type: text + name: parent_hash + hash: + nullable: false + cluster: true + unique: true + data_type: text + name: hash + base_fee_per_gas: + nullable: false + data_type: wei + name: base_fee_per_gas + difficulty: + nullable: false + data_type: wei + name: difficulty + extra_data: + nullable: true + data_type: text + name: extra_data + gas_limit: + nullable: false + data_type: bigint + name: gas_limit + gas_used: + nullable: false + data_type: bigint + name: gas_used + logs_bloom: + nullable: true + data_type: binary + name: logs_bloom + miner: + nullable: true + data_type: text + name: miner + mix_hash: + nullable: true + data_type: text + name: mix_hash + nonce: + nullable: true + data_type: text + name: nonce + receipts_root: + nullable: true + data_type: text + name: receipts_root + sha3_uncles: + nullable: true + data_type: text + name: sha3_uncles + size: + nullable: true + data_type: bigint + name: size + state_root: + nullable: false + data_type: text + name: state_root + timestamp: + nullable: false + unique: true + sort: true + data_type: timestamp + name: timestamp + total_difficulty: + nullable: true + data_type: wei + name: total_difficulty + transactions_root: + nullable: false + data_type: text + name: transactions_root + schema_contract: {} + name: blocks + resource: blocks + blocks__transactions: + parent: blocks + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + data_type: bigint + name: transaction_index + hash: + nullable: false + unique: true + data_type: text + name: hash + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + chain_id: + nullable: true + data_type: text + name: chain_id + from: + nullable: true + data_type: text + name: from + gas: + nullable: true + data_type: bigint + name: gas + gas_price: + nullable: true + data_type: bigint + name: gas_price + input: + nullable: true + data_type: text + name: input + max_fee_per_gas: + nullable: true + data_type: wei + name: max_fee_per_gas + max_priority_fee_per_gas: + nullable: true + data_type: wei + name: max_priority_fee_per_gas + nonce: + nullable: true + data_type: bigint + name: nonce + r: + nullable: true + data_type: text + name: r + s: + nullable: true + data_type: text + name: s + status: + nullable: true + data_type: bigint + name: status + to: + nullable: true + data_type: text + name: to + type: + nullable: true + data_type: text + name: type + v: + nullable: true + data_type: bigint + name: v + value: + nullable: false + data_type: wei + name: value + eth_value: + nullable: true + data_type: decimal + name: eth_value + name: blocks__transactions + blocks__transactions__logs: + parent: 
blocks__transactions + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + address: + nullable: false + data_type: text + name: address + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: transaction_index + log_index: + nullable: false + primary_key: true + data_type: bigint + name: log_index + data: + nullable: true + data_type: text + name: data + removed: + nullable: true + data_type: bool + name: removed + transaction_hash: + nullable: false + data_type: text + name: transaction_hash + name: blocks__transactions__logs + blocks__transactions__logs__topics: + parent: blocks__transactions__logs + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__logs__topics + blocks__transactions__access_list: + parent: blocks__transactions + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + address: + nullable: true + data_type: text + name: address + name: blocks__transactions__access_list + blocks__transactions__access_list__storage_keys: + parent: blocks__transactions__access_list + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__access_list__storage_keys + blocks__uncles: + parent: blocks + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__uncles +settings: + default_hints: + foreign_key: + - _dlt_parent_id + not_null: + - re:^_dlt_id$ + - _dlt_root_id + - _dlt_parent_id + - _dlt_list_idx + unique: + - _dlt_id + cluster: + - block_hash + partition: + - block_timestamp + root_key: + - _dlt_root_id + preferred_types: + timestamp: timestamp + block_timestamp: timestamp + schema_contract: {} +normalizers: + names: dlt.common.normalizers.names.snake_case + json: + module: dlt.common.normalizers.json.relational + config: + generate_dlt_id: true + propagation: + root: + _dlt_id: _dlt_root_id + 
tables: + blocks: + timestamp: block_timestamp + hash: block_hash +previous_hashes: +- yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE= + diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 104b634491..3f6e5e5b93 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -132,7 +132,7 @@ def test_simple_regex_validator() -> None: def test_load_corrupted_schema() -> None: - eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") + eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") del eth_v4["tables"]["blocks"] with pytest.raises(ParentTableNotFoundException): utils.validate_stored_schema(eth_v4) @@ -308,9 +308,13 @@ def test_upgrade_engine_v1_schema() -> None: assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7) assert upgraded["engine_version"] == 7 - utils.validate_stored_schema(upgraded) - # we should have an empty ancestors list after upgrade to 7 - assert upgraded["ancestors"] == [] + + + # upgrade 1 -> 8 + schema_dict = load_json_case("schemas/ev1/event.schema") + assert schema_dict["engine_version"] == 1 + upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=8) + assert upgraded["engine_version"] == 8 def test_unknown_engine_upgrade() -> None: @@ -590,8 +594,8 @@ def assert_new_schema_values(schema: Schema) -> None: assert schema.stored_version == 1 assert schema.stored_version_hash is not None assert schema.version_hash is not None - assert schema.ENGINE_VERSION == 7 - assert schema._stored_ancestors == [] + assert schema.ENGINE_VERSION == 8 + assert schema._stored_previous_hashes == [] assert len(schema.settings["default_hints"]) > 0 # check settings assert utils.standard_type_detections() == schema.settings["detections"] == schema._type_detections diff --git a/tests/common/schema/test_versioning.py b/tests/common/schema/test_versioning.py index a971c8c93f..401b463875 100644 --- a/tests/common/schema/test_versioning.py +++ b/tests/common/schema/test_versioning.py @@ -84,10 +84,10 @@ def test_infer_column_bumps_version() -> None: def test_preserve_version_on_load() -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - version = eth_v7["version"] - version_hash = eth_v7["version_hash"] - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + version = eth_v8["version"] + version_hash = eth_v8["version_hash"] + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] # version should not be bumped assert version_hash == schema._stored_version_hash assert version_hash == schema.version_hash @@ -96,8 +96,8 @@ def test_preserve_version_on_load() -> None: @pytest.mark.parametrize("remove_defaults", [True, False]) def test_version_preserve_on_reload(remove_defaults: bool) -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] to_save_dict = schema.to_dict(remove_defaults=remove_defaults) assert schema.stored_version == to_save_dict["version"] @@ -126,16 +126,16 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None: def test_create_ancestry() -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - schema = Schema.from_dict(eth_v7) # 
type: ignore[arg-type] - assert schema._stored_ancestors == ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="] + assert schema._stored_previous_hashes == ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="] version = schema._stored_version # modify save and load schema 15 times and check ancestry - expected_ancestors = ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="] + expected_previous_hashes = ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="] for i in range(1,15): - # keep expected ancestors - expected_ancestors.insert(0, schema._stored_version_hash) + # keep expected previous_hashes + expected_previous_hashes.insert(0, schema._stored_version_hash) # update schema row = {f"float{i}": 78172.128} @@ -144,10 +144,10 @@ def test_create_ancestry() -> None: schema_dict = schema.to_dict() schema = Schema.from_stored_schema(schema_dict) - assert schema._stored_ancestors == expected_ancestors[:10] + assert schema._stored_previous_hashes == expected_previous_hashes[:10] assert schema._stored_version == version + i - # we never have more than 10 ancestors - assert len(schema._stored_ancestors) == i + 1 if i + 1 <= 10 else 10 + # we never keep more than 10 previous hashes + assert len(schema._stored_previous_hashes) == (i + 1 if i + 1 <= 10 else 10) - assert len(schema._stored_ancestors) == 10 \ No newline at end of file + assert len(schema._stored_previous_hashes) == 10 \ No newline at end of file diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index a577729e5d..401c22f0bc 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -11,7 +11,7 @@ from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT -from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V7 +from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V8 @pytest.fixture @@ -87,7 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch assert storage_schema.version == reloaded_schema.stored_version assert storage_schema.version_hash == reloaded_schema.stored_version_hash assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash - assert storage_schema.ancestors == reloaded_schema.ancestors + assert storage_schema.previous_hashes == reloaded_schema.previous_hashes # the import schema gets modified storage_schema.tables["_dlt_loads"]["write_disposition"] = "append" storage_schema.tables.pop("event_user") @@ -100,7 +100,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch # hash and ancestry stay the same assert reloaded_schema._imported_version_hash == storage_schema.version_hash - assert storage_schema.ancestors == reloaded_schema.ancestors + assert storage_schema.previous_hashes == reloaded_schema.previous_hashes # but original version has increased assert reloaded_schema.stored_version == storage_schema.version + 1 @@ -199,13 +199,13 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None: ie_storage.save_schema(schema) assert schema.version_hash == schema_hash # we linked schema to import schema - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 +
assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # load schema and make sure our new schema is here schema = ie_storage.load_schema("ethereum") - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 assert schema._stored_version_hash == schema_hash assert schema.version_hash == schema_hash - assert schema.ancestors == [] + assert schema.previous_hashes == [] # we have simple schema in export folder fs = FileStorage(ie_storage.config.export_schema_path) exported_name = ie_storage._file_name_in_store("ethereum", "yaml") @@ -219,13 +219,13 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No schema = Schema("ethereum") schema_hash = schema.version_hash synced_storage.save_schema(schema) - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # import schema is overwritten fs = FileStorage(synced_storage.config.import_schema_path) exported_name = synced_storage._file_name_in_store("ethereum", "yaml") exported_schema = yaml.safe_load(fs.load(exported_name)) assert schema.version_hash == exported_schema["version_hash"] == schema_hash - assert schema.ancestors == [] + assert schema.previous_hashes == [] # when it is loaded we will import schema again which is identical to the current one but the import link # will be set to itself schema = synced_storage.load_schema("ethereum") @@ -276,18 +276,18 @@ def test_schema_from_file() -> None: def prepare_import_folder(storage: SchemaStorage) -> None: - shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v7"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) + shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v8"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) def assert_schema_imported(synced_storage: SchemaStorage, storage: SchemaStorage) -> Schema: prepare_import_folder(synced_storage) - eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") schema = synced_storage.load_schema("ethereum") # is linked to imported schema - schema._imported_version_hash = eth_v6["version_hash"] + schema._imported_version_hash = eth_v8["version_hash"] # also was saved in storage assert synced_storage.has_schema("ethereum") # and has link to imported schema as well (load without import) schema = storage.load_schema("ethereum") - assert schema._imported_version_hash == eth_v6["version_hash"] + assert schema._imported_version_hash == eth_v8["version_hash"] return schema diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index d35adc8c7b..f274c82014 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -89,7 +89,7 @@ def test_doc() -> TTestRecord: def test_validate_schema_cases() -> None: - with open("tests/common/cases/schemas/eth/ethereum_schema_v7.yml", mode="r", encoding="utf-8") as f: + with open("tests/common/cases/schemas/eth/ethereum_schema_v8.yml", mode="r", encoding="utf-8") as f: schema_dict: TStoredSchema = yaml.safe_load(f) validate_dict_ignoring_xkeys( diff --git a/tests/common/utils.py b/tests/common/utils.py index d612dcbdcf..db9a8318fb 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -16,7 +16,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema
version -IMPORTED_VERSION_HASH_ETH_V7 = "yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE=" +IMPORTED_VERSION_HASH_ETH_V8 = "C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 28f3d34dcf..27cdc3d22d 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -28,7 +28,7 @@ SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable) from dlt.extract.typing import TableNameMeta -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7 +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8 def test_none_returning_source() -> None: @@ -75,7 +75,7 @@ def test_load_schema_for_callable() -> None: schema = s.schema assert schema.name == "ethereum" == s.name # the schema in the associated file has this hash - assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V8 def test_unbound_parametrized_transformer() -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 8259483088..a045dd4f3c 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -14,7 +14,7 @@ def test_extract_select_tables() -> None: def expect_tables(resource: DltResource) -> dlt.Schema: # delete files clean_test_storage() - source = DltSource("module", dlt.Schema("selectables"), [resource(10)]) + source = DltSource(dlt.Schema("selectables"), "module", [resource(10)]) schema = source.discover_schema() storage = ExtractorStorage(NormalizeStorageConfiguration()) @@ -37,7 +37,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: clean_test_storage() storage = ExtractorStorage(NormalizeStorageConfiguration()) # same thing but select only odd - source = DltSource("module", dlt.Schema("selectables"), [resource]) + source = DltSource(dlt.Schema("selectables"), "module", [resource]) source = source.with_resources(resource.name) source.selected_resources[resource.name].bind(10).select_tables("odd_table") extract_id = storage.create_extract_id() @@ -80,7 +80,7 @@ def input_gen(): yield from [1, 2, 3] input_r = DltResource.from_data(input_gen) - source = DltSource("module", dlt.Schema("selectables"), [input_r, input_r.with_name("gen_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, input_r.with_name("gen_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() extract(extract_id, source, storage) @@ -99,7 +99,7 @@ def tx_step(item): input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("module", dlt.Schema("selectables"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() extract(extract_id, source, storage) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 595c67f7c6..ec28018add 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -694,14 +694,14 @@ def child(item): # create a source where we place only child child.write_disposition = "replace" - 
s = DltSource("comp", "section", Schema("comp"), [child]) + s = DltSource(Schema("comp"), "section", [child]) # but extracted resources will include its parent where it derives write disposition from child extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" assert extracted[child._pipe.parent.name].write_disposition == "replace" # create a source where we place parent explicitly - s = DltSource("section", Schema("comp"), [parent_r, child]) + s = DltSource(Schema("comp"), "section", [parent_r, child]) extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition @@ -722,7 +722,7 @@ def child(item): # now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly # so we have a resource with the same name as child parent but the pipe instance is different - s = DltSource("section", Schema("comp"), [standalone_some_data(now), child]) + s = DltSource(Schema("comp"), "section", [standalone_some_data(now), child]) assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition - because we search by name to identify matching resource assert extracted[child._pipe.parent.name].write_disposition == "append" diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index e9f7b4a8f1..aae95e0a3f 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -55,7 +55,7 @@ def parametrized(p1, /, p2, *, p3 = None): # as part of the source r = DltResource.from_data(parametrized) - s = DltSource("module", Schema("source"), [r]) + s = DltSource(Schema("source"), "module", [r]) with pytest.raises(ParametrizedResourceUnbound) as py_ex: list(s) @@ -1014,7 +1014,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("module", Schema("source"), [dlt.resource(some_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(some_data())]) assert s.exhausted is False assert list(s) == [1, 2, 3, 1, 2, 3] assert s.exhausted is True @@ -1028,19 +1028,19 @@ def test_exhausted_property() -> None: # this example will be exhausted after iteration def open_generator_data(): yield from [1, 2, 3, 4] - s = DltSource("module", Schema("source"), [dlt.resource(open_generator_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(open_generator_data())]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is True # lists will not exhaust - s = DltSource("module", Schema("source"), [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False # iterators will not exhaust - s = DltSource("module", Schema("source"), [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False @@ -1048,7 +1048,7 @@ def open_generator_data(): # having on exhausted generator resource will make the whole source exhausted def open_generator_data(): # type: ignore[no-redef] yield from [1, 2, 3, 4] - s = DltSource("module", Schema("source"), [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), 
dlt.resource(open_generator_data())]) + s = DltSource(Schema("source"), "module", [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), dlt.resource(open_generator_data())]) assert s.exhausted is False # execute the whole source @@ -1239,7 +1239,7 @@ def tx_step(item): input_r_clone = input_r.with_name("input_gen_2") # separate resources have separate pipe instances - source = DltSource("module", Schema("dupes"), [input_r, input_r_clone]) + source = DltSource(Schema("dupes"), "module", [input_r, input_r_clone]) pipes = source.resources.pipes assert len(pipes) == 2 assert pipes[0].name == "input_gen" @@ -1250,13 +1250,13 @@ def tx_step(item): assert list(source) == [1, 2, 3, 1, 2, 3] # cloned from fresh resource - source = DltSource("module", Schema("dupes"), [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) + source = DltSource(Schema("dupes"), "module", [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) assert list(source) == [1, 2, 3, 1, 2, 3] # clone transformer input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("module", Schema("dupes"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource(Schema("dupes"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]) pipes = source.resources.pipes assert len(pipes) == 2 assert source.resources[pipes[0].name] == source.input_gen diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index c9c6c4c437..1ebb3378a6 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -18,7 +18,7 @@ from tests.utils import TEST_STORAGE_ROOT from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7, yml_case_path as common_yml_case_path +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8, yml_case_path as common_yml_case_path from tests.common.configuration.utils import environment from tests.load.pipeline.utils import assert_query_data, drop_active_pipeline_data from tests.load.utils import destinations_configs, DestinationTestConfiguration, get_normalized_dataset_name @@ -404,7 +404,7 @@ def test_restore_schemas_while_import_schemas_exist(destination_config: Destinat assert normalized_annotations in schema.tables # check if attached to import schema - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # extract some data with restored pipeline p.run(["C", "D", "E"], table_name="blacklist") assert normalized_labels in schema.tables diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py index 850f70ee19..dad7fc176f 100644 --- a/tests/load/weaviate/test_naming.py +++ b/tests/load/weaviate/test_naming.py @@ -87,13 +87,13 @@ def test_reserved_property_names() -> None: # print(schema_2.name) # print(schema_2.naming) -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") -# eth_v6_schema = dlt.Schema.from_dict(eth_v6) +# eth_V8 = load_yml_case("schemas/eth/ethereum_schema_v8") +# eth_V8_schema = dlt.Schema.from_dict(eth_V8) -# pipeline.extract(s, schema=eth_v6_schema) +# pipeline.extract(s, schema=eth_V8_schema) -# print(eth_v6_schema.data_tables()) -# print(eth_v6_schema.dlt_tables()) +# print(eth_V8_schema.data_tables()) +# print(eth_V8_schema.dlt_tables()) # def 
test_x_schema_naming_normalize() -> None: @@ -101,14 +101,14 @@ def test_reserved_property_names() -> None: # print(pipeline.dataset_name) # s = small() -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") -# eth_v6_schema = dlt.Schema.from_dict(eth_v6) +# eth_V8 = load_yml_case("schemas/eth/ethereum_schema_v8") +# eth_V8_schema = dlt.Schema.from_dict(eth_V8) -# pipeline.extract(s, schema=eth_v6_schema) -# print(eth_v6_schema.tables.keys()) +# pipeline.extract(s, schema=eth_V8_schema) +# print(eth_V8_schema.tables.keys()) # default_schema = pipeline.default_schema # print(default_schema.name) -# print(eth_v6_schema.tables.keys()) +# print(eth_V8_schema.tables.keys()) # pipeline.run(s, destination="weaviate") # print(default_schema.tables.keys()) diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index 7ac7dcbb34..17861d0bc2 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -81,7 +81,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: pipeline.sync_destination() # print(pipeline.working_dir) # we have updated schema - assert pipeline.default_schema.ENGINE_VERSION == 7 + assert pipeline.default_schema.ENGINE_VERSION == 8 # make sure that schema hash retrieved from the destination is exactly the same as the schema hash that was in storage before the schema was wiped assert pipeline.default_schema.stored_version_hash == github_schema["version_hash"] @@ -114,6 +114,6 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None: github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) pipeline = pipeline.drop() pipeline.sync_destination() - assert pipeline.default_schema.ENGINE_VERSION == 7 + assert pipeline.default_schema.ENGINE_VERSION == 8 # schema version does not match `dlt.attach` does not update to the right schema by itself assert pipeline.default_schema.stored_version_hash != github_schema["version_hash"] diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index e761186d3a..af21eb9f81 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -274,7 +274,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("module", dlt.Schema("source"), [dlt.resource(some_data())]) + s = DltSource(dlt.Schema("source"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) with pytest.raises(PipelineStepFailed) as py_ex: dlt.pipeline().extract(s) @@ -289,7 +289,7 @@ def test_disable_enable_state_sync(environment: Any) -> None: def some_data(): yield [1, 2, 3] - s = DltSource("module", dlt.Schema("default"), [dlt.resource(some_data())]) + s = DltSource(dlt.Schema("default"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) storage = ExtractorStorage(p._normalize_storage_config) assert len(storage.list_files_to_normalize_sorted()) == 1 @@ -299,14 +299,14 @@ def some_data(): p.config.restore_from_destination = True # extract to different schema, state must go to default schema - s = DltSource("module", dlt.Schema("default_2"), [dlt.resource(some_data())]) + s = DltSource(dlt.Schema("default_2"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) expect_extracted_file(storage, "default", s.schema.state_table_name, "***") def test_extract_multiple_sources() -> None: - s1 = DltSource("module", dlt.Schema("default"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s2 = 
DltSource("module", dlt.Schema("default_2"), [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) + s1 = DltSource(dlt.Schema("default"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s2 = DltSource(dlt.Schema("default_2"),"module", [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) p = dlt.pipeline(destination="dummy") p.config.restore_from_destination = False @@ -325,8 +325,8 @@ def test_extract_multiple_sources() -> None: def i_fail(): raise NotImplementedError() - s3 = DltSource("module", dlt.Schema("default_3"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s4 = DltSource("module", dlt.Schema("default_4"), [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) + s3 = DltSource(dlt.Schema("default_3"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s4 = DltSource(dlt.Schema("default_4"), "module", [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) with pytest.raises(PipelineStepFailed): # NOTE: if you swap s3 and s4 the test on list_schemas will fail: s3 will extract normally and update live schemas, s4 will break exec later diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index 793e51b909..5644a32d2f 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -298,12 +298,12 @@ def data(): def test_extract_data_describe() -> None: schema = Schema("test") - assert describe_extract_data(DltSource("sect", schema)) == [{"name": "test", "data_type": "source"}] + assert describe_extract_data(DltSource(schema, "sect")) == [{"name": "test", "data_type": "source"}] assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}] - assert describe_extract_data([DltSource("sect", schema)]) == [{"name": "test", "data_type": "source"}] + assert describe_extract_data([DltSource(schema, "sect")]) == [{"name": "test", "data_type": "source"}] assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}] assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DltSource("sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DltSource(schema, "sect")] ) == [ {"name": "rrr_extract", "data_type": "resource"}, {"name": "test", "data_type": "source"} ] @@ -313,7 +313,7 @@ def test_extract_data_describe() -> None: assert describe_extract_data([DataFrame(), {"a": "b"}]) == [{"name": "", "data_type": "DataFrame"}] # first unnamed element in the list breaks checking info assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource(schema, "sect")] ) == [ {"name": "rrr_extract", "data_type": "resource"}, {"name": "", "data_type": "DataFrame"} ] From b68013d64bbc00662f31fc7e70e03486241c131b Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 13:06:23 +0100 Subject: [PATCH 6/7] revert changes on validate dict --- dlt/common/schema/utils.py | 7 +------ dlt/common/validation.py | 5 ++--- tests/common/schema/test_schema.py | 9 +++------ 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 899f60dfb1..b6a3cca0e2 100644 --- 
a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -246,18 +246,13 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern: def validate_stored_schema(stored_schema: TStoredSchema) -> None: - # exclude validation of keys added later - ignored_keys = [] - if stored_schema["engine_version"] < 7: - ignored_keys.append("previous_hashes") # use lambda to verify only non extra fields validate_dict_ignoring_xkeys( spec=TStoredSchema, doc=stored_schema, path=".", - validator_f=simple_regex_validator, - filter_required=lambda k: k not in ignored_keys + validator_f=simple_regex_validator ) # check child parent relationships for table_name, table in stored_schema["tables"].items(): diff --git a/dlt/common/validation.py b/dlt/common/validation.py index 5e5e18de42..312371bbf1 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -9,7 +9,7 @@ TCustomValidator = Callable[[str, str, Any, Any], bool] -def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None, filter_required: TFilterFunc = None) -> None: +def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None: """Validate the `doc` dictionary based on the given typed dictionary specification `spec`. Args: @@ -34,12 +34,11 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil """ # pass through filter filter_f = filter_f or (lambda _: True) - filter_required = filter_required or (lambda _: True) # cannot validate anything validator_f = validator_f or (lambda p, pk, pv, t: False) allowed_props = get_type_hints(spec) - required_props = {k: v for k, v in allowed_props.items() if (not is_optional_type(v) and filter_required(k))} + required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)} # remove optional props props = {k: v for k, v in doc.items() if filter_f(k)} # check missing props diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 3f6e5e5b93..a42018f97b 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -132,10 +132,10 @@ def test_simple_regex_validator() -> None: def test_load_corrupted_schema() -> None: - eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") - del eth_v4["tables"]["blocks"] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + del eth_v8["tables"]["blocks"] with pytest.raises(ParentTableNotFoundException): - utils.validate_stored_schema(eth_v4) + utils.validate_stored_schema(eth_v8) def test_column_name_validator(schema: Schema) -> None: @@ -287,21 +287,18 @@ def test_upgrade_engine_v1_schema() -> None: assert schema_dict["engine_version"] == 2 upgraded = utils.migrate_schema(schema_dict, from_engine=2, to_engine=4) assert upgraded["engine_version"] == 4 - utils.validate_stored_schema(upgraded) # upgrade 1 -> 4 schema_dict = load_json_case("schemas/ev1/event.schema") assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=4) assert upgraded["engine_version"] == 4 - utils.validate_stored_schema(upgraded) # upgrade 1 -> 6 schema_dict = load_json_case("schemas/ev1/event.schema") assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=6) assert upgraded["engine_version"] == 6 - utils.validate_stored_schema(upgraded) # upgrade 1 -> 7 schema_dict = 
load_json_case("schemas/ev1/event.schema") From cb251e94ff1c1b9f18d874485d94d79abf23a780 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 15:25:19 +0100 Subject: [PATCH 7/7] fix one test --- tests/pipeline/test_dlt_versions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index 17861d0bc2..71f6b5b813 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -55,7 +55,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: print(venv.run_script("../tests/pipeline/cases/github_pipeline/github_pipeline.py")) # hash hash in schema github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) - assert github_schema["engine_version"] == 7 + assert github_schema["engine_version"] == 8 assert "schema_version_hash" in github_schema["tables"][LOADS_TABLE_NAME]["columns"] with DuckDbSqlClient(GITHUB_DATASET, duckdb_cfg.credentials) as client: rows = client.execute_sql(f"SELECT * FROM {LOADS_TABLE_NAME} ORDER BY inserted_at")