diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 41b0010242..899f60dfb1 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -143,13 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st pass elif hash_ != previous_hash: stored_schema["version"] += 1 - # unshift previous hash to ancestors and limit array to 10 entries - if previous_hash not in stored_schema["ancestors"]: - stored_schema["ancestors"].insert(0, previous_hash) - stored_schema["ancestors"] = stored_schema["ancestors"][:10] + # unshift previous hash to previous_hashes and limit array to 10 entries + if previous_hash not in stored_schema["previous_hashes"]: + stored_schema["previous_hashes"].insert(0, previous_hash) + stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10] stored_schema["version_hash"] = hash_ - return stored_schema["version"], hash_, previous_hash, stored_schema["ancestors"] + return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"] def generate_version_hash(stored_schema: TStoredSchema) -> str: @@ -158,7 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: schema_copy.pop("version") schema_copy.pop("version_hash", None) schema_copy.pop("imported_version_hash", None) - schema_copy.pop("ancestors", None) + schema_copy.pop("previous_hashes", None) # ignore order of elements when computing the hash content = json.dumps(schema_copy, sort_keys=True) h = hashlib.sha3_256(content.encode("utf-8")) @@ -249,7 +249,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None: # exclude validation of keys added later ignored_keys = [] if stored_schema["engine_version"] < 7: - ignored_keys.append("ancestors") + ignored_keys.append("previous_hashes") # use lambda to verify only non extra fields validate_dict_ignoring_xkeys( @@ -363,7 +363,7 @@ def migrate_filters(group: str, filters: List[str]) -> None: table["schema_contract"] = {} from_engine = 7 if from_engine == 7 and to_engine > 7: - schema_dict["ancestors"] = [] + schema_dict["previous_hashes"] = [] from_engine = 8 schema_dict["engine_version"] = from_engine diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index fbe712fae8..1dbfcb4350 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -173,7 +173,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl: rv = list(rv) # convert to source - s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv) + s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv) # apply hints if max_table_nesting is not None: s.max_table_nesting = max_table_nesting diff --git a/dlt/extract/source.py b/dlt/extract/source.py index ace76cee2b..0ff24d1f86 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -167,7 +167,7 @@ class DltSource(Iterable[TDataItem]): * You can use a `run` method to load the data with a default instance of dlt pipeline. 
* You can get source read only state for the currently active Pipeline instance """ - def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None: + def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None: self.section = section """Tells if iterator associated with a source is exhausted""" self._schema = schema @@ -177,7 +177,7 @@ def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource self.resources.add(*resources) @classmethod - def from_data(cls, section: str, schema: Schema, data: Any) -> Self: + def from_data(cls, schema: Schema, section: str, data: Any) -> Self: """Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema.""" # creates source from various forms of data if isinstance(data, DltSource): @@ -189,7 +189,7 @@ def from_data(cls, section: str, schema: Schema, data: Any) -> Self: else: resources = [DltResource.from_data(data)] - return cls(section, schema, resources) + return cls(schema, section, resources) @property def name(self) -> str: @@ -327,7 +327,7 @@ def state(self) -> StrAny: def clone(self) -> "DltSource": """Creates a deep copy of the source where copies of schema, resources and pipes are created""" # mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor - return DltSource(self.section, self.schema.clone(), list(self._resources.values())) + return DltSource(self.schema.clone(), self.section, list(self._resources.values())) def __iter__(self) -> Iterator[TDataItem]: """Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b9eb958027..c893fd4e75 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None: # do not set section to prevent source that represent a standalone resource # to overwrite other standalone resources (ie. 
parents) in that source sources.append( - DltSource(effective_schema.name, "", effective_schema, [data_item]) + DltSource(effective_schema, "", [data_item]) ) else: # iterator/iterable/generator @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None: # add all the appended resources in one source if resources: - sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources)) + sources.append(DltSource(effective_schema, self.pipeline_name, resources)) # apply hints and settings for source in sources: @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None: def _extract_state(self, state: TPipelineState) -> TPipelineState: # this will extract the state into current load package and update the schema with the _dlt_pipeline_state table # note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract - state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)]) + state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)]) storage = ExtractorStorage(self._normalize_storage_config) extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1) storage.commit_extract_files(extract_id) diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v8.yml b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml new file mode 100644 index 0000000000..928c9a3e54 --- /dev/null +++ b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml @@ -0,0 +1,461 @@ +version: 16 +version_hash: C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE= +engine_version: 8 +name: ethereum +tables: + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + name: load_id + schema_name: + nullable: true + data_type: text + name: schema_name + status: + nullable: false + data_type: bigint + name: status + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_version_hash: + nullable: true + data_type: text + name: schema_version_hash + write_disposition: skip + description: Created by DLT. Tracks completed loads + schema_contract: {} + name: _dlt_loads + resource: _dlt_loads + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + name: version + engine_version: + nullable: false + data_type: bigint + name: engine_version + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_name: + nullable: false + data_type: text + name: schema_name + version_hash: + nullable: false + data_type: text + name: version_hash + schema: + nullable: false + data_type: text + name: schema + write_disposition: skip + description: Created by DLT. 
Tracks schema updates + schema_contract: {} + name: _dlt_version + resource: _dlt_version + blocks: + description: Ethereum blocks + x-annotation: this will be preserved on save + write_disposition: append + filters: + includes: [] + excludes: [] + columns: + _dlt_load_id: + nullable: false + description: load id coming from the extractor + data_type: text + name: _dlt_load_id + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + number: + nullable: false + primary_key: true + data_type: bigint + name: number + parent_hash: + nullable: true + data_type: text + name: parent_hash + hash: + nullable: false + cluster: true + unique: true + data_type: text + name: hash + base_fee_per_gas: + nullable: false + data_type: wei + name: base_fee_per_gas + difficulty: + nullable: false + data_type: wei + name: difficulty + extra_data: + nullable: true + data_type: text + name: extra_data + gas_limit: + nullable: false + data_type: bigint + name: gas_limit + gas_used: + nullable: false + data_type: bigint + name: gas_used + logs_bloom: + nullable: true + data_type: binary + name: logs_bloom + miner: + nullable: true + data_type: text + name: miner + mix_hash: + nullable: true + data_type: text + name: mix_hash + nonce: + nullable: true + data_type: text + name: nonce + receipts_root: + nullable: true + data_type: text + name: receipts_root + sha3_uncles: + nullable: true + data_type: text + name: sha3_uncles + size: + nullable: true + data_type: bigint + name: size + state_root: + nullable: false + data_type: text + name: state_root + timestamp: + nullable: false + unique: true + sort: true + data_type: timestamp + name: timestamp + total_difficulty: + nullable: true + data_type: wei + name: total_difficulty + transactions_root: + nullable: false + data_type: text + name: transactions_root + schema_contract: {} + name: blocks + resource: blocks + blocks__transactions: + parent: blocks + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + data_type: bigint + name: transaction_index + hash: + nullable: false + unique: true + data_type: text + name: hash + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + chain_id: + nullable: true + data_type: text + name: chain_id + from: + nullable: true + data_type: text + name: from + gas: + nullable: true + data_type: bigint + name: gas + gas_price: + nullable: true + data_type: bigint + name: gas_price + input: + nullable: true + data_type: text + name: input + max_fee_per_gas: + nullable: true + data_type: wei + name: max_fee_per_gas + max_priority_fee_per_gas: + nullable: true + data_type: wei + name: max_priority_fee_per_gas + nonce: + nullable: true + data_type: bigint + name: nonce + r: + nullable: true + data_type: text + name: r + s: + nullable: true + data_type: text + name: s + status: + nullable: true + data_type: bigint + name: status + to: + nullable: true + data_type: text + name: to + type: + nullable: true + data_type: text + name: type + v: + nullable: true + data_type: bigint + name: v + value: + nullable: false + data_type: wei + name: value + eth_value: + nullable: true + data_type: decimal + name: eth_value + name: blocks__transactions + blocks__transactions__logs: + parent: 
blocks__transactions + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + address: + nullable: false + data_type: text + name: address + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: transaction_index + log_index: + nullable: false + primary_key: true + data_type: bigint + name: log_index + data: + nullable: true + data_type: text + name: data + removed: + nullable: true + data_type: bool + name: removed + transaction_hash: + nullable: false + data_type: text + name: transaction_hash + name: blocks__transactions__logs + blocks__transactions__logs__topics: + parent: blocks__transactions__logs + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__logs__topics + blocks__transactions__access_list: + parent: blocks__transactions + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + address: + nullable: true + data_type: text + name: address + name: blocks__transactions__access_list + blocks__transactions__access_list__storage_keys: + parent: blocks__transactions__access_list + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__access_list__storage_keys + blocks__uncles: + parent: blocks + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__uncles +settings: + default_hints: + foreign_key: + - _dlt_parent_id + not_null: + - re:^_dlt_id$ + - _dlt_root_id + - _dlt_parent_id + - _dlt_list_idx + unique: + - _dlt_id + cluster: + - block_hash + partition: + - block_timestamp + root_key: + - _dlt_root_id + preferred_types: + timestamp: timestamp + block_timestamp: timestamp + schema_contract: {} +normalizers: + names: dlt.common.normalizers.names.snake_case + json: + module: dlt.common.normalizers.json.relational + config: + generate_dlt_id: true + propagation: + root: + _dlt_id: _dlt_root_id + 
tables: + blocks: + timestamp: block_timestamp + hash: block_hash +previous_hashes: +- yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE= + diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 104b634491..3f6e5e5b93 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -132,7 +132,7 @@ def test_simple_regex_validator() -> None: def test_load_corrupted_schema() -> None: - eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") + eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") del eth_v4["tables"]["blocks"] with pytest.raises(ParentTableNotFoundException): utils.validate_stored_schema(eth_v4) @@ -308,9 +308,13 @@ def test_upgrade_engine_v1_schema() -> None: assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7) assert upgraded["engine_version"] == 7 - utils.validate_stored_schema(upgraded) - # we should have an empty ancestors list after upgrade to 7 - assert upgraded["ancestors"] == [] + + + # upgrade 1 -> 8 + schema_dict = load_json_case("schemas/ev1/event.schema") + assert schema_dict["engine_version"] == 1 + upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=8) + assert upgraded["engine_version"] == 8 def test_unknown_engine_upgrade() -> None: @@ -590,8 +594,8 @@ def assert_new_schema_values(schema: Schema) -> None: assert schema.stored_version == 1 assert schema.stored_version_hash is not None assert schema.version_hash is not None - assert schema.ENGINE_VERSION == 7 - assert schema._stored_ancestors == [] + assert schema.ENGINE_VERSION == 8 + assert schema._stored_previous_hashes == [] assert len(schema.settings["default_hints"]) > 0 # check settings assert utils.standard_type_detections() == schema.settings["detections"] == schema._type_detections diff --git a/tests/common/schema/test_versioning.py b/tests/common/schema/test_versioning.py index a971c8c93f..401b463875 100644 --- a/tests/common/schema/test_versioning.py +++ b/tests/common/schema/test_versioning.py @@ -84,10 +84,10 @@ def test_infer_column_bumps_version() -> None: def test_preserve_version_on_load() -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - version = eth_v7["version"] - version_hash = eth_v7["version_hash"] - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + version = eth_v8["version"] + version_hash = eth_v8["version_hash"] + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] # version should not be bumped assert version_hash == schema._stored_version_hash assert version_hash == schema.version_hash @@ -96,8 +96,8 @@ def test_preserve_version_on_load() -> None: @pytest.mark.parametrize("remove_defaults", [True, False]) def test_version_preserve_on_reload(remove_defaults: bool) -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] to_save_dict = schema.to_dict(remove_defaults=remove_defaults) assert schema.stored_version == to_save_dict["version"] @@ -126,16 +126,16 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None: def test_create_ancestry() -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - schema = Schema.from_dict(eth_v7) # 
type: ignore[arg-type]
-    assert schema._stored_ancestors == ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
+    eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8")
+    schema = Schema.from_dict(eth_v8)  # type: ignore[arg-type]
+    assert schema._stored_previous_hashes == ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="]
     version = schema._stored_version

     # modify save and load schema 15 times and check ancestry
-    expected_ancestors = ["Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ="]
+    expected_previous_hashes = ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="]
     for i in range(1,15):
-        # keep expected ancestors
-        expected_ancestors.insert(0, schema._stored_version_hash)
+        # keep expected previous_hashes
+        expected_previous_hashes.insert(0, schema._stored_version_hash)

         # update schema
         row = {f"float{i}": 78172.128}
@@ -144,10 +144,10 @@
         schema_dict = schema.to_dict()
         schema = Schema.from_stored_schema(schema_dict)

-        assert schema._stored_ancestors == expected_ancestors[:10]
+        assert schema._stored_previous_hashes == expected_previous_hashes[:10]
         assert schema._stored_version == version + i
-        # we never have more than 10 ancestors
-        assert len(schema._stored_ancestors) == i + 1 if i + 1 <= 10 else 10
+        # we never have more than 10 previous_hashes
+        assert len(schema._stored_previous_hashes) == (i + 1 if i + 1 <= 10 else 10)

-    assert len(schema._stored_ancestors) == 10
\ No newline at end of file
+    assert len(schema._stored_previous_hashes) == 10
\ No newline at end of file
diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py
index a577729e5d..401c22f0bc 100644
--- a/tests/common/storages/test_schema_storage.py
+++ b/tests/common/storages/test_schema_storage.py
@@ -11,7 +11,7 @@
 from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage

 from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT
-from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V7
+from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V8


 @pytest.fixture
@@ -87,7 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch
     assert storage_schema.version == reloaded_schema.stored_version
     assert storage_schema.version_hash == reloaded_schema.stored_version_hash
     assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash
-    assert storage_schema.ancestors == reloaded_schema.ancestors
+    assert storage_schema.previous_hashes == reloaded_schema.previous_hashes
     # the import schema gets modified
     storage_schema.tables["_dlt_loads"]["write_disposition"] = "append"
     storage_schema.tables.pop("event_user")
@@ -100,7 +100,7 @@
     # hash and ancestry stay the same
     assert reloaded_schema._imported_version_hash == storage_schema.version_hash
-    assert storage_schema.ancestors == reloaded_schema.ancestors
+    assert storage_schema.previous_hashes == reloaded_schema.previous_hashes
     # but original version has increased
     assert reloaded_schema.stored_version == storage_schema.version + 1
@@ -199,13 +199,13 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None:
     ie_storage.save_schema(schema)
     assert schema.version_hash == schema_hash
     # we linked schema to import schema
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8
     # load schema and make sure our new schema is here
     schema = ie_storage.load_schema("ethereum")
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8
     assert schema._stored_version_hash == schema_hash
     assert schema.version_hash == schema_hash
-    assert schema.ancestors == []
+    assert schema.previous_hashes == []
     # we have simple schema in export folder
     fs = FileStorage(ie_storage.config.export_schema_path)
     exported_name = ie_storage._file_name_in_store("ethereum", "yaml")
@@ -219,13 +219,13 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No
     schema = Schema("ethereum")
     schema_hash = schema.version_hash
     synced_storage.save_schema(schema)
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8
     # import schema is overwritten
     fs = FileStorage(synced_storage.config.import_schema_path)
     exported_name = synced_storage._file_name_in_store("ethereum", "yaml")
     exported_schema = yaml.safe_load(fs.load(exported_name))
     assert schema.version_hash == exported_schema["version_hash"] == schema_hash
-    assert schema.ancestors == []
+    assert schema.previous_hashes == []
     # when it is loaded we will import schema again which is identical to the current one but the import link
     # will be set to itself
     schema = synced_storage.load_schema("ethereum")
@@ -276,18 +276,18 @@ def test_schema_from_file() -> None:

 def prepare_import_folder(storage: SchemaStorage) -> None:
-    shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v7"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml"))
+    shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v8"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml"))


 def assert_schema_imported(synced_storage: SchemaStorage, storage: SchemaStorage) -> Schema:
     prepare_import_folder(synced_storage)
-    eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7")
+    eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8")
     schema = synced_storage.load_schema("ethereum")
     # is linked to imported schema
-    schema._imported_version_hash = eth_v6["version_hash"]
+    schema._imported_version_hash = eth_v8["version_hash"]
     # also was saved in storage
     assert synced_storage.has_schema("ethereum")
     # and has link to imported schema s well (load without import)
     schema = storage.load_schema("ethereum")
-    assert schema._imported_version_hash == eth_v6["version_hash"]
+    assert schema._imported_version_hash == eth_v8["version_hash"]
     return schema
diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py
index d35adc8c7b..f274c82014 100644
--- a/tests/common/test_validation.py
+++ b/tests/common/test_validation.py
@@ -89,7 +89,7 @@ def test_doc() -> TTestRecord:

 def test_validate_schema_cases() -> None:
-    with open("tests/common/cases/schemas/eth/ethereum_schema_v7.yml", mode="r", encoding="utf-8") as f:
+    with open("tests/common/cases/schemas/eth/ethereum_schema_v8.yml", mode="r", encoding="utf-8") as f:
         schema_dict: TStoredSchema = yaml.safe_load(f)

         validate_dict_ignoring_xkeys(
diff --git a/tests/common/utils.py b/tests/common/utils.py
index d612dcbdcf..db9a8318fb 100644
--- a/tests/common/utils.py
+++ b/tests/common/utils.py
@@ -16,7 +16,7 @@

 COMMON_TEST_CASES_PATH = "./tests/common/cases/"
 # for import schema tests, change when upgrading the schema
version -IMPORTED_VERSION_HASH_ETH_V7 = "yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE=" +IMPORTED_VERSION_HASH_ETH_V8 = "C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 28f3d34dcf..27cdc3d22d 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -28,7 +28,7 @@ SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable) from dlt.extract.typing import TableNameMeta -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7 +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8 def test_none_returning_source() -> None: @@ -75,7 +75,7 @@ def test_load_schema_for_callable() -> None: schema = s.schema assert schema.name == "ethereum" == s.name # the schema in the associated file has this hash - assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V8 def test_unbound_parametrized_transformer() -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 8259483088..a045dd4f3c 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -14,7 +14,7 @@ def test_extract_select_tables() -> None: def expect_tables(resource: DltResource) -> dlt.Schema: # delete files clean_test_storage() - source = DltSource("module", dlt.Schema("selectables"), [resource(10)]) + source = DltSource(dlt.Schema("selectables"), "module", [resource(10)]) schema = source.discover_schema() storage = ExtractorStorage(NormalizeStorageConfiguration()) @@ -37,7 +37,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: clean_test_storage() storage = ExtractorStorage(NormalizeStorageConfiguration()) # same thing but select only odd - source = DltSource("module", dlt.Schema("selectables"), [resource]) + source = DltSource(dlt.Schema("selectables"), "module", [resource]) source = source.with_resources(resource.name) source.selected_resources[resource.name].bind(10).select_tables("odd_table") extract_id = storage.create_extract_id() @@ -80,7 +80,7 @@ def input_gen(): yield from [1, 2, 3] input_r = DltResource.from_data(input_gen) - source = DltSource("module", dlt.Schema("selectables"), [input_r, input_r.with_name("gen_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, input_r.with_name("gen_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() extract(extract_id, source, storage) @@ -99,7 +99,7 @@ def tx_step(item): input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("module", dlt.Schema("selectables"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() extract(extract_id, source, storage) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 595c67f7c6..ec28018add 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -694,14 +694,14 @@ def child(item): # create a source where we place only child child.write_disposition = "replace" - 
s = DltSource("comp", "section", Schema("comp"), [child]) + s = DltSource(Schema("comp"), "section", [child]) # but extracted resources will include its parent where it derives write disposition from child extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" assert extracted[child._pipe.parent.name].write_disposition == "replace" # create a source where we place parent explicitly - s = DltSource("section", Schema("comp"), [parent_r, child]) + s = DltSource(Schema("comp"), "section", [parent_r, child]) extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition @@ -722,7 +722,7 @@ def child(item): # now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly # so we have a resource with the same name as child parent but the pipe instance is different - s = DltSource("section", Schema("comp"), [standalone_some_data(now), child]) + s = DltSource(Schema("comp"), "section", [standalone_some_data(now), child]) assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition - because we search by name to identify matching resource assert extracted[child._pipe.parent.name].write_disposition == "append" diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index e9f7b4a8f1..aae95e0a3f 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -55,7 +55,7 @@ def parametrized(p1, /, p2, *, p3 = None): # as part of the source r = DltResource.from_data(parametrized) - s = DltSource("module", Schema("source"), [r]) + s = DltSource(Schema("source"), "module", [r]) with pytest.raises(ParametrizedResourceUnbound) as py_ex: list(s) @@ -1014,7 +1014,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("module", Schema("source"), [dlt.resource(some_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(some_data())]) assert s.exhausted is False assert list(s) == [1, 2, 3, 1, 2, 3] assert s.exhausted is True @@ -1028,19 +1028,19 @@ def test_exhausted_property() -> None: # this example will be exhausted after iteration def open_generator_data(): yield from [1, 2, 3, 4] - s = DltSource("module", Schema("source"), [dlt.resource(open_generator_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(open_generator_data())]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is True # lists will not exhaust - s = DltSource("module", Schema("source"), [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False # iterators will not exhaust - s = DltSource("module", Schema("source"), [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False @@ -1048,7 +1048,7 @@ def open_generator_data(): # having on exhausted generator resource will make the whole source exhausted def open_generator_data(): # type: ignore[no-redef] yield from [1, 2, 3, 4] - s = DltSource("module", Schema("source"), [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), 
dlt.resource(open_generator_data())])
+    s = DltSource(Schema("source"), "module", [ dlt.resource([1, 2, 3], table_name="table", name="resource"), dlt.resource(open_generator_data())])
     assert s.exhausted is False
     # execute the whole source
@@ -1239,7 +1239,7 @@ def tx_step(item):
     input_r_clone = input_r.with_name("input_gen_2")

     # separate resources have separate pipe instances
-    source = DltSource("module", Schema("dupes"), [input_r, input_r_clone])
+    source = DltSource(Schema("dupes"), "module", [input_r, input_r_clone])
     pipes = source.resources.pipes
     assert len(pipes) == 2
     assert pipes[0].name == "input_gen"
@@ -1250,13 +1250,13 @@
     assert list(source) == [1, 2, 3, 1, 2, 3]

     # cloned from fresh resource
-    source = DltSource("module", Schema("dupes"), [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")])
+    source = DltSource(Schema("dupes"), "module", [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")])
     assert list(source) == [1, 2, 3, 1, 2, 3]

     # clone transformer
     input_r = DltResource.from_data(input_gen)
     input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty)
-    source = DltSource("module", Schema("dupes"), [input_r, (input_r | input_tx).with_name("tx_clone")])
+    source = DltSource(Schema("dupes"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")])
     pipes = source.resources.pipes
     assert len(pipes) == 2
     assert source.resources[pipes[0].name] == source.input_gen
diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py
index c9c6c4c437..1ebb3378a6 100644
--- a/tests/load/pipeline/test_restore_state.py
+++ b/tests/load/pipeline/test_restore_state.py
@@ -18,7 +18,7 @@
 from tests.utils import TEST_STORAGE_ROOT
 from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED
-from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7, yml_case_path as common_yml_case_path
+from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8, yml_case_path as common_yml_case_path
 from tests.common.configuration.utils import environment
 from tests.load.pipeline.utils import assert_query_data, drop_active_pipeline_data
 from tests.load.utils import destinations_configs, DestinationTestConfiguration, get_normalized_dataset_name
@@ -404,7 +404,7 @@ def test_restore_schemas_while_import_schemas_exist(destination_config: Destinat
     assert normalized_annotations in schema.tables

     # check if attached to import schema
-    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7
+    assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8
     # extract some data with restored pipeline
     p.run(["C", "D", "E"], table_name="blacklist")
     assert normalized_labels in schema.tables
diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py
index 850f70ee19..dad7fc176f 100644
--- a/tests/load/weaviate/test_naming.py
+++ b/tests/load/weaviate/test_naming.py
@@ -87,13 +87,13 @@ def test_reserved_property_names() -> None:
 # print(schema_2.name)
 # print(schema_2.naming)

-# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7")
-# eth_v6_schema = dlt.Schema.from_dict(eth_v6)
+# eth_v8 = load_yml_case("schemas/eth/ethereum_schema_v8")
+# eth_v8_schema = dlt.Schema.from_dict(eth_v8)

-# pipeline.extract(s, schema=eth_v6_schema)
+# pipeline.extract(s, schema=eth_v8_schema)

-# print(eth_v6_schema.data_tables())
-# print(eth_v6_schema.dlt_tables())
+# print(eth_v8_schema.data_tables())
+# print(eth_v8_schema.dlt_tables())


# def test_x_schema_naming_normalize() -> None:
@@ -101,14 +101,14 @@ def test_reserved_property_names() -> None:
# print(pipeline.dataset_name)
# s = small()

-# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7")
-# eth_v6_schema = dlt.Schema.from_dict(eth_v6)
+# eth_v8 = load_yml_case("schemas/eth/ethereum_schema_v8")
+# eth_v8_schema = dlt.Schema.from_dict(eth_v8)

-# pipeline.extract(s, schema=eth_v6_schema)
-# print(eth_v6_schema.tables.keys())
+# pipeline.extract(s, schema=eth_v8_schema)
+# print(eth_v8_schema.tables.keys())

# default_schema = pipeline.default_schema
# print(default_schema.name)
-# print(eth_v6_schema.tables.keys())
+# print(eth_v8_schema.tables.keys())

# pipeline.run(s, destination="weaviate")
# print(default_schema.tables.keys())
diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py
index 7ac7dcbb34..17861d0bc2 100644
--- a/tests/pipeline/test_dlt_versions.py
+++ b/tests/pipeline/test_dlt_versions.py
@@ -81,7 +81,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None:
     pipeline.sync_destination()
     # print(pipeline.working_dir)
     # we have updated schema
-    assert pipeline.default_schema.ENGINE_VERSION == 7
+    assert pipeline.default_schema.ENGINE_VERSION == 8
     # make sure that schema hash retrieved from the destination is exactly the same as the schema hash that was in storage before the schema was wiped
     assert pipeline.default_schema.stored_version_hash == github_schema["version_hash"]
@@ -114,6 +114,6 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None:
     github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json"))
     pipeline = pipeline.drop()
     pipeline.sync_destination()
-    assert pipeline.default_schema.ENGINE_VERSION == 7
+    assert pipeline.default_schema.ENGINE_VERSION == 8
     # schema version does not match `dlt.attach` does not update to the right schema by itself
     assert pipeline.default_schema.stored_version_hash != github_schema["version_hash"]
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index e761186d3a..af21eb9f81 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -274,7 +274,7 @@ def some_data():
         yield [1, 2, 3]
         yield [1, 2, 3]

-    s = DltSource("module", dlt.Schema("source"), [dlt.resource(some_data())])
+    s = DltSource(dlt.Schema("source"), "module", [dlt.resource(some_data())])
     dlt.pipeline().extract(s)
     with pytest.raises(PipelineStepFailed) as py_ex:
         dlt.pipeline().extract(s)
@@ -289,7 +289,7 @@ def test_disable_enable_state_sync(environment: Any) -> None:
     def some_data():
         yield [1, 2, 3]

-    s = DltSource("module", dlt.Schema("default"), [dlt.resource(some_data())])
+    s = DltSource(dlt.Schema("default"), "module", [dlt.resource(some_data())])
     dlt.pipeline().extract(s)
     storage = ExtractorStorage(p._normalize_storage_config)
     assert len(storage.list_files_to_normalize_sorted()) == 1
@@ -299,14 +299,14 @@ def some_data():
     p.config.restore_from_destination = True

     # extract to different schema, state must go to default schema
-    s = DltSource("module", dlt.Schema("default_2"), [dlt.resource(some_data())])
+    s = DltSource(dlt.Schema("default_2"), "module", [dlt.resource(some_data())])
     dlt.pipeline().extract(s)
     expect_extracted_file(storage, "default", s.schema.state_table_name, "***")


 def test_extract_multiple_sources() -> None:
-    s1 = DltSource("module", dlt.Schema("default"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")])
-    s2 = DltSource("module", dlt.Schema("default_2"), [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")])
+    s1 = DltSource(dlt.Schema("default"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")])
+    s2 = DltSource(dlt.Schema("default_2"), "module", [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")])

     p = dlt.pipeline(destination="dummy")
     p.config.restore_from_destination = False
@@ -325,8 +325,8 @@ def test_extract_multiple_sources() -> None:
     def i_fail():
         raise NotImplementedError()

-    s3 = DltSource("module", dlt.Schema("default_3"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")])
-    s4 = DltSource("module", dlt.Schema("default_4"), [dlt.resource([6, 7, 8], name="resource_3"), i_fail])
+    s3 = DltSource(dlt.Schema("default_3"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")])
+    s4 = DltSource(dlt.Schema("default_4"), "module", [dlt.resource([6, 7, 8], name="resource_3"), i_fail])

     with pytest.raises(PipelineStepFailed):
         # NOTE: if you swap s3 and s4 the test on list_schemas will fail: s3 will extract normally and update live schemas, s4 will break exec later
diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py
index 793e51b909..5644a32d2f 100644
--- a/tests/pipeline/test_pipeline_trace.py
+++ b/tests/pipeline/test_pipeline_trace.py
@@ -298,12 +298,12 @@ def data():

 def test_extract_data_describe() -> None:
     schema = Schema("test")
-    assert describe_extract_data(DltSource("sect", schema)) == [{"name": "test", "data_type": "source"}]
+    assert describe_extract_data(DltSource(schema, "sect")) == [{"name": "test", "data_type": "source"}]
     assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}]
-    assert describe_extract_data([DltSource("sect", schema)]) == [{"name": "test", "data_type": "source"}]
+    assert describe_extract_data([DltSource(schema, "sect")]) == [{"name": "test", "data_type": "source"}]
     assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}]
     assert describe_extract_data(
-        [DltResource(Pipe("rrr_extract"), None, False), DltSource("sect", schema)]
+        [DltResource(Pipe("rrr_extract"), None, False), DltSource(schema, "sect")]
     ) == [
         {"name": "rrr_extract", "data_type": "resource"}, {"name": "test", "data_type": "source"}
     ]
@@ -313,7 +313,7 @@ def test_extract_data_describe() -> None:
     assert describe_extract_data([DataFrame(), {"a": "b"}]) == [{"name": "", "data_type": "DataFrame"}]
     # first unnamed element in the list breaks checking info
     assert describe_extract_data(
-        [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sect", schema)]
+        [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource(schema, "sect")]
     ) == [
         {"name": "rrr_extract", "data_type": "resource"}, {"name": "", "data_type": "DataFrame"}
     ]
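Usage sketch: a minimal example of the two breaking changes in this diff — the new `DltSource(schema, section, resources)` argument order and the `previous_hashes` lineage list that replaces `ancestors` in schema engine v8. Only APIs visible in the diff above are used; the `example`/`example_section` names are illustrative.

    import dlt
    from dlt.extract.source import DltSource
    from dlt.common.schema.utils import bump_version_if_modified

    def some_data():
        yield [1, 2, 3]

    # the schema now comes first, then the configuration section
    source = DltSource(dlt.Schema("example"), "example_section", [dlt.resource(some_data())])
    assert source.name == "example"  # the source takes its name from the schema

    # engine v8 stores schema lineage under "previous_hashes", capped at 10 entries
    stored = source.schema.to_dict()
    assert stored["engine_version"] == 8
    assert stored.get("previous_hashes") == []  # empty for a brand-new schema

    # bump_version_if_modified now returns the capped list as its fourth element
    version, new_hash, previous_hash, previous_hashes = bump_version_if_modified(stored)
    assert len(previous_hashes) <= 10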