Skip to content

Commit

Permalink
Update runnable tests with all methods
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Oct 30, 2023
1 parent 7279700 commit b6fb333
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
2 changes: 0 additions & 2 deletions dlt/common/runners/pool_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ def create_pool(config: PoolRunnerConfiguration) -> Executor:
# if not fork method, provide initializer for logs and configuration
if multiprocessing.get_start_method() != "fork" and init._INITIALIZED:
return ProcessPoolExecutor(max_workers=config.workers, initializer=init.initialize_runtime, initargs=(init._RUN_CONFIGURATION,))
# return Pool(processes=config.workers, initializer=init.initialize_runtime, initargs=(init._RUN_CONFIGURATION, ))
else:
return ProcessPoolExecutor(max_workers=config.workers)
# return Pool(processes=config.workers)
elif config.pool_type == "thread":
return ThreadPoolExecutor(max_workers=config.workers)
# no pool - single threaded
Expand Down
21 changes: 13 additions & 8 deletions tests/common/runners/test_runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

@pytest.mark.parametrize('method', ALL_METHODS)
def test_runnable_process_pool(method: str) -> None:
multiprocessing.set_start_method(method, force=True)
# 4 tasks
r = _TestRunnableWorker(4)
# create 4 workers
with ProcessPoolExecutor(4) as p:
with ProcessPoolExecutor(4, mp_context=multiprocessing.get_context(method)) as p:
rv = r._run(p)
p.shutdown()
assert len(rv) == 4
Expand Down Expand Up @@ -46,11 +45,16 @@ def test_runnable_direct_worker_call() -> None:
assert rv[0] == 199


def test_process_worker_started_early() -> None:
# ProcessPoolExecutor starts lazily so it can be created before tasks
with ProcessPoolExecutor(4) as p:
@pytest.mark.parametrize('method', ALL_METHODS)
def test_process_worker_started_early(method: str) -> None:
with ProcessPoolExecutor(4, mp_context=multiprocessing.get_context(method)) as p:
r = _TestRunnableWorkerMethod(4)
r._run(p)
if method == "spawn":
# spawn processes are started upfront, so process pool cannot be started before class instance is created: mapping not exist in worker
with pytest.raises(KeyError):
r._run(p)
else: # With fork method processes are spawned lazily so this order is fine
r._run(p)
p.shutdown(wait=True)


Expand All @@ -67,14 +71,15 @@ def test_weak_pool_ref() -> None:
r = wref[rid]


def test_configuredworker() -> None:
@pytest.mark.parametrize('method', ALL_METHODS)
def test_configuredworker(method: str) -> None:
# call worker method with CONFIG values that should be restored into CONFIG type
config = SchemaStorageConfiguration()
config["import_schema_path"] = "test_schema_path"
_worker_1(config, "PX1", par2="PX2")

# must also work across process boundary
with ProcessPoolExecutor(1) as p:
with ProcessPoolExecutor(1, mp_context=multiprocessing.get_context(method)) as p:
p.map(_worker_1, *zip(*[(config, "PX1", "PX2")]))


Expand Down

0 comments on commit b6fb333

Please sign in to comment.