Skip to content

Commit

Permalink
pr fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 21, 2023
1 parent c8e6e25 commit 0799197
Show file tree
Hide file tree
Showing 22 changed files with 570 additions and 103 deletions.
12 changes: 6 additions & 6 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Schema:
_dlt_tables_prefix: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_stored_ancestors: Optional[List[str]] # list of ancestor hashes of the schema
_stored_previous_hashes: Optional[List[str]] # list of ancestor hashes of the schema
_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
_schema_tables: TSchemaTables
Expand Down Expand Up @@ -103,7 +103,7 @@ def to_dict(self, remove_defaults: bool = False, bump_version: bool = True) -> T
"tables": self._schema_tables,
"settings": self._settings,
"normalizers": self._normalizers_config,
"ancestors": self._stored_ancestors
"previous_hashes": self._stored_previous_hashes
}
if self._imported_version_hash and not remove_defaults:
stored_schema["imported_version_hash"] = self._imported_version_hash
Expand Down Expand Up @@ -478,9 +478,9 @@ def version_hash(self) -> str:
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def ancestors(self) -> List[str]:
def previous_hashes(self) -> List[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[2]
return utils.bump_version_if_modified(self.to_dict())[3]

@property
def stored_version_hash(self) -> str:
Expand Down Expand Up @@ -670,7 +670,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None
self._stored_ancestors: List[str] = []
self._stored_previous_hashes: List[str] = []

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
Expand Down Expand Up @@ -709,7 +709,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
self._stored_ancestors = stored_schema.get("ancestors")
self._stored_previous_hashes = stored_schema.get("previous_hashes")
self._compile_settings()

def _set_schema_name(self, name: str) -> None:
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 7
SCHEMA_ENGINE_VERSION = 8

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
Expand Down Expand Up @@ -123,7 +123,7 @@ class TStoredSchema(TypedDict, total=False):
"""TypeDict defining the schema representation in storage"""
version: int
version_hash: str
ancestors: List[str]
previous_hashes: List[str]
imported_version_hash: Optional[str]
engine_version: int
name: str
Expand Down
16 changes: 8 additions & 8 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to ancestors and limit array to 10 entries
if previous_hash not in stored_schema["ancestors"]:
stored_schema["ancestors"].insert(0, previous_hash)
stored_schema["ancestors"] = stored_schema["ancestors"][:10]
# unshift previous hash to previous_hashes and limit array to 10 entries
if previous_hash not in stored_schema["previous_hashes"]:
stored_schema["previous_hashes"].insert(0, previous_hash)
stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_, previous_hash, stored_schema["ancestors"]
return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
Expand All @@ -158,7 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("ancestors", None)
schema_copy.pop("previous_hashes", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
Expand Down Expand Up @@ -249,7 +249,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:
# exclude validation of keys added later
ignored_keys = []
if stored_schema["engine_version"] < 7:
ignored_keys.append("ancestors")
ignored_keys.append("previous_hashes")

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
Expand Down Expand Up @@ -363,7 +363,7 @@ def migrate_filters(group: str, filters: List[str]) -> None:
table["schema_contract"] = {}
from_engine = 7
if from_engine == 7 and to_engine > 7:
schema_dict["ancestors"] = []
schema_dict["previous_hashes"] = []
from_engine = 8

schema_dict["engine_version"] = from_engine
Expand Down
2 changes: 2 additions & 0 deletions dlt/common/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil
validator_f (TCustomValidator, optional): A function to perform additional validation
for types not covered by this function. It should return `True` if the validation passes.
Defaults to a function that rejects all such types.
filter_required (TFilterFunc, optional): A function to filter out required fields, useful
for testing historic versions of dict that might now have certain fields yet.
Raises:
DictValidationException: If there are missing required fields, unexpected fields,
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
rv = list(rv)

# convert to source
s = _impl_cls.from_data(source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv)
# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
Expand Down
8 changes: 4 additions & 4 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class DltSource(Iterable[TDataItem]):
* You can use a `run` method to load the data with a default instance of dlt pipeline.
* You can get source read only state for the currently active Pipeline instance
"""
def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None:
self.section = section
"""Tells if iterator associated with a source is exhausted"""
self._schema = schema
Expand All @@ -177,7 +177,7 @@ def __init__(self, section: str, schema: Schema, resources: Sequence[DltResource
self.resources.add(*resources)

@classmethod
def from_data(cls, section: str, schema: Schema, data: Any) -> Self:
def from_data(cls, schema: Schema, section: str, data: Any) -> Self:
"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema."""
# creates source from various forms of data
if isinstance(data, DltSource):
Expand All @@ -189,7 +189,7 @@ def from_data(cls, section: str, schema: Schema, data: Any) -> Self:
else:
resources = [DltResource.from_data(data)]

return cls(section, schema, resources)
return cls(schema, section, resources)

@property
def name(self) -> str:
Expand Down Expand Up @@ -327,7 +327,7 @@ def state(self) -> StrAny:
def clone(self) -> "DltSource":
"""Creates a deep copy of the source where copies of schema, resources and pipes are created"""
# mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
return DltSource(self.section, self.schema.clone(), list(self._resources.values()))
return DltSource(self.schema.clone(), self.section, list(self._resources.values()))

def __iter__(self) -> Iterator[TDataItem]:
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
Expand Down
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None:
# do not set section to prevent source that represent a standalone resource
# to overwrite other standalone resources (ie. parents) in that source
sources.append(
DltSource(effective_schema.name, "", effective_schema, [data_item])
DltSource(effective_schema, "", [data_item])
)
else:
# iterator/iterable/generator
Expand All @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None:

# add all the appended resources in one source
if resources:
sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources))
sources.append(DltSource(effective_schema, self.pipeline_name, resources))

# apply hints and settings
for source in sources:
Expand Down Expand Up @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None:
def _extract_state(self, state: TPipelineState) -> TPipelineState:
# this will extract the state into current load package and update the schema with the _dlt_pipeline_state table
# note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract
state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)])
state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)])
storage = ExtractorStorage(self._normalize_storage_config)
extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1)
storage.commit_extract_files(extract_id)
Expand Down
Loading

0 comments on commit 0799197

Please sign in to comment.