From b6fb333328fbc74f012bb905f34cb9834601e79a Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Mon, 30 Oct 2023 11:54:13 -0400 Subject: [PATCH] Update runnable tests with all methods --- dlt/common/runners/pool_runner.py | 2 -- tests/common/runners/test_runnable.py | 21 +++++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/dlt/common/runners/pool_runner.py b/dlt/common/runners/pool_runner.py index fde5020522..a0862485cb 100644 --- a/dlt/common/runners/pool_runner.py +++ b/dlt/common/runners/pool_runner.py @@ -40,10 +40,8 @@ def create_pool(config: PoolRunnerConfiguration) -> Executor: # if not fork method, provide initializer for logs and configuration if multiprocessing.get_start_method() != "fork" and init._INITIALIZED: return ProcessPoolExecutor(max_workers=config.workers, initializer=init.initialize_runtime, initargs=(init._RUN_CONFIGURATION,)) - # return Pool(processes=config.workers, initializer=init.initialize_runtime, initargs=(init._RUN_CONFIGURATION, )) else: return ProcessPoolExecutor(max_workers=config.workers) - # return Pool(processes=config.workers) elif config.pool_type == "thread": return ThreadPoolExecutor(max_workers=config.workers) # no pool - single threaded diff --git a/tests/common/runners/test_runnable.py b/tests/common/runners/test_runnable.py index fa3d123f50..eae4a46a70 100644 --- a/tests/common/runners/test_runnable.py +++ b/tests/common/runners/test_runnable.py @@ -14,11 +14,10 @@ @pytest.mark.parametrize('method', ALL_METHODS) def test_runnable_process_pool(method: str) -> None: - multiprocessing.set_start_method(method, force=True) # 4 tasks r = _TestRunnableWorker(4) # create 4 workers - with ProcessPoolExecutor(4) as p: + with ProcessPoolExecutor(4, mp_context=multiprocessing.get_context(method)) as p: rv = r._run(p) p.shutdown() assert len(rv) == 4 @@ -46,11 +45,16 @@ def test_runnable_direct_worker_call() -> None: assert rv[0] == 199 -def test_process_worker_started_early() -> None: - # ProcessPoolExecutor starts lazily so it can be created before tasks - with ProcessPoolExecutor(4) as p: +@pytest.mark.parametrize('method', ALL_METHODS) +def test_process_worker_started_early(method: str) -> None: + with ProcessPoolExecutor(4, mp_context=multiprocessing.get_context(method)) as p: r = _TestRunnableWorkerMethod(4) - r._run(p) + if method == "spawn": + # spawn processes are started upfront, so process pool cannot be started before class instance is created: mapping not exist in worker + with pytest.raises(KeyError): + r._run(p) + else: # With fork method processes are spawned lazily so this order is fine + r._run(p) p.shutdown(wait=True) @@ -67,14 +71,15 @@ def test_weak_pool_ref() -> None: r = wref[rid] -def test_configuredworker() -> None: +@pytest.mark.parametrize('method', ALL_METHODS) +def test_configuredworker(method: str) -> None: # call worker method with CONFIG values that should be restored into CONFIG type config = SchemaStorageConfiguration() config["import_schema_path"] = "test_schema_path" _worker_1(config, "PX1", par2="PX2") # must also work across process boundary - with ProcessPoolExecutor(1) as p: + with ProcessPoolExecutor(1, mp_context=multiprocessing.get_context(method)) as p: p.map(_worker_1, *zip(*[(config, "PX1", "PX2")]))