Skip to content

Commit

Permalink
rename nullexecutor to a less confusing name
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 10, 2024
1 parent 8f36b67 commit 7501224
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 13 deletions.
4 changes: 2 additions & 2 deletions dlt/common/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from .pool_runner import run_pool, NullExecutor
from .pool_runner import run_pool, CurrentThreadExecutor
from .runnable import Runnable, workermethod, TExecutor
from .typing import TRunMetrics
from .venv import Venv, VenvNotFound


__all__ = [
"run_pool",
"NullExecutor",
"CurrentThreadExecutor",
"Runnable",
"workermethod",
"TExecutor",
Expand Down
6 changes: 3 additions & 3 deletions dlt/common/runners/pool_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
P = ParamSpec("P")


class NullExecutor(Executor):
"""Dummy executor that runs jobs single-threaded.
class CurrentThreadExecutor(Executor):
"""Executor that runs jobs single-threaded on the current thread.
Provides a uniform interface for `None` pool type
"""
Expand Down Expand Up @@ -57,7 +57,7 @@ def create_pool(config: PoolRunnerConfiguration) -> Executor:
max_workers=config.workers, thread_name_prefix=Container.thread_pool_prefix()
)
# no pool - single threaded
return NullExecutor()
return CurrentThreadExecutor()


def run_pool(
Expand Down
6 changes: 3 additions & 3 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dlt.common.pipeline import LoadInfo, LoadMetrics, SupportsPipeline, WithStepInfo
from dlt.common.schema.utils import get_child_tables, get_top_level_table
from dlt.common.storages.load_storage import LoadPackageInfo, ParsedLoadJobFileName, TJobState
from dlt.common.runners import TRunMetrics, Runnable, workermethod, NullExecutor
from dlt.common.runners import TRunMetrics, Runnable, workermethod, CurrentThreadExecutor
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.runtime.logger import pretty_format_exception
from dlt.common.exceptions import (
Expand Down Expand Up @@ -68,7 +68,7 @@ def __init__(
self.destination = destination
self.capabilities = destination.capabilities()
self.staging_destination = staging_destination
self.pool = NullExecutor()
self.pool = CurrentThreadExecutor()
self.load_storage: LoadStorage = self.create_storage(is_storage_owner)
self._loaded_packages: List[LoadPackageInfo] = []
super().__init__()
Expand Down Expand Up @@ -541,7 +541,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:

def run(self, pool: Optional[Executor]) -> TRunMetrics:
# store pool
self.pool = pool or NullExecutor()
self.pool = pool or CurrentThreadExecutor()

logger.info("Running file loading")
# get list of loads and order by name ASC to execute schema updates
Expand Down
6 changes: 3 additions & 3 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dlt.common.data_writers import DataWriterMetrics
from dlt.common.data_writers.writers import EMPTY_DATA_WRITER_METRICS
from dlt.common.destination import TLoaderFileFormat
from dlt.common.runners import TRunMetrics, Runnable, NullExecutor
from dlt.common.runners import TRunMetrics, Runnable, CurrentThreadExecutor
from dlt.common.runtime import signals
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.schema.typing import TStoredSchema
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(
self.config = config
self.collector = collector
self.normalize_storage: NormalizeStorage = None
self.pool = NullExecutor()
self.pool = CurrentThreadExecutor()
self.load_storage: LoadStorage = None
self.schema_storage: SchemaStorage = None

Expand Down Expand Up @@ -351,7 +351,7 @@ def spool_schema_files(self, load_id: str, schema: Schema, files: Sequence[str])

def run(self, pool: Optional[Executor]) -> TRunMetrics:
# keep the pool in class instance
self.pool = pool or NullExecutor()
self.pool = pool or CurrentThreadExecutor()
logger.info("Running file normalizing")
# list all load packages in extracted folder
load_ids = self.normalize_storage.extracted_packages.list_packages()
Expand Down
3 changes: 1 addition & 2 deletions tests/pipeline/test_schema_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ def source() -> Iterator[DltResource]:
# check items table settings
# assert pipeline.default_schema.tables[ITEMS_TABLE].get("schema_contract", {}) == (settings.get("resource") or {})

# check effective table settings
# assert resolve_contract_settings_for_table(None, "items", pipeline.default_schema) == expand_schema_contract_settings(settings.get("resource") or settings.get("override") or "evolve")
# check effective tablepysct_settings_for_table(None, "items", pipeline.default_schema) == expand_schema_contract_settings(settings.get("resource") or settings.get("override") or "evolve")


class ContractsViolationPlugin(CallbackPlugin[Any]):
Expand Down

0 comments on commit 7501224

Please sign in to comment.