Skip to content

Commit

Permalink
Always fallback on NullExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Oct 27, 2023
1 parent bf8bf52 commit add0724
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
7 changes: 4 additions & 3 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


class Load(Runnable[Executor]):
pool: Executor

@with_config(spec=LoaderConfiguration, sections=(known_sections.LOAD,))
def __init__(
Expand All @@ -48,7 +49,7 @@ def __init__(
self.destination = destination
self.capabilities = destination.capabilities()
self.staging_destination = staging_destination
self.pool: Executor = NullExecutor()
self.pool = NullExecutor()
self.load_storage: LoadStorage = self.create_storage(is_storage_owner)
self._processed_load_ids: Dict[str, str] = {}
"""Load ids to dataset name"""
Expand Down Expand Up @@ -380,9 +381,9 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
self.complete_package(load_id, schema, True)
raise

def run(self, pool: Executor) -> TRunMetrics:
def run(self, pool: Optional[Executor]) -> TRunMetrics:
# store pool
self.pool = pool
self.pool = pool or NullExecutor()

logger.info("Running file loading")
# get list of loads and order by name ASC to execute schema updates
Expand Down
9 changes: 5 additions & 4 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Any, Callable, List, Dict, Sequence, Tuple, Set
from typing import Any, Callable, List, Dict, Sequence, Tuple, Set, Optional
from concurrent.futures import Future, ProcessPoolExecutor, Executor
# from multiprocessing.pool import AsyncResult, Pool as ProcessPool

Expand Down Expand Up @@ -35,12 +35,13 @@


class Normalize(Runnable[Executor]):
pool: Executor
@with_config(spec=NormalizeConfiguration, sections=(known_sections.NORMALIZE,))
def __init__(self, collector: Collector = NULL_COLLECTOR, schema_storage: SchemaStorage = None, config: NormalizeConfiguration = config.value) -> None:
self.config = config
self.collector = collector
self.normalize_storage: NormalizeStorage = None
self.pool: Executor = NullExecutor()
self.pool = NullExecutor()
self.load_storage: LoadStorage = None
self.schema_storage: SchemaStorage = None
self._row_counts: TRowCount = {}
Expand Down Expand Up @@ -279,9 +280,9 @@ def spool_schema_files(self, load_id: str, schema_name: str, files: Sequence[str

return load_id

def run(self, pool: Executor) -> TRunMetrics:
def run(self, pool: Optional[Executor]) -> TRunMetrics:
# keep the pool in class instance
self.pool = pool
self.pool = pool or NullExecutor()
self._row_counts = {}
logger.info("Running file normalizing")
# list files and group by schema name, list must be sorted for group by to actually work
Expand Down

0 comments on commit add0724

Please sign in to comment.