source and schema changes #769

Merged · 8 commits · Nov 21, 2023
Changes from all commits
13 changes: 11 additions & 2 deletions dlt/common/schema/schema.py
@@ -44,6 +44,7 @@ class Schema:
_dlt_tables_prefix: str
_stored_version: int # version at load/creation time
_stored_version_hash: str # version hash at load/creation time
_stored_previous_hashes: Optional[List[str]] # list of ancestor hashes of the schema
_imported_version_hash: str # version hash of recently imported schema
_schema_description: str # optional schema description
_schema_tables: TSchemaTables
@@ -101,7 +102,8 @@ def to_dict(self, remove_defaults: bool = False, bump_version: bool = True) -> T
"name": self._schema_name,
"tables": self._schema_tables,
"settings": self._settings,
"normalizers": self._normalizers_config
"normalizers": self._normalizers_config,
"previous_hashes": self._stored_previous_hashes
}
if self._imported_version_hash and not remove_defaults:
stored_schema["imported_version_hash"] = self._imported_version_hash
@@ -353,7 +355,7 @@ def bump_version(self) -> Tuple[int, str]:
Returns:
Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple
"""
self._stored_version, self._stored_version_hash, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False))
self._stored_version, self._stored_version_hash, _, _ = utils.bump_version_if_modified(self.to_dict(bump_version=False))
return self._stored_version, self._stored_version_hash

def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
@@ -475,6 +477,11 @@ def version_hash(self) -> str:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[1]

@property
def previous_hashes(self) -> List[str]:
"""Current version hash of the schema, recomputed from the actual content"""
return utils.bump_version_if_modified(self.to_dict())[3]

@property
def stored_version_hash(self) -> str:
"""Version hash of the schema content form the time of schema loading/creation."""
@@ -663,6 +670,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
self._stored_version_hash: str = None
self._imported_version_hash: str = None
self._schema_description: str = None
self._stored_previous_hashes: List[str] = []

self._settings: TSchemaSettings = {}
self._compiled_preferred_types: List[Tuple[REPattern, TDataType]] = []
@@ -701,6 +709,7 @@ def _from_stored_schema(self, stored_schema: TStoredSchema) -> None:
self._imported_version_hash = stored_schema.get("imported_version_hash")
self._schema_description = stored_schema.get("description")
self._settings = stored_schema.get("settings") or {}
self._stored_previous_hashes = stored_schema.get("previous_hashes")
self._compile_settings()

def _set_schema_name(self, name: str) -> None:
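A minimal sketch (not part of the diff) of the behavior the new surface enables; it assumes Schema.update_table and utils.new_table work as elsewhere in the codebase:

from dlt.common.schema import Schema
from dlt.common.schema.utils import new_table

schema = Schema("example")
schema.bump_version()                            # establishes the initial stored hash
original_hash = schema.stored_version_hash

schema.update_table(new_table("items"))          # any content change invalidates the hash
schema.bump_version()                            # version += 1, old hash becomes an ancestor

assert schema.stored_version_hash != original_hash
assert schema.previous_hashes[0] == original_hash
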
3 changes: 2 additions & 1 deletion dlt/common/schema/typing.py
@@ -11,7 +11,7 @@


# current version of schema engine
SCHEMA_ENGINE_VERSION = 7
SCHEMA_ENGINE_VERSION = 8

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
@@ -123,6 +123,7 @@ class TStoredSchema(TypedDict, total=False):
"""TypeDict defining the schema representation in storage"""
version: int
version_hash: str
previous_hashes: List[str]
imported_version_hash: Optional[str]
engine_version: int
name: str
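For orientation, a stored schema document at engine version 8 now carries its ancestry next to the current hash; all values below are invented for illustration only:

stored_schema_v8 = {
    "engine_version": 8,
    "name": "my_source",
    "version": 3,
    "version_hash": "kBPcJS...",   # current content hash
    "previous_hashes": [           # newest first, capped at 10 entries
        "oTSN5g...",
        "vYLdXW...",
    ],
    "tables": {},
    "settings": {},
    "normalizers": {},
}
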
15 changes: 13 additions & 2 deletions dlt/common/schema/utils.py
@@ -134,7 +134,7 @@ def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
# return copy(column) # type: ignore


def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str]:
def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]:
"""Bumps the `stored_schema` version and version hash if content modified, returns (new version, new hash, old hash) tuple"""
hash_ = generate_version_hash(stored_schema)
previous_hash = stored_schema.get("version_hash")
@@ -143,8 +143,13 @@ def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str, st
pass
elif hash_ != previous_hash:
stored_schema["version"] += 1
# unshift previous hash to previous_hashes and limit array to 10 entries
if previous_hash not in stored_schema["previous_hashes"]:
stored_schema["previous_hashes"].insert(0, previous_hash)
stored_schema["previous_hashes"] = stored_schema["previous_hashes"][:10]

stored_schema["version_hash"] = hash_
return stored_schema["version"], hash_, previous_hash
return stored_schema["version"], hash_, previous_hash, stored_schema["previous_hashes"]


def generate_version_hash(stored_schema: TStoredSchema) -> str:
@@ -153,6 +158,7 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
schema_copy.pop("version")
schema_copy.pop("version_hash", None)
schema_copy.pop("imported_version_hash", None)
schema_copy.pop("previous_hashes", None)
# ignore order of elements when computing the hash
content = json.dumps(schema_copy, sort_keys=True)
h = hashlib.sha3_256(content.encode("utf-8"))
@@ -240,6 +246,7 @@ def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern:


def validate_stored_schema(stored_schema: TStoredSchema) -> None:

# use lambda to verify only non extra fields
validate_dict_ignoring_xkeys(
spec=TStoredSchema,
@@ -256,6 +263,7 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:


def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:

if from_engine == to_engine:
return cast(TStoredSchema, schema_dict)

@@ -349,6 +357,9 @@ def migrate_filters(group: str, filters: List[str]) -> None:
if not table.get("parent"):
table["schema_contract"] = {}
from_engine = 7
if from_engine == 7 and to_engine > 7:
schema_dict["previous_hashes"] = []
from_engine = 8

schema_dict["engine_version"] = from_engine
if from_engine != to_engine:
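A hedged sketch of the new contract, using only the functions shown above (the description tweak is just an easy way to change the hashed content):

from dlt.common.schema import Schema
from dlt.common.schema.utils import bump_version_if_modified

stored = Schema("example").to_dict()      # a valid stored schema, previous_hashes included
old_hash = stored["version_hash"]

stored["description"] = "changed"         # any change to the hashed content will do
version, new_hash, prev_hash, ancestors = bump_version_if_modified(stored)

assert new_hash != old_hash and prev_hash == old_hash
assert ancestors[0] == old_hash and len(ancestors) <= 10   # newest first, capped at 10
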
2 changes: 2 additions & 0 deletions dlt/common/validation.py
@@ -22,6 +22,8 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil
validator_f (TCustomValidator, optional): A function to perform additional validation
for types not covered by this function. It should return `True` if the validation passes.
Defaults to a function that rejects all such types.
filter_required (TFilterFunc, optional): A function to filter out required fields, useful
for testing historic versions of the dict that might not have certain fields yet.

Raises:
DictValidationException: If there are missing required fields, unexpected fields,
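A sketch of how the documented hook can be exercised when validating a pre-engine-8 document; the keyword name and call shape are taken from the docstring above, not from this hunk, so treat them as assumptions:

from dlt.common.schema import Schema
from dlt.common.schema.typing import TStoredSchema
from dlt.common.validation import validate_dict

old_doc = Schema("example").to_dict()
old_doc.pop("previous_hashes")            # simulate a document written before engine 8

# skip the required-field check only for the field that did not exist historically
validate_dict(TStoredSchema, old_doc, path=".", filter_required=lambda f: f != "previous_hashes")
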
9 changes: 3 additions & 6 deletions dlt/extract/decorators.py
@@ -150,9 +150,6 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams,
if name and name != schema.name:
raise ExplicitSourceNameInvalid(name, schema.name)

# the name of the source must be identical to the name of the schema
name = schema.name

# wrap source extraction function in configuration with section
func_module = inspect.getmodule(f)
source_section = section or _get_source_section_name(func_module)
@@ -167,16 +164,16 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
# configurations will be accessed in this section in the source
proxy = Container()[PipelineContext]
pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=name)):
with inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=source_sections, source_state_key=schema.name)):
rv = conf_f(*args, **kwargs)
if rv is None:
raise SourceDataIsNone(name)
raise SourceDataIsNone(schema.name)
# if generator, consume it immediately
if inspect.isgenerator(rv):
rv = list(rv)

# convert to source
s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(schema.clone(update_normalizers=True), source_section, rv)
# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
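Downstream, the decorator behaves as before but the source name is always taken from the schema; a quick sketch with illustrative names:

import dlt

@dlt.source(name="github_events")
def github_events_source():
    return dlt.resource([{"id": 1}], name="events")

source = github_events_source()
# the name is no longer tracked separately on the source; it is read from the schema
assert source.name == source.schema.name == "github_events"
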
19 changes: 9 additions & 10 deletions dlt/extract/source.py
@@ -167,22 +167,17 @@ class DltSource(Iterable[TDataItem]):
* You can use a `run` method to load the data with a default instance of dlt pipeline.
* You can get source read only state for the currently active Pipeline instance
"""
def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
self.name = name
def __init__(self, schema: Schema, section: str, resources: Sequence[DltResource] = None) -> None:
self.section = section
"""Tells if iterator associated with a source is exhausted"""
self._schema = schema
self._resources: DltResourceDict = DltResourceDict(self.name, self.section)

if self.name != schema.name:
# raise ValueError(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")
warnings.warn(f"Schema name {schema.name} differs from source name {name}! The explicit source name argument is deprecated and will be soon removed.")

if resources:
self.resources.add(*resources)

@classmethod
def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
def from_data(cls, schema: Schema, section: str, data: Any) -> Self:
"""Converts any `data` supported by `dlt` `run` method into `dlt source` with a name `section`.`name` and `schema` schema."""
# creates source from various forms of data
if isinstance(data, DltSource):
@@ -194,10 +189,14 @@ def from_data(cls, name: str, section: str, schema: Schema, data: Any) -> Self:
else:
resources = [DltResource.from_data(data)]

return cls(name, section, schema, resources)
return cls(schema, section, resources)

# TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
@property
def name(self) -> str:
return self._schema.name


# TODO: 4 properties below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
@property
def max_table_nesting(self) -> int:
"""A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON."""
@@ -328,7 +327,7 @@ def state(self) -> StrAny:
def clone(self) -> "DltSource":
"""Creates a deep copy of the source where copies of schema, resources and pipes are created"""
# mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values()))
return DltSource(self.schema.clone(), self.section, list(self._resources.values()))

def __iter__(self) -> Iterator[TDataItem]:
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
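Call sites that build sources directly now pass the schema first and drop the separate name argument, roughly like this (a sketch, not from the PR):

import dlt
from dlt.common.schema import Schema
from dlt.extract.source import DltSource

events = dlt.resource([{"id": 1}], name="events")
source = DltSource.from_data(Schema("github"), "my_section", events)
assert source.name == "github"            # derived from the schema
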
6 changes: 3 additions & 3 deletions dlt/pipeline/pipeline.py
@@ -860,7 +860,7 @@ def append_data(data_item: Any) -> None:
# do not set section to prevent source that represent a standalone resource
# to overwrite other standalone resources (ie. parents) in that source
sources.append(
DltSource(effective_schema.name, "", effective_schema, [data_item])
DltSource(effective_schema, "", [data_item])
)
else:
# iterator/iterable/generator
@@ -881,7 +881,7 @@ def append_data(data_item: Any) -> None:

# add all the appended resources in one source
if resources:
sources.append(DltSource(effective_schema.name, self.pipeline_name, effective_schema, resources))
sources.append(DltSource(effective_schema, self.pipeline_name, resources))

# apply hints and settings
for source in sources:
@@ -1293,7 +1293,7 @@ def _save_state(self, state: TPipelineState) -> None:
def _extract_state(self, state: TPipelineState) -> TPipelineState:
# this will extract the state into current load package and update the schema with the _dlt_pipeline_state table
# note: the schema will be persisted because the schema saving decorator is over the state manager decorator for extract
state_source = DltSource(self.default_schema.name, self.pipeline_name, self.default_schema, [state_resource(state)])
state_source = DltSource(self.default_schema, self.pipeline_name, [state_resource(state)])
storage = ExtractorStorage(self._normalize_storage_config)
extract_id = extract_with_schema(storage, state_source, _NULL_COLLECTOR, 1, 1)
storage.commit_extract_files(extract_id)