Skip to content

Commit

Permalink
pr fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 21, 2023
1 parent c8e6e25 commit fb9d731
Show file tree
Hide file tree
Showing 19 changed files with 560 additions and 95 deletions.
16 changes: 8 additions & 8 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to ancestors and limit array to 10 entries
if previous_hash not in stored_schema["ancestors"]:
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]
# unshift previous hash to previous_hashes and limit array to 10 entries
if previous_hash not in stored_schema["previous_hashes"]:
stored_schema["previous_hashes"].insert(0, previous_hash)
stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_, previous_hash, stored_schema["ancestors"]
return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
Expand All @@ -158,7 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("ancestors", None)
schema_copy.pop("previous_hashes", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
Expand Down Expand Up @@ -249,7 +249,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:
# exclude validation of keys added later
ignored_keys = []
if stored_schema["engine_version"] < 7:
ignored_keys.append("ancestors")
ignored_keys.append("previous_hashes")

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
Expand Down Expand Up @@ -363,7 +363,7 @@ def migrate_filters(group: str, filters: List[str]) -> None:
table["schema_contract"] = {}
from_engine = 7
if from_engine == 7 and to_engine > 7:
schema_dict["ancestors"] = []
schema_dict["previous_hashes"] = []
from_engine = 8

schema_dict["engine_version"] = from_engine
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
rv = list(rv)

# convert to source
s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv)
# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
Expand Down
8 changes: 4 additions & 4 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class DltSource(Iterable[TDataItem]):
* You can use a `run` method to load the data with a default instance of dlt pipeline.
* You can get source read only state for the currently active Pipeline instance
"""
def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None:
self.section = section
"""Tells if iterator associated with a source is exhausted"""
self._schema = schema
Expand All @@ -177,7 +177,7 @@ def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource
self.resources.add(*resources)

@classmethod
def from_data(cls, section: str, schema: Schema, data: Any) -> Self:
def from_data(cls, schema: Schema, section: str, data: Any) -> Self:
"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema."""
# creates source from various forms of data
if isinstance(data, DltSource):
Expand All @@ -189,7 +189,7 @@ def from_data(cls, section: str, schema: Schema, data: Any) -> Self:
else:
resources = [DltResource.from_data(data)]

return cls(section, schema, resources)
return cls(schema, section, resources)

@property
def name(self) -> str:
Expand Down Expand Up @@ -327,7 +327,7 @@ def state(self) -> StrAny:
def clone(self) -> "DltSource":
"""Creates a deep copy of the source where copies of schema, resources and pipes are created"""
# mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
return DltSource(self.section, self.schema.clone(), list(self._resources.values()))
return DltSource(self.schema.clone(), self.section, list(self._resources.values()))

def __iter__(self) -> Iterator[TDataItem]:
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
Expand Down
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None:
# do not set section to prevent source that represent a standalone resource
# to overwrite other standalone resources (ie. parents) in that source
sources.append(
DltSource(effective_schema.name, "", effective_schema, [data_item])
DltSource(effective_schema, "", [data_item])
)
else:
# iterator/iterable/generator
Expand All @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None:

# add all the appended resources in one source
if resources:
sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources))
sources.append(DltSource(effective_schema, self.pipeline_name, resources))

# apply hints and settings
for source in sources:
Expand Down Expand Up @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None:
def _extract_state(self, state: TPipelineState) -> TPipelineState:
# this will extract the state into current load package and update the schema with the _dlt_pipeline_state table
# note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract
state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)])
state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)])
storage = ExtractorStorage(self._normalize_storage_config)
extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1)
storage.commit_extract_files(extract_id)
Expand Down
Loading

0 comments on commit fb9d731

Please sign in to comment.