Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaFaer committed Jul 30, 2024
1 parent 057183f commit 37647ef
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
4 changes: 2 additions & 2 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Normalize(Runnable[Executor], WithStepInfo[NormalizeMetrics, NormalizeInfo
@with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,))
def __init__(
self,
extracted_count: int,
extracted_count: Optional[int] = None,
collector: Collector = NULL_COLLECTOR,
schema_storage: SchemaStorage = None,
config: NormalizeConfiguration = config.value,
Expand Down Expand Up @@ -141,7 +141,7 @@ def map_parallel(self, schema: Schema, load_id: str, files: Sequence[str]) -> TW
# schedule the task again
schema_dict = schema.to_dict()
# TODO: it's time for a named tuple
params = params[:3] + (schema_dict,) + params[4:] + (self.collector,)
params = params[:3] + (schema_dict,) + params[4:]
retry_pending: Future[TWorkerRV] = self.pool.submit(
w_normalize_files, *params
)
Expand Down
6 changes: 5 additions & 1 deletion dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,12 @@ def run(
schema_contract=schema_contract,
refresh=refresh or self.refresh,
)
self.normalize(loader_file_format=loader_file_format, extracted_count=extract_info.total_rows_count if extract_info else None)
self.normalize(
loader_file_format=loader_file_format,
extracted_count=extract_info.total_rows_count if extract_info else None,
)
return self.load(destination, dataset_name, credentials=credentials)
return None

@with_schemas_sync
def sync_destination(
Expand Down

0 comments on commit 37647ef

Please sign in to comment.