fix import schema yaml #1013
Conversation
✅ Deploy Preview for dlt-hub-docs ready!
Force-pushed from e79744f to f75b072
@rudolfix I think this got lost during the contracts merge.
@@ -143,6 +143,13 @@ def _maybe_import_schema(self, name: str, storage_schema: DictStrAny = None) ->
        assert rv_schema is not None
        return rv_schema

    def maybe_load_import_schema(self, name: str) -> Schema:
The return type hint will then need to be Optional[Schema].
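To illustrate the point of the review comment: a "maybe" loader that can come back empty forces an Optional return hint. This is a hypothetical sketch, not dlt's actual implementation; the registry and class body are stand-ins.

```python
from typing import Optional

class Schema:
    """Stand-in for dlt's Schema class (illustrative only)."""
    def __init__(self, name: str) -> None:
        self.name = name

# hypothetical registry of import schemas keyed by name
_IMPORT_SCHEMAS = {"github": Schema("github")}

def maybe_load_import_schema(name: str) -> Optional[Schema]:
    # a loader that can miss is exactly what forces Optional[Schema]
    return _IMPORT_SCHEMAS.get(name)
```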
@sh-rp I'm almost 100% sure I've seen this code before. Is this restoring the old code somehow? This part is tricky, but it was working correctly before. We need to review this together.
@rudolfix OK, here is what I think is going on: the import schema got lost during the transition to the load packages. The suggestion I made in my PR is that the import schema gets respected during extraction of a source, but only if no other schema is present yet. I think this makes the most sense and should give consistent behavior.
Force-pushed from 85b7c42 to b30d048
@rudolfix one additional comment: right now the import schema only gets used if there is no other schema present. I can change it to always be used, which somehow makes sense to me; we should get an error when trying to merge it if it conflicts with an existing schema, so that should be safe.
…ed schema when no exception. uses load_schema to import schema on extract
@sh-rp import schema is implemented at the level of storage. There's a built-in mechanism that:
- when loading a schema from storage, checks if an import schema exists
- loads the import schema if it was modified (it remembers the original imported schema hash)
- lets the schema in storage evolve otherwise

I think we lost the import schema a long time ago. You are right that normalize lost the ability to import schemas, but it looks to me like extract lost it even longer ago.
I restored that (hopefully) by using load_schema. I also did it in a way that changes introduced during extract stay in memory until extract ends and are committed in the decorator.
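The hash-based mechanism described above could be sketched roughly like this. This is a minimal illustrative model, not dlt's actual code: the key name `_imported_version_hash` and the helper names are assumptions.

```python
import hashlib
import json
from typing import Optional

def schema_hash(schema: dict) -> str:
    # stable content hash of a schema document
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()

def load_with_import(stored: dict, import_schema: Optional[dict]) -> dict:
    """Reload from the import schema only when it changed since it was last imported."""
    if import_schema is None:
        # no import schema: the storage schema evolves freely
        return stored
    if stored.get("_imported_version_hash") != schema_hash(import_schema):
        # import schema was modified: take it over and remember its hash
        fresh = dict(import_schema)
        fresh["_imported_version_hash"] = schema_hash(import_schema)
        return fresh
    # unchanged import schema: keep the evolved storage schema
    return stored
```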
The tests are amazing; there's not much to add. What we must test above all:
- only changes introduced in extract (i.e. coming from source and resource hints) are present in the import schema; normalize changes are not present there!
- a second run of the pipeline with other resource hints will not overwrite the import schema
- but the pipeline schema will evolve freely
- until someone changes the import schema, and then everything gets reset
Force-pushed from ddcd49e to c3c3e66
# drop all changes in live schemas
for name in list(self._schema_storage.live_schemas.keys()):
    try:
        schema = self._schema_storage.load_schema(name)
I am wondering if we can load schemas in parallel if there are more than N schemas?
@sultaniman what is the question exactly?
I mean the cases when there are dozens or even >100 schemas returned by self._schema_storage.live_schemas.keys(), and I was wondering if we should load them in parallel.
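The parallel loading the reviewer asks about could look like the sketch below. This is hypothetical: `load_schema` is a stand-in for `self._schema_storage.load_schema`, and the worker count is an arbitrary assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

def load_schema(name: str) -> dict:
    # stand-in for self._schema_storage.load_schema (I/O-bound in reality)
    return {"name": name}

def load_all_schemas(names: List[str], max_workers: int = 8) -> Dict[str, dict]:
    # a thread pool helps for I/O-bound loads; for a handful of schemas
    # the plain sequential loop is simpler and likely just as fast
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so zip pairs names correctly
        return dict(zip(names, pool.map(load_schema, names)))
```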
dlt/extract/extract.py
Outdated
elif pipeline.default_schema_name:
    schema_ = pipeline.schemas[pipeline.default_schema_name].clone()
    schema_ = Schema(pipeline.default_schema_name)
let's see if this passes
there's an issue with detecting changes in committed schemas, but somehow the tests are passing?
And we still clone the pipeline schema, but the comment says otherwise.
raise SchemaNotFoundError(name, f"live-schema://{name}")
try:
    stored_schema_json = self._load_schema_json(name)
    return live_schema.stored_version_hash == cast(
we must use version_hash here.
stored_version_hash is the hash of the last commit to storage; version_hash incorporates all the changes.
I wonder how the tests are passing. We must somehow bump the schema version (which should only happen when saving).
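The distinction between the two hashes can be demonstrated with a toy model. This is not dlt's actual class; it only models the behavior the reviewer describes: a hash frozen at save time versus a hash recomputed from current content.

```python
import hashlib

class LiveSchema:
    """Toy model of the two hashes (not dlt's actual Schema class)."""
    def __init__(self, content: str) -> None:
        self.content = content
        # hash frozen at the moment of the last save to storage
        self.stored_version_hash = self._hash()

    def _hash(self) -> str:
        return hashlib.sha256(self.content.encode()).hexdigest()

    @property
    def version_hash(self) -> str:
        # recomputed from current content, so it reflects unsaved changes
        return self._hash()
```

Comparing against `stored_version_hash` alone would report "unchanged" even after in-memory modifications, which is the bug the comment points at.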
dlt/extract/extract.py
Outdated
@@ -75,6 +75,7 @@ def choose_schema() -> Schema:
    """Except of explicitly passed schema, use a clone that will get discarded if extraction fails"""
    if schema:
        schema_ = schema
    # if we have a default schema name, use it but do not collect any info from the existing schema
I think this is a wrong comment? We still clone the pipeline schema.
ah yes, I had to revert this but forgot to remove the comment
raise SchemaNotFoundError(name, f"live-schema://{name}")
try:
    stored_schema_json = self._load_schema_json(name)
    return live_schema.version_hash == cast(str, stored_schema_json.get("version_hash"))
is using cast relevant here? I mean, we are not using the value for any type hints or runtime detection?
Description
Schemas referenced by the import_schema_path are completely ignored by the current dlt version, which is a bug. This PR makes sure these schemas get loaded if no other schema was discovered locally or in the destination.
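The precedence the description implies can be sketched as a simple fallback chain. The helper name and the dict-based schema representation are hypothetical, for illustration only.

```python
from typing import Optional

def discover_schema(
    local: Optional[dict],
    destination: Optional[dict],
    import_schema: Optional[dict],
) -> Optional[dict]:
    # the import schema is the fallback: it is used only when nothing
    # was discovered locally or in the destination
    return local or destination or import_schema
```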