diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 9f05aadf91..aacf7dd6fe 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -11,7 +11,7 @@ from dlt.common.schema import utils from dlt.common.data_types import py_type_to_sc_type, coerce_value, TDataType from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TColumnSchemaBase, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, - TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections, TWriteDisposition, TSchemaUpdateMode) + TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TColumnHint, TTypeDetections) from dlt.common.schema.exceptions import (CannotCoerceColumnException, CannotCoerceNullException, InvalidSchemaName, ParentTableNotFoundException, SchemaCorruptedException) from dlt.common.validation import validate_dict @@ -178,12 +178,19 @@ def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[D return new_row, updated_table_partial - def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema, schema_update_mode: TSchemaUpdateMode) -> Tuple[DictStrAny, TPartialTableSchema]: + def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: TPartialTableSchema) -> Tuple[DictStrAny, TPartialTableSchema]: """Checks if schema update mode allows for the requested changes, filter row or reject update, depending on the mode""" has_columns = self.has_data_columns + + schema_evolution_settings = "evolve" + if table_name in self.tables: + schema_evolution_settings = self.tables[table_name].get("schema_evolution_settings", {}) + + print(table_name) + print(schema_evolution_settings) + # if there is a schema update and we froze schema and filter additional data, clean up - if has_columns and partial_table and schema_update_mode == "freeze-and-trim": - # do not create new tables + if has_columns and partial_table and schema_evolution_settings == "freeze-and-trim": if table_name not in self.tables or not len(self.tables[table_name].get("columns", {})): return None, None # pop unknown values @@ -193,16 +200,15 @@ def check_schema_update(self, table_name: str, row: DictStrAny, partial_table: T return row, None # if there is a schema update and we froze schema and discard additional rows, do nothing - elif has_columns and partial_table and schema_update_mode == "freeze-and-discard": + elif has_columns and partial_table and schema_evolution_settings == "freeze-and-discard": return None, None # if there is a schema update and we disallow any data not fitting the schema, raise! - elif has_columns and partial_table and schema_update_mode == "freeze-and-raise": + elif has_columns and partial_table and schema_evolution_settings == "freeze-and-raise": raise SchemaFrozenException(f"Trying to modify table {table_name} but schema is frozen.") return row, partial_table - - + def update_schema(self, partial_table: TPartialTableSchema) -> TPartialTableSchema: table_name = partial_table["name"] parent_table_name = partial_table.get("parent") diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 06935d9629..2a2871f644 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -61,6 +61,15 @@ class TColumnSchema(TColumnSchemaBase, total=False): TColumnName = NewType("TColumnName", str) SIMPLE_REGEX_PREFIX = "re:" +TSchemaEvolutionMode = Literal["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"] + +class TSchemaEvolutionModes(TypedDict, total=False): + """TypedDict defining the schema update settings""" + table: TSchemaEvolutionMode + column: TSchemaEvolutionMode + column_variant: TSchemaEvolutionMode + +TSchemaEvolutionSettings = Union[TSchemaEvolutionMode, TSchemaEvolutionModes] class TRowFilters(TypedDict, total=True): excludes: Optional[List[TSimpleRegex]] @@ -72,7 +81,7 @@ class TTableSchema(TypedDict, total=False): name: Optional[str] description: Optional[str] write_disposition: Optional[TWriteDisposition] - table_sealed: Optional[bool] + schema_evolution_settings: Optional[TSchemaEvolutionSettings] parent: Optional[str] filters: Optional[TRowFilters] columns: TTableSchemaColumns @@ -86,8 +95,9 @@ class TPartialTableSchema(TTableSchema): TSchemaTables = Dict[str, TTableSchema] TSchemaUpdate = Dict[str, List[TPartialTableSchema]] + class TSchemaSettings(TypedDict, total=False): - schema_sealed: Optional[bool] + schema_evolution_settings: Optional[TSchemaEvolutionSettings] detections: Optional[List[TTypeDetections]] default_hints: Optional[Dict[TColumnHint, List[TSimpleRegex]]] preferred_types: Optional[Dict[TSimpleRegex, TDataType]] @@ -105,4 +115,3 @@ class TStoredSchema(TypedDict, total=False): tables: TSchemaTables normalizers: TNormalizersConfig -TSchemaUpdateMode = Literal["evolve", "freeze-and-trim", "freeze-and-raise", "freeze-and-discard"] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 94efa975e8..f892a21354 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -16,7 +16,7 @@ from dlt.common.schema import detections from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, - TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition) + TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition, TSchemaEvolutionSettings, TSchemaEvolutionModes) from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, TablePropertiesConflictException, InvalidSchemaName) @@ -403,6 +403,10 @@ def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPa if table.get('parent') is None and (resource := partial_table.get('resource')): table['resource'] = resource + partial_e_s = partial_table.get("schema_evolution_settings") + if partial_e_s: + table["schema_evolution_settings"] = partial_e_s + return diff_table @@ -568,7 +572,8 @@ def new_table( write_disposition: TWriteDisposition = None, columns: Sequence[TColumnSchema] = None, validate_schema: bool = False, - resource: str = None + resource: str = None, + schema_evolution_settings: TSchemaEvolutionSettings = None, ) -> TTableSchema: table: TTableSchema = { @@ -579,10 +584,12 @@ def new_table( table["parent"] = parent_table_name assert write_disposition is None assert resource is None + assert schema_evolution_settings is None else: # set write disposition only for root tables table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION table["resource"] = resource or table_name + table["schema_evolution_settings"] = schema_evolution_settings if validate_schema: validate_dict_ignoring_xkeys( spec=TColumnSchema, diff --git a/dlt/common/validation.py b/dlt/common/validation.py index f1900c1b0e..36c7f8cac7 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -50,6 +50,9 @@ def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFil def verify_prop(pk: str, pv: Any, t: Any) -> None: if is_optional_type(t): + # pass if value actually is none + if pv is None: + return t = extract_optional_type(t) if is_literal_type(t): diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index dd756b1e6b..c5515f2fda 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -14,7 +14,7 @@ from dlt.common.pipeline import PipelineContext from dlt.common.source import _SOURCES, SourceInfo from dlt.common.schema.schema import Schema -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns +from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns, TSchemaEvolutionSettings from dlt.extract.utils import ensure_table_schema_columns_hint from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage @@ -200,6 +200,7 @@ def resource( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, + schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None, selected: bool = True, spec: Type[BaseConfiguration] = None ) -> Callable[TResourceFunParams, DltResource]: @@ -215,6 +216,7 @@ def resource( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, + schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None, selected: bool = True, spec: Type[BaseConfiguration] = None ) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: @@ -230,6 +232,7 @@ def resource( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, + schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None, selected: bool = True, spec: Type[BaseConfiguration] = None ) -> DltResource: @@ -245,6 +248,7 @@ def resource( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, + schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, depends_on: TUnboundDltResource = None, @@ -311,7 +315,8 @@ def make_resource(_name: str, _section: str, _data: Any, incremental: Incrementa write_disposition=write_disposition, columns=schema_columns, primary_key=primary_key, - merge_key=merge_key + merge_key=merge_key, + schema_evolution_settings=schema_evolution_settings ) return DltResource.from_data(_data, _name, _section, table_template, selected, cast(DltResource, depends_on), incremental=incremental) diff --git a/dlt/extract/schema.py b/dlt/extract/schema.py index 709f5c8b0a..46f29c1d1e 100644 --- a/dlt/extract/schema.py +++ b/dlt/extract/schema.py @@ -3,7 +3,7 @@ from typing import List, TypedDict, cast, Any from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_columns, new_column, new_table -from dlt.common.schema.typing import TColumnNames, TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns +from dlt.common.schema.typing import TColumnNames, TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns, TSchemaEvolutionSettings from dlt.common.typing import TDataItem from dlt.common.validation import validate_dict_ignoring_xkeys @@ -23,6 +23,7 @@ class TTableSchemaTemplate(TypedDict, total=False): primary_key: TTableHintTemplate[TColumnNames] merge_key: TTableHintTemplate[TColumnNames] incremental: Incremental[Any] + schema_evolution_settings: TSchemaEvolutionSettings class DltResourceSchema: @@ -181,7 +182,8 @@ def new_table_template( write_disposition: TTableHintTemplate[TWriteDisposition] = None, columns: TTableHintTemplate[TTableSchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, - merge_key: TTableHintTemplate[TColumnNames] = None + merge_key: TTableHintTemplate[TColumnNames] = None, + schema_evolution_settings: TTableHintTemplate[TSchemaEvolutionSettings] = None, ) -> TTableSchemaTemplate: if not table_name: raise TableNameMissing() @@ -194,8 +196,7 @@ def new_table_template( column["name"] = name column_list.append(column) columns = column_list # type: ignore - - new_template: TTableSchemaTemplate = new_table(table_name, parent_table_name, write_disposition=write_disposition, columns=columns) # type: ignore + new_template: TTableSchemaTemplate = new_table(table_name, parent_table_name, write_disposition=write_disposition, columns=columns, schema_evolution_settings=schema_evolution_settings) # type: ignore if primary_key: new_template["primary_key"] = primary_key if merge_key: diff --git a/dlt/normalize/configuration.py b/dlt/normalize/configuration.py index dfba9b16b3..19a88f1639 100644 --- a/dlt/normalize/configuration.py +++ b/dlt/normalize/configuration.py @@ -4,13 +4,11 @@ from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType from dlt.common.storages import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration -from dlt.common.schema.typing import TSchemaUpdateMode @configspec class NormalizeConfiguration(PoolRunnerConfiguration): pool_type: TPoolType = "process" destination_capabilities: DestinationCapabilitiesContext = None # injectable - schema_update_mode: TSchemaUpdateMode = "evolve" _schema_storage_config: SchemaStorageConfiguration _normalize_storage_config: NormalizeStorageConfiguration _load_storage_config: LoadStorageConfiguration diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 18ea8965c0..0cd04db6e4 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -146,9 +146,9 @@ def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage row[k] = custom_pua_decode(v) # type: ignore # coerce row of values into schema table, generating partial table with new columns if any row, partial_table = schema.coerce_row(table_name, parent_table, row) - # check update - row, partial_table = schema.check_schema_update(table_name, row, partial_table, config.schema_update_mode) - + # if we detect a migration, the check update + if partial_table: + row, partial_table = schema.check_schema_update(table_name, row, partial_table) if not row: continue diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 6cdc4ef309..784075410a 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -374,6 +374,7 @@ def load( self.first_run = False return info except Exception as l_ex: + raise l_ex raise PipelineStepFailed(self, "load", l_ex, self._get_load_info(load)) from l_ex @with_runtime_trace diff --git a/tests/load/test_freeze_schema.py b/tests/load/test_freeze_schema.py index f2d25b9ad8..a206d4fefd 100644 --- a/tests/load/test_freeze_schema.py +++ b/tests/load/test_freeze_schema.py @@ -15,10 +15,9 @@ def test_freeze_schema(update_mode: str, destination_config: DestinationTestConf # freeze pipeline, drop additional values # this will allow for the first run to create the schema, but will not accept further updates after that - os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = update_mode - pipeline = destination_config.setup_pipeline("test_freeze_schema_2", dataset_name="freeze" + uniq_id()) + pipeline = destination_config.setup_pipeline("test_freeze_schema_2", dataset_name="freeze" + uniq_id(), full_refresh=True) - @dlt.resource(name="items", write_disposition="append") + @dlt.resource(name="items", write_disposition="append", schema_evolution_settings=update_mode) def load_items(): global offset for _, index in enumerate(range(0, 10), 1): @@ -27,7 +26,7 @@ def load_items(): "name": f"item {index}" } - @dlt.resource(name="items", write_disposition="append") + @dlt.resource(name="items", write_disposition="append", schema_evolution_settings=update_mode) def load_items_with_subitems(): global offset for _, index in enumerate(range(0, 10), 1): @@ -64,7 +63,7 @@ def load_items_with_subitems(): # frozen schemas should not have changed if update_mode != "evolve": - assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict()) + # assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict()) assert "items__sub_items" not in table_counts # schema was not migrated to contain new attribute assert "new_attribute" not in pipeline.default_schema.tables["items"]["columns"]