diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py
index 6242c963f1..b7d4c6916a 100644
--- a/dlt/normalize/normalize.py
+++ b/dlt/normalize/normalize.py
@@ -48,7 +48,7 @@ class Normalize(Runnable[Executor], WithStepInfo[NormalizeMetrics, NormalizeInfo
     @with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,))
     def __init__(
         self,
-        extracted_count: int,
+        extracted_count: Optional[int] = None,
         collector: Collector = NULL_COLLECTOR,
         schema_storage: SchemaStorage = None,
         config: NormalizeConfiguration = config.value,
@@ -141,7 +141,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW
                 # schedule the task again
                 schema_dict = schema.to_dict()
                 # TODO: it's time for a named tuple
-                params = params[:3] + (schema_dict,) + params[4:] + (self.collector,)
+                params = params[:3] + (schema_dict,) + params[4:]
                 retry_pending: Future[TWorkerRV] = self.pool.submit(
                     w_normalize_files, *params
                 )
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 6c4cb7cf9d..862a0a6f5f 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -722,8 +722,12 @@ def run(
             schema_contract=schema_contract,
             refresh=refresh or self.refresh,
         )
-        self.normalize(loader_file_format=loader_file_format, extracted_count=extract_info.total_rows_count if extract_info else None)
+        self.normalize(
+            loader_file_format=loader_file_format,
+            extracted_count=extract_info.total_rows_count if extract_info else None,
+        )
         return self.load(destination, dataset_name, credentials=credentials)
+        return None

    @with_schemas_sync
    def sync_destination(