From 28dbba6e8e4dde3da8aa5516091487ed01119613 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Tue, 21 Nov 2023 19:18:59 +0100 Subject: [PATCH] source and schema changes (#769) * add schema ancestors * remove name attribute and init arg from dltsource * fix 2 tests * fix statekey related errors * pr fixes * revert changes on validate dict * fix one test --- dlt/common/schema/schema.py | 13 +- dlt/common/schema/typing.py | 3 +- dlt/common/schema/utils.py | 15 +- dlt/common/validation.py | 2 + dlt/extract/decorators.py | 9 +- dlt/extract/source.py | 19 +- dlt/pipeline/pipeline.py | 6 +- .../cases/schemas/eth/ethereum_schema_v8.yml | 461 ++++++++++++++++++ tests/common/schema/test_schema.py | 25 +- tests/common/schema/test_versioning.py | 41 +- tests/common/storages/test_schema_storage.py | 23 +- tests/common/test_validation.py | 4 +- tests/common/utils.py | 2 +- tests/extract/test_decorators.py | 4 +- tests/extract/test_extract.py | 8 +- tests/extract/test_incremental.py | 6 +- tests/extract/test_sources.py | 18 +- tests/load/pipeline/test_restore_state.py | 4 +- tests/load/weaviate/test_naming.py | 20 +- tests/pipeline/test_dlt_versions.py | 6 +- tests/pipeline/test_pipeline.py | 14 +- tests/pipeline/test_pipeline_trace.py | 10 +- 22 files changed, 620 insertions(+), 93 deletions(-) create mode 100644 tests/common/cases/schemas/eth/ethereum_schema_v8.yml diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 67ae345845..bcfba11c61 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -44,6 +44,7 @@ class Schema: _dlt_tables_prefix: str _stored_version: int # version at load/creation time _stored_version_hash: str # version hash at load/creation time + _stored_previous_hashes: Optional[List[str]] # list of ancestor hashes of the schema _imported_version_hash: str # version hash of recently imported schema _schema_description: str # optional schema description _schema_tables: TSchemaTables @@ -101,7 +102,8 @@ def 
to_dict(self, remove_defaults: bool = False, bump_version: bool = True) -> T "name": self._schema_name, "tables": self._schema_tables, "settings": self._settings, - "normalizers": self._normalizers_config + "normalizers": self._normalizers_config, + "previous_hashes": self._stored_previous_hashes } if self._imported_version_hash and not remove_defaults: stored_schema["imported_version_hash"] = self._imported_version_hash @@ -353,7 +355,7 @@ def bump_version(self) -> Tuple[int, str]: Returns: Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple """ - self._stored_version, self._stored_version_hash, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False)) + self._stored_version, self._stored_version_hash, _, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False)) return self._stored_version, self._stored_version_hash def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny: @@ -475,6 +477,11 @@ def version_hash(self) -> str: """Current version hash of the schema, recomputed from the actual content""" return utils.bump_version_if_modified(self.to_dict())[1] + @property + def previous_hashes(self) -> List[str]: + """Hashes of ancestor schema versions (newest first, at most 10), recomputed from the actual content""" + return utils.bump_version_if_modified(self.to_dict())[3] + @property def stored_version_hash(self) -> str: """Version hash of the schema content form the time of schema loading/creation.""" @@ -663,6 +670,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No self._stored_version_hash: str = None self._imported_version_hash: str = None self._schema_description: str = None + self._stored_previous_hashes: List[str] = [] self._settings: TSchemaSettings = {} self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = [] @@ -701,6 +709,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None: self._imported_version_hash =
stored_schema.get("imported_version_hash") self._schema_description = stored_schema.get("description") self._settings = stored_schema.get("settings") or {} + self._stored_previous_hashes = stored_schema.get("previous_hashes") self._compile_settings() def _set_schema_name(self, name: str) -> None: diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 720313b57b..1b6ef31800 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -11,7 +11,7 @@ # current version of schema engine -SCHEMA_ENGINE_VERSION = 7 +SCHEMA_ENGINE_VERSION = 8 # dlt tables VERSION_TABLE_NAME = "_dlt_version" @@ -123,6 +123,7 @@ class TStoredSchema(TypedDict, total=False): """TypeDict defining the schema representation in storage""" version: int version_hash: str + previous_hashes: List[str] imported_version_hash: Optional[str] engine_version: int name: str diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 9b4e8fb047..b6a3cca0e2 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema: # return copy(column) # type: ignore -def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str]: +def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]: """Bumps the `stored_schema` version and version hash if content modified, returns (new version, new hash, old hash) tuple""" hash_ = generate_version_hash(stored_schema) previous_hash = stored_schema.get("version_hash") @@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st pass elif hash_ != previous_hash: stored_schema["version"] += 1 + # unshift previous hash to previous_hashes and limit array to 10 entries + if previous_hash not in stored_schema["previous_hashes"]: + stored_schema["previous_hashes"].insert(0, previous_hash) + stored_schema["previous_hashes"] = 
stored_schema["previous_hashes"][:10] + stored_schema["version_hash"] = hash_ - return stored_schema["version"], hash_, previous_hash + return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"] def generate_version_hash(stored_schema: TStoredSchema) -> str: @@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: schema_copy.pop("version") schema_copy.pop("version_hash", None) schema_copy.pop("imported_version_hash", None) + schema_copy.pop("previous_hashes", None) # ignore order of elements when computing the hash content = json.dumps(schema_copy, sort_keys=True) h = hashlib.sha3_256(content.encode("utf-8")) @@ -240,6 +246,7 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern: def validate_stored_schema(stored_schema: TStoredSchema) -> None: + # use lambda to verify only non extra fields validate_dict_ignoring_xkeys( spec=TStoredSchema, @@ -256,6 +263,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None: def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema: + if from_engine == to_engine: return cast(TStoredSchema, schema_dict) @@ -349,6 +357,9 @@ def migrate_filters(group: str, filters: List[str]) -> None: if not table.get("parent"): table["schema_contract"] = {} from_engine = 7 + if from_engine == 7 and to_engine > 7: + schema_dict["previous_hashes"] = [] + from_engine = 8 schema_dict["engine_version"] = from_engine if from_engine != to_engine: diff --git a/dlt/common/validation.py b/dlt/common/validation.py index b746fda361..312371bbf1 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -22,6 +22,8 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil validator_f (TCustomValidator, optional): A function to perform additional validation for types not covered by this function. It should return `True` if the validation passes. Defaults to a function that rejects all such types. 
+ filter_required (TFilterFunc, optional): A function to filter out required fields, useful + for testing historic versions of dict that might not have certain fields yet. Raises: DictValidationException: If there are missing required fields, unexpected fields, diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index b8abbc1d57..1dbfcb4350 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -150,9 +150,6 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams, if name and name != schema.name: raise ExplicitSourceNameInvalid(name, schema.name) - # the name of the source must be identical to the name of the schema - name = schema.name - # wrap source extraction function in configuration with section func_module = inspect.getmodule(f) source_section = section or _get_source_section_name(func_module) @@ -167,16 +164,16 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl: # configurations will be accessed in this section in the source proxy = Container()[PipelineContext] pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name - with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=name)): + with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=schema.name)): rv = conf_f(*args, **kwargs) if rv is None: - raise SourceDataIsNone(name) + raise SourceDataIsNone(schema.name) # if generator, consume it immediately if inspect.isgenerator(rv): rv = list(rv) # convert to source - s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv) + s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv) # apply hints if max_table_nesting is not None: s.max_table_nesting = max_table_nesting diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 771e8ca0cc..0ff24d1f86 100644 --- a/dlt/extract/source.py +++
b/dlt/extract/source.py @@ -167,22 +167,17 @@ class DltSource(Iterable[TDataItem]): * You can use a `run` method to load the data with a default instance of dlt pipeline. * You can get source read only state for the currently active Pipeline instance """ - def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None: - self.name = name + def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None: self.section = section """Tells if iterator associated with a source is exhausted""" self._schema = schema self._resources: DltResourceDict = DltResourceDict(self.name, self.section) - if self.name != schema.name: - # raise ValueError(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.") - warnings.warn(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.") - if resources: self.resources.add(*resources) @classmethod - def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self: + def from_data(cls, schema: Schema, section: str, data: Any) -> Self: """Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema.""" # creates source from various forms of data if isinstance(data, DltSource): @@ -194,10 +189,14 @@ def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self: else: resources = [DltResource.from_data(data)] - return cls(name, section, schema, resources) + return cls(schema, section, resources) - # TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer. + @property + def name(self) -> str: + return self._schema.name + + # TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer. 
@property def max_table_nesting(self) -> int: """A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.""" @@ -328,7 +327,7 @@ def state(self) -> StrAny: def clone(self) -> "DltSource": """Creates a deep copy of the source where copies of schema, resources and pipes are created""" # mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor - return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values())) + return DltSource(self.schema.clone(), self.section, list(self._resources.values())) def __iter__(self) -> Iterator[TDataItem]: """Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class. diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b9eb958027..c893fd4e75 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None: # do not set section to prevent source that represent a standalone resource # to overwrite other standalone resources (ie. 
parents) in that source sources.append( - DltSource(effective_schema.name, "", effective_schema, [data_item]) + DltSource(effective_schema, "", [data_item]) ) else: # iterator/iterable/generator @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None: # add all the appended resources in one source if resources: - sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources)) + sources.append(DltSource(effective_schema, self.pipeline_name, resources)) # apply hints and settings for source in sources: @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None: def _extract_state(self, state: TPipelineState) -> TPipelineState: # this will extract the state into current load package and update the schema with the _dlt_pipeline_state table # note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract - state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)]) + state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)]) storage = ExtractorStorage(self._normalize_storage_config) extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1) storage.commit_extract_files(extract_id) diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v8.yml b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml new file mode 100644 index 0000000000..928c9a3e54 --- /dev/null +++ b/tests/common/cases/schemas/eth/ethereum_schema_v8.yml @@ -0,0 +1,461 @@ +version: 16 +version_hash: C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE= +engine_version: 8 +name: ethereum +tables: + _dlt_loads: + columns: + load_id: + nullable: false + data_type: text + name: load_id + schema_name: + nullable: true + data_type: text + name: schema_name + status: + nullable: false + data_type: bigint + name: status + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + 
schema_version_hash: + nullable: true + data_type: text + name: schema_version_hash + write_disposition: skip + description: Created by DLT. Tracks completed loads + schema_contract: {} + name: _dlt_loads + resource: _dlt_loads + _dlt_version: + columns: + version: + nullable: false + data_type: bigint + name: version + engine_version: + nullable: false + data_type: bigint + name: engine_version + inserted_at: + nullable: false + data_type: timestamp + name: inserted_at + schema_name: + nullable: false + data_type: text + name: schema_name + version_hash: + nullable: false + data_type: text + name: version_hash + schema: + nullable: false + data_type: text + name: schema + write_disposition: skip + description: Created by DLT. Tracks schema updates + schema_contract: {} + name: _dlt_version + resource: _dlt_version + blocks: + description: Ethereum blocks + x-annotation: this will be preserved on save + write_disposition: append + filters: + includes: [] + excludes: [] + columns: + _dlt_load_id: + nullable: false + description: load id coming from the extractor + data_type: text + name: _dlt_load_id + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + number: + nullable: false + primary_key: true + data_type: bigint + name: number + parent_hash: + nullable: true + data_type: text + name: parent_hash + hash: + nullable: false + cluster: true + unique: true + data_type: text + name: hash + base_fee_per_gas: + nullable: false + data_type: wei + name: base_fee_per_gas + difficulty: + nullable: false + data_type: wei + name: difficulty + extra_data: + nullable: true + data_type: text + name: extra_data + gas_limit: + nullable: false + data_type: bigint + name: gas_limit + gas_used: + nullable: false + data_type: bigint + name: gas_used + logs_bloom: + nullable: true + data_type: binary + name: logs_bloom + miner: + nullable: true + data_type: text + name: miner + mix_hash: + nullable: true + data_type: text + name: mix_hash + nonce: + 
nullable: true + data_type: text + name: nonce + receipts_root: + nullable: true + data_type: text + name: receipts_root + sha3_uncles: + nullable: true + data_type: text + name: sha3_uncles + size: + nullable: true + data_type: bigint + name: size + state_root: + nullable: false + data_type: text + name: state_root + timestamp: + nullable: false + unique: true + sort: true + data_type: timestamp + name: timestamp + total_difficulty: + nullable: true + data_type: wei + name: total_difficulty + transactions_root: + nullable: false + data_type: text + name: transactions_root + schema_contract: {} + name: blocks + resource: blocks + blocks__transactions: + parent: blocks + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + data_type: bigint + name: transaction_index + hash: + nullable: false + unique: true + data_type: text + name: hash + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + chain_id: + nullable: true + data_type: text + name: chain_id + from: + nullable: true + data_type: text + name: from + gas: + nullable: true + data_type: bigint + name: gas + gas_price: + nullable: true + data_type: bigint + name: gas_price + input: + nullable: true + data_type: text + name: input + max_fee_per_gas: + nullable: true + data_type: wei + name: max_fee_per_gas + max_priority_fee_per_gas: + nullable: true + data_type: wei + name: max_priority_fee_per_gas + nonce: + nullable: true + data_type: bigint + name: nonce + r: + nullable: true + data_type: text + name: r + s: + nullable: true + data_type: text + name: s + status: + nullable: true + data_type: bigint + name: status + to: + nullable: true + data_type: text + name: to + type: + 
nullable: true + data_type: text + name: type + v: + nullable: true + data_type: bigint + name: v + value: + nullable: false + data_type: wei + name: value + eth_value: + nullable: true + data_type: decimal + name: eth_value + name: blocks__transactions + blocks__transactions__logs: + parent: blocks__transactions + columns: + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + address: + nullable: false + data_type: text + name: address + block_timestamp: + nullable: false + sort: true + data_type: timestamp + name: block_timestamp + block_hash: + nullable: false + cluster: true + data_type: text + name: block_hash + block_number: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: block_number + transaction_index: + nullable: false + primary_key: true + foreign_key: true + data_type: bigint + name: transaction_index + log_index: + nullable: false + primary_key: true + data_type: bigint + name: log_index + data: + nullable: true + data_type: text + name: data + removed: + nullable: true + data_type: bool + name: removed + transaction_hash: + nullable: false + data_type: text + name: transaction_hash + name: blocks__transactions__logs + blocks__transactions__logs__topics: + parent: blocks__transactions__logs + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__logs__topics + blocks__transactions__access_list: + parent: blocks__transactions + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + 
_dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + address: + nullable: true + data_type: text + name: address + name: blocks__transactions__access_list + blocks__transactions__access_list__storage_keys: + parent: blocks__transactions__access_list + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__transactions__access_list__storage_keys + blocks__uncles: + parent: blocks + columns: + _dlt_parent_id: + nullable: false + foreign_key: true + data_type: text + name: _dlt_parent_id + _dlt_list_idx: + nullable: false + data_type: bigint + name: _dlt_list_idx + _dlt_id: + nullable: false + unique: true + data_type: text + name: _dlt_id + _dlt_root_id: + nullable: false + root_key: true + data_type: text + name: _dlt_root_id + value: + nullable: true + data_type: text + name: value + name: blocks__uncles +settings: + default_hints: + foreign_key: + - _dlt_parent_id + not_null: + - re:^_dlt_id$ + - _dlt_root_id + - _dlt_parent_id + - _dlt_list_idx + unique: + - _dlt_id + cluster: + - block_hash + partition: + - block_timestamp + root_key: + - _dlt_root_id + preferred_types: + timestamp: timestamp + block_timestamp: timestamp + schema_contract: {} +normalizers: + names: dlt.common.normalizers.names.snake_case + json: + module: dlt.common.normalizers.json.relational + config: + generate_dlt_id: true + propagation: + root: + _dlt_id: _dlt_root_id + tables: + blocks: + timestamp: block_timestamp + hash: block_hash +previous_hashes: +- yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE= + diff --git 
a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index f5f406a7a1..a42018f97b 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -132,10 +132,10 @@ def test_simple_regex_validator() -> None: def test_load_corrupted_schema() -> None: - eth_v4: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - del eth_v4["tables"]["blocks"] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + del eth_v8["tables"]["blocks"] with pytest.raises(ParentTableNotFoundException): - utils.validate_stored_schema(eth_v4) + utils.validate_stored_schema(eth_v8) def test_column_name_validator(schema: Schema) -> None: @@ -287,21 +287,31 @@ def test_upgrade_engine_v1_schema() -> None: assert schema_dict["engine_version"] == 2 upgraded = utils.migrate_schema(schema_dict, from_engine=2, to_engine=4) assert upgraded["engine_version"] == 4 - utils.validate_stored_schema(upgraded) # upgrade 1 -> 4 schema_dict = load_json_case("schemas/ev1/event.schema") assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=4) assert upgraded["engine_version"] == 4 - utils.validate_stored_schema(upgraded) # upgrade 1 -> 6 schema_dict = load_json_case("schemas/ev1/event.schema") assert schema_dict["engine_version"] == 1 upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=6) assert upgraded["engine_version"] == 6 - utils.validate_stored_schema(upgraded) + + # upgrade 1 -> 7 + schema_dict = load_json_case("schemas/ev1/event.schema") + assert schema_dict["engine_version"] == 1 + upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=7) + assert upgraded["engine_version"] == 7 + + + # upgrade 1 -> 8 + schema_dict = load_json_case("schemas/ev1/event.schema") + assert schema_dict["engine_version"] == 1 + upgraded = utils.migrate_schema(schema_dict, from_engine=1, to_engine=8) + assert upgraded["engine_version"] == 8 def 
test_unknown_engine_upgrade() -> None: @@ -581,7 +591,8 @@ def assert_new_schema_values(schema: Schema) -> None: assert schema.stored_version == 1 assert schema.stored_version_hash is not None assert schema.version_hash is not None - assert schema.ENGINE_VERSION == 7 + assert schema.ENGINE_VERSION == 8 + assert schema._stored_previous_hashes == [] assert len(schema.settings["default_hints"]) > 0 # check settings assert utils.standard_type_detections() == schema.settings["detections"] == schema._type_detections diff --git a/tests/common/schema/test_versioning.py b/tests/common/schema/test_versioning.py index 4e4278a539..401b463875 100644 --- a/tests/common/schema/test_versioning.py +++ b/tests/common/schema/test_versioning.py @@ -1,5 +1,6 @@ import pytest import yaml +from copy import deepcopy from dlt.common import json from dlt.common.schema import utils @@ -83,10 +84,10 @@ def test_infer_column_bumps_version() -> None: def test_preserve_version_on_load() -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - version = eth_v7["version"] - version_hash = eth_v7["version_hash"] - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + version = eth_v8["version"] + version_hash = eth_v8["version_hash"] + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] # version should not be bumped assert version_hash == schema._stored_version_hash assert version_hash == schema.version_hash @@ -95,8 +96,8 @@ def test_preserve_version_on_load() -> None: @pytest.mark.parametrize("remove_defaults", [True, False]) def test_version_preserve_on_reload(remove_defaults: bool) -> None: - eth_v7: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") - schema = Schema.from_dict(eth_v7) # type: ignore[arg-type] + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] to_save_dict = 
schema.to_dict(remove_defaults=remove_defaults) assert schema.stored_version == to_save_dict["version"] @@ -122,3 +123,31 @@ def test_version_preserve_on_reload(remove_defaults: bool) -> None: saved_rasa_schema = Schema.from_dict(yaml.safe_load(rasa_yml)) assert saved_rasa_schema.stored_version == rasa_schema.stored_version assert saved_rasa_schema.stored_version_hash == rasa_schema.stored_version_hash + + +def test_create_ancestry() -> None: + eth_v8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") + schema = Schema.from_dict(eth_v8) # type: ignore[arg-type] + assert schema._stored_previous_hashes == ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="] + version = schema._stored_version + + # modify save and load schema 15 times and check ancestry + expected_previous_hashes = ["yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE="] + for i in range(1,15): + # keep expected previous_hashes + expected_previous_hashes.insert(0, schema._stored_version_hash) + + # update schema + row = {f"float{i}": 78172.128} + _, new_table = schema.coerce_row("event_user", None, row) + schema.update_table(new_table) + schema_dict = schema.to_dict() + schema = Schema.from_stored_schema(schema_dict) + + assert schema._stored_previous_hashes == expected_previous_hashes[:10] + assert schema._stored_version == version + i + + # we never have more than 10 previous_hashes + assert len(schema._stored_previous_hashes) == i + 1 if i + 1 <= 10 else 10 + + assert len(schema._stored_previous_hashes) == 10 \ No newline at end of file diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index a4b6c5c89f..401c22f0bc 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -11,7 +11,7 @@ from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT -from tests.common.utils import 
load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V7 +from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V8 @pytest.fixture @@ -87,6 +87,7 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch assert storage_schema.version == reloaded_schema.stored_version assert storage_schema.version_hash == reloaded_schema.stored_version_hash assert storage_schema._imported_version_hash == reloaded_schema._imported_version_hash + assert storage_schema.previous_hashes == reloaded_schema.previous_hashes # the import schema gets modified storage_schema.tables["_dlt_loads"]["write_disposition"] = "append" storage_schema.tables.pop("event_user") @@ -96,7 +97,11 @@ def test_skip_import_if_not_modified(synced_storage: SchemaStorage, storage: Sch # we have overwritten storage schema assert reloaded_schema.tables["_dlt_loads"]["write_disposition"] == "append" assert "event_user" not in reloaded_schema.tables + + # hash and ancestry stay the same assert reloaded_schema._imported_version_hash == storage_schema.version_hash + assert storage_schema.previous_hashes == reloaded_schema.previous_hashes + # but original version has increased assert reloaded_schema.stored_version == storage_schema.version + 1 @@ -194,12 +199,13 @@ def test_save_store_schema_over_import(ie_storage: SchemaStorage) -> None: ie_storage.save_schema(schema) assert schema.version_hash == schema_hash # we linked schema to import schema - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # load schema and make sure our new schema is here schema = ie_storage.load_schema("ethereum") - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 assert schema._stored_version_hash == schema_hash assert schema.version_hash == schema_hash + 
assert schema.previous_hashes == [] # we have simple schema in export folder fs = FileStorage(ie_storage.config.export_schema_path) exported_name = ie_storage._file_name_in_store("ethereum", "yaml") @@ -213,12 +219,13 @@ def test_save_store_schema_over_import_sync(synced_storage: SchemaStorage) -> No schema = Schema("ethereum") schema_hash = schema.version_hash synced_storage.save_schema(schema) - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # import schema is overwritten fs = FileStorage(synced_storage.config.import_schema_path) exported_name = synced_storage._file_name_in_store("ethereum", "yaml") exported_schema = yaml.safe_load(fs.load(exported_name)) assert schema.version_hash == exported_schema["version_hash"] == schema_hash + assert schema.previous_hashes == [] # when it is loaded we will import schema again which is identical to the current one but the import link # will be set to itself schema = synced_storage.load_schema("ethereum") @@ -269,18 +276,18 @@ def test_schema_from_file() -> None: def prepare_import_folder(storage: SchemaStorage) -> None: - shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v7"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) + shutil.copy(yml_case_path("schemas/eth/ethereum_schema_v8"), os.path.join(storage.storage.storage_path, "../import/ethereum.schema.yaml")) def assert_schema_imported(synced_storage: SchemaStorage, storage: SchemaStorage) -> Schema: prepare_import_folder(synced_storage) - eth_v6: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v7") + eth_V8: TStoredSchema = load_yml_case("schemas/eth/ethereum_schema_v8") schema = synced_storage.load_schema("ethereum") # is linked to imported schema - schema._imported_version_hash = eth_v6["version_hash"] + schema._imported_version_hash = eth_V8["version_hash"] # also was saved in storage assert synced_storage.has_schema("ethereum") # 
and has link to imported schema s well (load without import) schema = storage.load_schema("ethereum") - assert schema._imported_version_hash == eth_v6["version_hash"] + assert schema._imported_version_hash == eth_V8["version_hash"] return schema diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 0a034dc72f..f274c82014 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -89,14 +89,14 @@ def test_doc() -> TTestRecord: def test_validate_schema_cases() -> None: - with open("tests/common/cases/schemas/eth/ethereum_schema_v7.yml", mode="r", encoding="utf-8") as f: + with open("tests/common/cases/schemas/eth/ethereum_schema_v8.yml", mode="r", encoding="utf-8") as f: schema_dict: TStoredSchema = yaml.safe_load(f) validate_dict_ignoring_xkeys( spec=TStoredSchema, doc=schema_dict, path=".", - validator_f=simple_regex_validator + validator_f=simple_regex_validator, ) # with open("tests/common/cases/schemas/rasa/event.schema.json") as f: diff --git a/tests/common/utils.py b/tests/common/utils.py index d612dcbdcf..db9a8318fb 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -16,7 +16,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema version -IMPORTED_VERSION_HASH_ETH_V7 = "yjMtV4Zv0IJlfR5DPMwuXxGg8BRhy7E79L26XAHWEGE=" +IMPORTED_VERSION_HASH_ETH_V8 = "C5An8WClbavalXDdNSqXbdI7Swqh/mTWMcwWKCF//EE=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 28f3d34dcf..27cdc3d22d 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -28,7 +28,7 @@ SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable) from dlt.extract.typing import TableNameMeta -from tests.common.utils 
import IMPORTED_VERSION_HASH_ETH_V7 +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8 def test_none_returning_source() -> None: @@ -75,7 +75,7 @@ def test_load_schema_for_callable() -> None: schema = s.schema assert schema.name == "ethereum" == s.name # the schema in the associated file has this hash - assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema.stored_version_hash == IMPORTED_VERSION_HASH_ETH_V8 def test_unbound_parametrized_transformer() -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 7ed74b41f2..a045dd4f3c 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -14,7 +14,7 @@ def test_extract_select_tables() -> None: def expect_tables(resource: DltResource) -> dlt.Schema: # delete files clean_test_storage() - source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource(10)]) + source = DltSource(dlt.Schema("selectables"), "module", [resource(10)]) schema = source.discover_schema() storage = ExtractorStorage(NormalizeStorageConfiguration()) @@ -37,7 +37,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: clean_test_storage() storage = ExtractorStorage(NormalizeStorageConfiguration()) # same thing but select only odd - source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource]) + source = DltSource(dlt.Schema("selectables"), "module", [resource]) source = source.with_resources(resource.name) source.selected_resources[resource.name].bind(10).select_tables("odd_table") extract_id = storage.create_extract_id() @@ -80,7 +80,7 @@ def input_gen(): yield from [1, 2, 3] input_r = DltResource.from_data(input_gen) - source = DltSource("selectables", "module", dlt.Schema("selectables"), [input_r, input_r.with_name("gen_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, input_r.with_name("gen_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = 
storage.create_extract_id() extract(extract_id, source, storage) @@ -99,7 +99,7 @@ def tx_step(item): input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("selectables", "module", dlt.Schema("selectables"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource(dlt.Schema("selectables"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]) storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() extract(extract_id, source, storage) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index d03b125777..ec28018add 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -694,14 +694,14 @@ def child(item): # create a source where we place only child child.write_disposition = "replace" - s = DltSource("comp", "section", Schema("comp"), [child]) + s = DltSource(Schema("comp"), "section", [child]) # but extracted resources will include its parent where it derives write disposition from child extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" assert extracted[child._pipe.parent.name].write_disposition == "replace" # create a source where we place parent explicitly - s = DltSource("comp", "section", Schema("comp"), [parent_r, child]) + s = DltSource(Schema("comp"), "section", [parent_r, child]) extracted = s.resources.extracted assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition @@ -722,7 +722,7 @@ def child(item): # now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly # so we have a resource with the same name as child parent but the pipe instance is different - s = DltSource("comp", "section", Schema("comp"), [standalone_some_data(now), child]) + s = DltSource(Schema("comp"), 
"section", [standalone_some_data(now), child]) assert extracted[child.name].write_disposition == "replace" # now parent exists separately and has its own write disposition - because we search by name to identify matching resource assert extracted[child._pipe.parent.name].write_disposition == "append" diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 130e0a8d93..aae95e0a3f 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -55,7 +55,7 @@ def parametrized(p1, /, p2, *, p3 = None): # as part of the source r = DltResource.from_data(parametrized) - s = DltSource("source", "module", Schema("source"), [r]) + s = DltSource(Schema("source"), "module", [r]) with pytest.raises(ParametrizedResourceUnbound) as py_ex: list(s) @@ -1014,7 +1014,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("source", "module", Schema("source"), [dlt.resource(some_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(some_data())]) assert s.exhausted is False assert list(s) == [1, 2, 3, 1, 2, 3] assert s.exhausted is True @@ -1028,19 +1028,19 @@ def test_exhausted_property() -> None: # this example will be exhausted after iteration def open_generator_data(): yield from [1, 2, 3, 4] - s = DltSource("source", "module", Schema("source"), [dlt.resource(open_generator_data())]) + s = DltSource(Schema("source"), "module", [dlt.resource(open_generator_data())]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is True # lists will not exhaust - s = DltSource("source", "module", Schema("source"), [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource([1, 2, 3, 4], table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False # iterators will not exhaust - s = DltSource("source", "module", Schema("source"), [dlt.resource(iter([1, 2, 3, 4]), table_name="table", 
name="resource")]) + s = DltSource(Schema("source"), "module", [dlt.resource(iter([1, 2, 3, 4]), table_name="table", name="resource")]) assert s.exhausted is False assert next(iter(s)) == 1 assert s.exhausted is False @@ -1048,7 +1048,7 @@ def open_generator_data(): # having on exhausted generator resource will make the whole source exhausted def open_generator_data(): # type: ignore[no-redef] yield from [1, 2, 3, 4] - s = DltSource("source", "module", Schema("source"), [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), dlt.resource(open_generator_data())]) + s = DltSource(Schema("source"), "module", [ dlt.resource([1, 2, 3, 4], table_name="table", name="resource"), dlt.resource(open_generator_data())]) assert s.exhausted is False # execute the whole source @@ -1239,7 +1239,7 @@ def tx_step(item): input_r_clone = input_r.with_name("input_gen_2") # separate resources have separate pipe instances - source = DltSource("dupes", "module", Schema("dupes"), [input_r, input_r_clone]) + source = DltSource(Schema("dupes"), "module", [input_r, input_r_clone]) pipes = source.resources.pipes assert len(pipes) == 2 assert pipes[0].name == "input_gen" @@ -1250,13 +1250,13 @@ def tx_step(item): assert list(source) == [1, 2, 3, 1, 2, 3] # cloned from fresh resource - source = DltSource("dupes", "module", Schema("dupes"), [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) + source = DltSource(Schema("dupes"), "module", [DltResource.from_data(input_gen), DltResource.from_data(input_gen).with_name("gen_2")]) assert list(source) == [1, 2, 3, 1, 2, 3] # clone transformer input_r = DltResource.from_data(input_gen) input_tx = DltResource.from_data(tx_step, data_from=DltResource.Empty) - source = DltSource("dupes", "module", Schema("dupes"), [input_r, (input_r | input_tx).with_name("tx_clone")]) + source = DltSource(Schema("dupes"), "module", [input_r, (input_r | input_tx).with_name("tx_clone")]) pipes = source.resources.pipes assert 
len(pipes) == 2 assert source.resources[pipes[0].name] == source.input_gen diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index c9c6c4c437..1ebb3378a6 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -18,7 +18,7 @@ from tests.utils import TEST_STORAGE_ROOT from tests.cases import JSON_TYPED_DICT, JSON_TYPED_DICT_DECODED -from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V7, yml_case_path as common_yml_case_path +from tests.common.utils import IMPORTED_VERSION_HASH_ETH_V8, yml_case_path as common_yml_case_path from tests.common.configuration.utils import environment from tests.load.pipeline.utils import assert_query_data, drop_active_pipeline_data from tests.load.utils import destinations_configs, DestinationTestConfiguration, get_normalized_dataset_name @@ -404,7 +404,7 @@ def test_restore_schemas_while_import_schemas_exist(destination_config: Destinat assert normalized_annotations in schema.tables # check if attached to import schema - assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V7 + assert schema._imported_version_hash == IMPORTED_VERSION_HASH_ETH_V8 # extract some data with restored pipeline p.run(["C", "D", "E"], table_name="blacklist") assert normalized_labels in schema.tables diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py index 850f70ee19..dad7fc176f 100644 --- a/tests/load/weaviate/test_naming.py +++ b/tests/load/weaviate/test_naming.py @@ -87,13 +87,13 @@ def test_reserved_property_names() -> None: # print(schema_2.name) # print(schema_2.naming) -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") -# eth_v6_schema = dlt.Schema.from_dict(eth_v6) +# eth_V8 = load_yml_case("schemas/eth/ethereum_schema_v8") +# eth_V8_schema = dlt.Schema.from_dict(eth_V8) -# pipeline.extract(s, schema=eth_v6_schema) +# pipeline.extract(s, schema=eth_V8_schema) -# print(eth_v6_schema.data_tables()) 
-# print(eth_v6_schema.dlt_tables()) +# print(eth_V8_schema.data_tables()) +# print(eth_V8_schema.dlt_tables()) # def test_x_schema_naming_normalize() -> None: @@ -101,14 +101,14 @@ def test_reserved_property_names() -> None: # print(pipeline.dataset_name) # s = small() -# eth_v6 = load_yml_case("schemas/eth/ethereum_schema_v7") -# eth_v6_schema = dlt.Schema.from_dict(eth_v6) +# eth_V8 = load_yml_case("schemas/eth/ethereum_schema_v8") +# eth_V8_schema = dlt.Schema.from_dict(eth_V8) -# pipeline.extract(s, schema=eth_v6_schema) -# print(eth_v6_schema.tables.keys()) +# pipeline.extract(s, schema=eth_V8_schema) +# print(eth_V8_schema.tables.keys()) # default_schema = pipeline.default_schema # print(default_schema.name) -# print(eth_v6_schema.tables.keys()) +# print(eth_V8_schema.tables.keys()) # pipeline.run(s, destination="weaviate") # print(default_schema.tables.keys()) diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py index 7ac7dcbb34..71f6b5b813 100644 --- a/tests/pipeline/test_dlt_versions.py +++ b/tests/pipeline/test_dlt_versions.py @@ -55,7 +55,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: print(venv.run_script("../tests/pipeline/cases/github_pipeline/github_pipeline.py")) # hash hash in schema github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) - assert github_schema["engine_version"] == 7 + assert github_schema["engine_version"] == 8 assert "schema_version_hash" in github_schema["tables"][LOADS_TABLE_NAME]["columns"] with DuckDbSqlClient(GITHUB_DATASET, duckdb_cfg.credentials) as client: rows = client.execute_sql(f"SELECT * FROM {LOADS_TABLE_NAME} ORDER BY inserted_at") @@ -81,7 +81,7 @@ def test_pipeline_with_dlt_update(test_storage: FileStorage) -> None: pipeline.sync_destination() # print(pipeline.working_dir) # we have updated schema - assert pipeline.default_schema.ENGINE_VERSION == 7 + assert 
pipeline.default_schema.ENGINE_VERSION == 8 # make sure that schema hash retrieved from the destination is exactly the same as the schema hash that was in storage before the schema was wiped assert pipeline.default_schema.stored_version_hash == github_schema["version_hash"] @@ -114,6 +114,6 @@ def test_load_package_with_dlt_update(test_storage: FileStorage) -> None: github_schema = json.loads(test_storage.load(f".dlt/pipelines/{GITHUB_PIPELINE_NAME}/schemas/github.schema.json")) pipeline = pipeline.drop() pipeline.sync_destination() - assert pipeline.default_schema.ENGINE_VERSION == 7 + assert pipeline.default_schema.ENGINE_VERSION == 8 # schema version does not match `dlt.attach` does not update to the right schema by itself assert pipeline.default_schema.stored_version_hash != github_schema["version_hash"] diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 309511b95f..af21eb9f81 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -274,7 +274,7 @@ def some_data(): yield [1, 2, 3] yield [1, 2, 3] - s = DltSource("source", "module", dlt.Schema("source"), [dlt.resource(some_data())]) + s = DltSource(dlt.Schema("source"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) with pytest.raises(PipelineStepFailed) as py_ex: dlt.pipeline().extract(s) @@ -289,7 +289,7 @@ def test_disable_enable_state_sync(environment: Any) -> None: def some_data(): yield [1, 2, 3] - s = DltSource("default", "module", dlt.Schema("default"), [dlt.resource(some_data())]) + s = DltSource(dlt.Schema("default"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) storage = ExtractorStorage(p._normalize_storage_config) assert len(storage.list_files_to_normalize_sorted()) == 1 @@ -299,14 +299,14 @@ def some_data(): p.config.restore_from_destination = True # extract to different schema, state must go to default schema - s = DltSource("default_2", "module", dlt.Schema("default_2"), 
[dlt.resource(some_data())]) + s = DltSource(dlt.Schema("default_2"), "module", [dlt.resource(some_data())]) dlt.pipeline().extract(s) expect_extracted_file(storage, "default", s.schema.state_table_name, "***") def test_extract_multiple_sources() -> None: - s1 = DltSource("default", "module", dlt.Schema("default"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s2 = DltSource("default_2", "module", dlt.Schema("default_2"), [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) + s1 = DltSource(dlt.Schema("default"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s2 = DltSource(dlt.Schema("default_2"),"module", [dlt.resource([6, 7, 8], name="resource_3"), dlt.resource([9, 10, 0], name="resource_4")]) p = dlt.pipeline(destination="dummy") p.config.restore_from_destination = False @@ -325,8 +325,8 @@ def test_extract_multiple_sources() -> None: def i_fail(): raise NotImplementedError() - s3 = DltSource("default_3", "module", dlt.Schema("default_3"), [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) - s4 = DltSource("default_4", "module", dlt.Schema("default_4"), [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) + s3 = DltSource(dlt.Schema("default_3"), "module", [dlt.resource([1, 2, 3], name="resource_1"), dlt.resource([3, 4, 5], name="resource_2")]) + s4 = DltSource(dlt.Schema("default_4"), "module", [dlt.resource([6, 7, 8], name="resource_3"), i_fail]) with pytest.raises(PipelineStepFailed): # NOTE: if you swap s3 and s4 the test on list_schemas will fail: s3 will extract normally and update live schemas, s4 will break exec later diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index cd3e2444c8..5644a32d2f 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -298,14 +298,14 @@ def data(): def 
test_extract_data_describe() -> None: schema = Schema("test") - assert describe_extract_data(DltSource("sss_extract", "sect", schema)) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data(DltSource(schema, "sect")) == [{"name": "test", "data_type": "source"}] assert describe_extract_data(DltResource(Pipe("rrr_extract"), None, False)) == [{"name": "rrr_extract", "data_type": "resource"}] - assert describe_extract_data([DltSource("sss_extract", "sect", schema)]) == [{"name": "sss_extract", "data_type": "source"}] + assert describe_extract_data([DltSource(schema, "sect")]) == [{"name": "test", "data_type": "source"}] assert describe_extract_data([DltResource(Pipe("rrr_extract"), None, False)]) == [{"name": "rrr_extract", "data_type": "resource"}] assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DltSource("sss_extract", "sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DltSource(schema, "sect")] ) == [ - {"name": "rrr_extract", "data_type": "resource"}, {"name": "sss_extract", "data_type": "source"} + {"name": "rrr_extract", "data_type": "resource"}, {"name": "test", "data_type": "source"} ] assert describe_extract_data([{"a": "b"}]) == [{"name": "", "data_type": "dict"}] from pandas import DataFrame @@ -313,7 +313,7 @@ def test_extract_data_describe() -> None: assert describe_extract_data([DataFrame(), {"a": "b"}]) == [{"name": "", "data_type": "DataFrame"}] # first unnamed element in the list breaks checking info assert describe_extract_data( - [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource("sss_extract", "sect", schema)] + [DltResource(Pipe("rrr_extract"), None, False), DataFrame(), DltSource(schema, "sect")] ) == [ {"name": "rrr_extract", "data_type": "resource"}, {"name": "", "data_type": "DataFrame"} ]