Skip to content

Commit

Permalink
source and schema changes (#769)
Browse files Browse the repository at this point in the history
* add schema ancestors

* remove name attribute and init arg from dltsource

* fix 2 tests

* fix statekey related errors

* pr fixes

* revert changes on validate dict

* fix one test
  • Loading branch information
sh-rp authored Nov 21, 2023
1 parent 3fb2c30 commit 28dbba6
Show file tree
Hide file tree
Showing 22 changed files with 620 additions and 93 deletions.
13 changes: 11 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Schema:
_dlt_tables_prefix: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_stored_previous_hashes: Optional[List[str]] # list of ancestor hashes of the schema
_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
_schema_tables: TSchemaTables
Expand Down Expand Up @@ -101,7 +102,8 @@ def to_dict(self, remove_defaults: bool = False, bump_version: bool = True) -> T
"name": self._schema_name,
"tables": self._schema_tables,
"settings": self._settings,
"normalizers": self._normalizers_config
"normalizers": self._normalizers_config,
"previous_hashes": self._stored_previous_hashes
}
if self._imported_version_hash and not remove_defaults:
stored_schema["imported_version_hash"] = self._imported_version_hash
Expand Down Expand Up @@ -353,7 +355,7 @@ def bump_version(self) -> Tuple[int, str]:
Returns:
Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple
"""
self._stored_version, self._stored_version_hash, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False))
self._stored_version, self._stored_version_hash, _, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False))
return self._stored_version, self._stored_version_hash

def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
Expand Down Expand Up @@ -475,6 +477,11 @@ def version_hash(self) -> str:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def previous_hashes(self) -> List[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[3]

@property
def stored_version_hash(self) -> str:
"""Version hash of the schema content form the time of schema loading/creation."""
Expand Down Expand Up @@ -663,6 +670,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None
self._stored_previous_hashes: List[str] = []

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
Expand Down Expand Up @@ -701,6 +709,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
self._stored_previous_hashes = stored_schema.get("previous_hashes")
self._compile_settings()

def _set_schema_name(self, name: str) -> None:
Expand Down
3 changes: 2 additions & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 7
SCHEMA_ENGINE_VERSION = 8

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
Expand Down Expand Up @@ -123,6 +123,7 @@ class TStoredSchema(TypedDict, total=False):
"""TypeDict defining the schema representation in storage"""
version: int
version_hash: str
previous_hashes: List[str]
imported_version_hash: Optional[str]
engine_version: int
name: str
Expand Down
15 changes: 13 additions & 2 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
# return copy(column) # type: ignore


def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str]:
def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]:
"""Bumps the `stored_schema` version and version hash if content modified, returns (new version, new hash, old hash) tuple"""
hash_ = generate_version_hash(stored_schema)
previous_hash = stored_schema.get("version_hash")
Expand All @@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to previous_hashes and limit array to 10 entries
if previous_hash not in stored_schema["previous_hashes"]:
stored_schema["previous_hashes"].insert(0, previous_hash)
stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_, previous_hash
return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
Expand All @@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("previous_hashes", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
Expand Down Expand Up @@ -240,6 +246,7 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern:


def validate_stored_schema(stored_schema: TStoredSchema) -> None:

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
spec=TStoredSchema,
Expand All @@ -256,6 +263,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:

if from_engine == to_engine:
return cast(TStoredSchema, schema_dict)

Expand Down Expand Up @@ -349,6 +357,9 @@ def migrate_filters(group: str, filters: List[str]) -> None:
if not table.get("parent"):
table["schema_contract"] = {}
from_engine = 7
if from_engine == 7 and to_engine > 7:
schema_dict["previous_hashes"] = []
from_engine = 8

schema_dict["engine_version"] = from_engine
if from_engine != to_engine:
Expand Down
2 changes: 2 additions & 0 deletions dlt/common/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil
validator_f (TCustomValidator, optional): A function to perform additional validation
for types not covered by this function. It should return `True` if the validation passes.
Defaults to a function that rejects all such types.
filter_required (TFilterFunc, optional): A function to filter out required fields, useful
for testing historic versions of dict that might now have certain fields yet.
Raises:
DictValidationException: If there are missing required fields, unexpected fields,
Expand Down
9 changes: 3 additions & 6 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,6 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams,
if name and name != schema.name:
raise ExplicitSourceNameInvalid(name, schema.name)

# the name of the source must be identical to the name of the schema
name = schema.name

# wrap source extraction function in configuration with section
func_module = inspect.getmodule(f)
source_section = section or _get_source_section_name(func_module)
Expand All @@ -167,16 +164,16 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
# configurations will be accessed in this section in the source
proxy = Container()[PipelineContext]
pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=name)):
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=schema.name)):
rv = conf_f(*args, **kwargs)
if rv is None:
raise SourceDataIsNone(name)
raise SourceDataIsNone(schema.name)
# if generator, consume it immediately
if inspect.isgenerator(rv):
rv = list(rv)

# convert to source
s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv)
# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
Expand Down
19 changes: 9 additions & 10 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,17 @@ class DltSource(Iterable[TDataItem]):
* You can use a `run` method to load the data with a default instance of dlt pipeline.
* You can get source read only state for the currently active Pipeline instance
"""
def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
self.name = name
def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None:
self.section = section
"""Tells if iterator associated with a source is exhausted"""
self._schema = schema
self._resources: DltResourceDict = DltResourceDict(self.name, self.section)

if self.name != schema.name:
# raise ValueError(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")
warnings.warn(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")

if resources:
self.resources.add(*resources)

@classmethod
def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
def from_data(cls, schema: Schema, section: str, data: Any) -> Self:
"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema."""
# creates source from various forms of data
if isinstance(data, DltSource):
Expand All @@ -194,10 +189,14 @@ def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
else:
resources = [DltResource.from_data(data)]

return cls(name, section, schema, resources)
return cls(schema, section, resources)

# TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
@property
def name(self) -> str:
return self._schema.name


# TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
@property
def max_table_nesting(self) -> int:
"""A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON."""
Expand Down Expand Up @@ -328,7 +327,7 @@ def state(self) -> StrAny:
def clone(self) -> "DltSource":
"""Creates a deep copy of the source where copies of schema, resources and pipes are created"""
# mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values()))
return DltSource(self.schema.clone(), self.section, list(self._resources.values()))

def __iter__(self) -> Iterator[TDataItem]:
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
Expand Down
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None:
# do not set section to prevent source that represent a standalone resource
# to overwrite other standalone resources (ie. parents) in that source
sources.append(
DltSource(effective_schema.name, "", effective_schema, [data_item])
DltSource(effective_schema, "", [data_item])
)
else:
# iterator/iterable/generator
Expand All @@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None:

# add all the appended resources in one source
if resources:
sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources))
sources.append(DltSource(effective_schema, self.pipeline_name, resources))

# apply hints and settings
for source in sources:
Expand Down Expand Up @@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None:
def _extract_state(self, state: TPipelineState) -> TPipelineState:
# this will extract the state into current load package and update the schema with the _dlt_pipeline_state table
# note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract
state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)])
state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)])
storage = ExtractorStorage(self._normalize_storage_config)
extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1)
storage.commit_extract_files(extract_id)
Expand Down
Loading

0 comments on commit 28dbba6

Please sign in to comment.