Skip to content

Commit

Permalink
skips saving schemas when it was not modified in extract and normalize
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Mar 24, 2024
1 parent fd60a50 commit d4097ca
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 33 deletions.
18 changes: 12 additions & 6 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,16 @@ def spool_files(
f"Table {table_name} has seen data for a first time with load id {load_id}"
)
x_normalizer["seen-data"] = True
logger.info(
f"Saving schema {schema.name} with version {schema.stored_version}:{schema.version}"
)
# schema is updated, save it to schema volume
self.schema_storage.save_schema(schema)
if schema.is_modified:
logger.info(
f"Saving schema {schema.name} with version {schema.stored_version}:{schema.version}"
)
self.schema_storage.save_schema(schema)
else:
logger.info(
f"Schema {schema.name} with version {schema.version} was not modified. Save skipped"
)
# save schema new package
self.load_storage.new_packages.save_schema(load_id, schema)
# save schema updates even if empty
Expand Down Expand Up @@ -376,8 +381,9 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:
schema = self.normalize_storage.extracted_packages.load_schema(load_id)
# prefer schema from schema storage if it exists
try:
# also import the schema
storage_schema = self.schema_storage.load_schema(schema.name)
# use live schema instance via getter if on live storage, it will also do import
# schema as live schemas are committed before calling normalize
storage_schema = self.schema_storage[schema.name]
if schema.stored_version_hash != storage_schema.stored_version_hash:
logger.warning(
f"When normalizing package {load_id} with schema {schema.name}: the storage"
Expand Down
34 changes: 16 additions & 18 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
PIPELINE_STATE_ENGINE_VERSION,
bump_pipeline_state_version_if_modified,
load_pipeline_state_from_destination,
mark_state_extracted,
migrate_pipeline_state,
state_resource,
json_encode_state,
Expand Down Expand Up @@ -172,7 +173,7 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
for name in list(self._schema_storage.live_schemas.keys()):
try:
schema = self._schema_storage.load_schema(name)
self._schema_storage.update_live_schema(schema, can_create_new=False)
schema.replace_schema_content(schema, link_to_replaced_schema=False)
except FileNotFoundError:
# no storage schema yet so pop live schema (created in call to f)
self._schema_storage.live_schemas.pop(name, None)
Expand All @@ -182,9 +183,10 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any:
else:
# save modified live schemas
for name, schema in self._schema_storage.live_schemas.items():
self._schema_storage.commit_live_schema(name)
# also save import schemas only here
self._schema_storage.save_import_schema_if_not_exists(schema)
# only now save the schema, already linked to itself if saved as import schema
self._schema_storage.commit_live_schema(name)
# refresh list of schemas if any new schemas are added
self.schema_names = self._list_schemas_sorted()
return rv
Expand Down Expand Up @@ -488,7 +490,6 @@ def normalize(
) from n_ex

@with_runtime_trace(send_state=True)
@with_schemas_sync
@with_state_sync()
@with_config_section((known_sections.LOAD,))
def load(
Expand Down Expand Up @@ -725,8 +726,7 @@ def sync_destination(
# set the pipeline props from merged state
self._state_to_props(state)
# add that the state is already extracted
state["_local"]["_last_extracted_hash"] = state["_version_hash"]
state["_local"]["_last_extracted_at"] = pendulum.now()
mark_state_extracted(state, state["_version_hash"])
# on merge schemas are replaced so we delete all old versions
self._schema_storage.clear_storage()
for schema in restored_schemas:
Expand Down Expand Up @@ -1054,15 +1054,11 @@ def _extract_source(
# discover the existing pipeline schema
try:
# all live schemas are initially committed and during the extract will accumulate changes in memory
# if schema is committed try to take schema from storage
if self._schema_storage.is_live_schema_committed(source.schema.name):
# this will (1) save live schema if modified (2) look for import schema if present
# (3) load import schema an overwrite pipeline schema if import schema modified
# (4) load pipeline schema if no import schema is present
pipeline_schema = self.schemas.load_schema(source.schema.name)
else:
# if schema is not committed we know we are in process of extraction
pipeline_schema = self.schemas[source.schema.name]
# line below may create another live schema if source schema is not a part of storage
# this will (1) look for import schema if present
# (2) load import schema an overwrite pipeline schema if import schema modified
# (3) load pipeline schema if no import schema is present
pipeline_schema = self.schemas[source.schema.name]
pipeline_schema = pipeline_schema.clone() # use clone until extraction complete
# apply all changes in the source schema to pipeline schema
# NOTE: we do not apply contracts to changes done programmatically
Expand All @@ -1080,7 +1076,7 @@ def _extract_source(
# self._schema_storage.save_import_schema_if_not_exists(source.schema)

# update live schema but not update the store yet
self._schema_storage.update_live_schema(source.schema)
source.schema = self._schema_storage.set_live_schema(source.schema)

# set as default if this is first schema in pipeline
if not self.default_schema_name:
Expand Down Expand Up @@ -1560,9 +1556,11 @@ def _bump_version_and_extract_state(
extract_ = extract or Extract(
self._schema_storage, self._normalize_storage_config(), original_data=data
)
self._extract_source(extract_, data_to_sources(data, self)[0], 1, 1)
state["_local"]["_last_extracted_at"] = pendulum.now()
state["_local"]["_last_extracted_hash"] = hash_
self._extract_source(
extract_, data_to_sources(data, self, self.default_schema)[0], 1, 1
)
# set state to be extracted
mark_state_extracted(state, hash_)
# commit only if we created storage
if not extract:
extract_.commit_packages()
Expand Down
33 changes: 25 additions & 8 deletions tests/pipeline/test_import_export_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.utils import uniq_id

from tests.pipeline.utils import assert_load_info
from tests.utils import TEST_STORAGE_ROOT
from dlt.common.schema import Schema
from dlt.common.storages.schema_storage import SchemaStorage
Expand Down Expand Up @@ -83,22 +84,30 @@ def test_import_schema_is_respected() -> None:
export_schema_path=EXPORT_SCHEMA_PATH,
)
p.run(EXAMPLE_DATA, table_name="person")
# initial schema + evolved in normalize == version 2
assert p.default_schema.stored_version == 2
assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "bigint"
# import schema got saved
import_schema = _get_import_schema(name)
assert "person" in import_schema.tables
# initial schema (after extract) got saved
assert import_schema.stored_version == 1
# import schema hash is set
assert p.default_schema._imported_version_hash == import_schema.version_hash
assert not p.default_schema.is_modified

# take default schema, modify column type and save it to import folder
modified_schema = p.default_schema.clone()
modified_schema.tables["person"]["columns"]["id"]["data_type"] = "text"
with open(os.path.join(IMPORT_SCHEMA_PATH, name + ".schema.yaml"), "w", encoding="utf-8") as f:
f.write(modified_schema.to_pretty_yaml())

# this will provoke a CannotCoerceColumnException
with pytest.raises(PipelineStepFailed) as exc:
p.run(EXAMPLE_DATA, table_name="person")
assert type(exc.value.exception) == CannotCoerceColumnException

# schema is changed
# import schema will be imported into pipeline
p.run(EXAMPLE_DATA, table_name="person")
# again: extract + normalize
assert p.default_schema.stored_version == 3
# change in pipeline schema
assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "text"

# import schema is not overwritten
assert _get_import_schema(name).tables["person"]["columns"]["id"]["data_type"] == "text"

Expand All @@ -110,7 +119,15 @@ def test_import_schema_is_respected() -> None:
export_schema_path=EXPORT_SCHEMA_PATH,
full_refresh=True,
)
p.run(EXAMPLE_DATA, table_name="person")
p.extract(EXAMPLE_DATA, table_name="person")
# starts with import schema v 1 that is dirty -> 2
assert p.default_schema.stored_version == 3
p.normalize()
assert p.default_schema.stored_version == 3
info = p.load()
assert_load_info(info)
assert p.default_schema.stored_version == 3

assert p.default_schema.tables["person"]["columns"]["id"]["data_type"] == "text"

# import schema is not overwritten
Expand Down
76 changes: 75 additions & 1 deletion tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ def test_drop_with_new_name() -> None:
assert new_pipeline.pipeline_name == new_test_name


def test_remove_autodetect() -> None:
def test_schema_version_increase_and_source_update() -> None:
now = pendulum.now()

@dlt.source
Expand All @@ -1429,12 +1429,81 @@ def autodetect():
)

pipeline = dlt.pipeline(destination="duckdb")
# control version of the schema
auto_source = autodetect()
assert auto_source.schema.stored_version is None
pipeline.extract(auto_source)
# extract did a first save
assert pipeline.default_schema.stored_version == 1
# only one prev hash
assert len(pipeline.default_schema.previous_hashes) == 1
# source schema was updated in the pipeline
assert auto_source.schema.stored_version == 1
# source has pipeline schema
assert pipeline.default_schema is auto_source.schema

pipeline.normalize()
# columns added and schema was saved in between
assert pipeline.default_schema.stored_version == 2
assert len(pipeline.default_schema.previous_hashes) == 2
# source schema still updated
assert auto_source.schema.stored_version == 2
assert pipeline.default_schema is auto_source.schema
pipeline.load()
# nothing changed in load
assert pipeline.default_schema.stored_version == 2
assert pipeline.default_schema is auto_source.schema

# run same source again
pipeline.extract(auto_source)
assert pipeline.default_schema.stored_version == 2
assert pipeline.default_schema is auto_source.schema
pipeline.normalize()
assert pipeline.default_schema.stored_version == 2
pipeline.load()
assert pipeline.default_schema.stored_version == 2

# run another instance of the same source
pipeline.run(autodetect())
assert pipeline.default_schema.stored_version == 2
assert pipeline.default_schema is auto_source.schema
assert "timestamp" in pipeline.default_schema.settings["detections"]

# data has compatible schema with "numbers" but schema is taken from pipeline
pipeline.run([1, 2, 3], table_name="numbers")
assert "timestamp" in pipeline.default_schema.settings["detections"]
assert pipeline.default_schema.stored_version == 2
assert pipeline.default_schema is auto_source.schema

# new table will evolve schema
pipeline.run([1, 2, 3], table_name="seq")
assert "timestamp" in pipeline.default_schema.settings["detections"]
assert pipeline.default_schema.stored_version == 4
assert pipeline.default_schema is auto_source.schema


def test_remove_autodetect() -> None:
now = pendulum.now()

@dlt.source
def autodetect():
# add unix ts autodetection to current source schema
dlt.current.source_schema().add_type_detection("timestamp")
return dlt.resource(
[int(now.timestamp()), int(now.timestamp() + 1), int(now.timestamp() + 2)],
name="numbers",
)

pipeline = dlt.pipeline(destination="duckdb")
auto_source = autodetect()
pipeline.extract(auto_source)
pipeline.normalize()

# unix ts recognized
assert (
pipeline.default_schema.get_table("numbers")["columns"]["value"]["data_type"] == "timestamp"
)
pipeline.load()

pipeline = pipeline.drop()

Expand Down Expand Up @@ -1535,8 +1604,13 @@ def test_pipeline_list_packages() -> None:
)
load_ids = pipeline.list_extracted_load_packages()
assert len(load_ids) == 3
extracted_package = pipeline.get_load_package_info(load_ids[1])
assert extracted_package.schema_name == "airtable_emojis"
extracted_package = pipeline.get_load_package_info(load_ids[2])
assert extracted_package.schema_name == "emojis_2"
extracted_package = pipeline.get_load_package_info(load_ids[0])
assert extracted_package.state == "extracted"
assert extracted_package.schema_name == "airtable_emojis"
# same load id continues till the end
pipeline.normalize()
load_ids_n = pipeline.list_normalized_load_packages()
Expand Down

0 comments on commit d4097ca

Please sign in to comment.