From a648e07594c00bdbdf4371fbe37e17a6f53638bd Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Mon, 30 Oct 2023 21:35:24 +0100
Subject: [PATCH] passes mt context to ProcessPoolExecutor, fixes caching in
 test_common workflow

---
 .github/workflows/test_common.yml    |  2 +-
 .github/workflows/test_dbt_cloud.yml |  2 +-
 dlt/common/runners/pool_runner.py    | 20 ++++++++++++++++----
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml
index f9f744ffa3..3a148ccf29 100644
--- a/.github/workflows/test_common.yml
+++ b/.github/workflows/test_common.yml
@@ -61,7 +61,7 @@ jobs:
         with:
           # path: ${{ steps.pip-cache.outputs.dir }}
           path: .venv
-          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
+          key: venv-${{ matrix.os }}-${{ matrix.python-versions }}-${{ hashFiles('**/poetry.lock') }}
 
       - name: Install dependencies + sentry
         run: poetry install --no-interaction -E parquet -E pydantic && pip install sentry-sdk
diff --git a/.github/workflows/test_dbt_cloud.yml b/.github/workflows/test_dbt_cloud.yml
index e365000cd1..2d06ac96ba 100644
--- a/.github/workflows/test_dbt_cloud.yml
+++ b/.github/workflows/test_dbt_cloud.yml
@@ -58,7 +58,7 @@ jobs:
         with:
           # path: ${{ steps.pip-cache.outputs.dir }}
           path: .venv
-          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-dbt-cloud
+          key: venv-${{ matrix.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-dbt-cloud
 
       - name: Install dependencies
         # install dlt with postgres support
diff --git a/dlt/common/runners/pool_runner.py b/dlt/common/runners/pool_runner.py
index a0862485cb..6d07498b05 100644
--- a/dlt/common/runners/pool_runner.py
+++ b/dlt/common/runners/pool_runner.py
@@ -1,6 +1,5 @@
 import multiprocessing
-from typing import Callable, Union, cast, Any, TypeVar
-from multiprocessing.pool import ThreadPool, Pool
+from typing import Callable, Union, cast, TypeVar
 from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor, Future
 from typing_extensions import ParamSpec
 
@@ -16,6 +15,11 @@
 T = TypeVar("T")
 P = ParamSpec("P")
 
+# if TYPE_CHECKING:
+#     TItemFuture = Future[Union[TDataItems, DataItemWithMeta]]
+# else:
+#     TItemFuture = Future
+
 
 class NullExecutor(Executor):
     """Dummy executor that runs jobs single-threaded.
@@ -39,9 +43,17 @@ def create_pool(config: PoolRunnerConfiguration) -> Executor:
     if config.pool_type == "process":
         # if not fork method, provide initializer for logs and configuration
         if multiprocessing.get_start_method() != "fork" and init._INITIALIZED:
-            return ProcessPoolExecutor(max_workers=config.workers, initializer=init.initialize_runtime, initargs=(init._RUN_CONFIGURATION,))
+            return ProcessPoolExecutor(
+                max_workers=config.workers,
+                initializer=init.initialize_runtime,
+                initargs=(init._RUN_CONFIGURATION,),
+                mp_context=multiprocessing.get_context()
+            )
         else:
-            return ProcessPoolExecutor(max_workers=config.workers)
+            return ProcessPoolExecutor(
+                max_workers=config.workers,
+                mp_context=multiprocessing.get_context()
+            )
     elif config.pool_type == "thread":
         return ThreadPoolExecutor(max_workers=config.workers)
     # no pool - single threaded
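
Below is a minimal, self-contained sketch of the executor technique the patch applies: constructing ProcessPoolExecutor with an explicit mp_context=multiprocessing.get_context() so it uses the start method configured in the parent process, plus an initializer that rebuilds per-worker state (the patch does this for logs and configuration when the start method is not "fork"). The names initialize_worker, RUN_CONFIG and square are hypothetical stand-ins for illustration, not dlt's init.initialize_runtime and init._RUN_CONFIGURATION.

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# Hypothetical run configuration handed to every worker process.
RUN_CONFIG = {"log_level": "INFO"}

def initialize_worker(run_config):
    # Re-create logging/configuration in the child process. With "spawn"
    # (the default on Windows and macOS) children do not inherit the
    # parent's module-level state the way "fork" children do.
    print(f"worker initialized with {run_config}")

def square(x):
    return x * x

if __name__ == "__main__":
    # get_context() returns a context bound to the currently configured
    # start method; passing it pins the executor to that method.
    ctx = multiprocessing.get_context()
    with ProcessPoolExecutor(
        max_workers=4,
        initializer=initialize_worker,
        initargs=(RUN_CONFIG,),
        mp_context=ctx,
    ) as pool:
        print(list(pool.map(square, range(8))))

The same pattern works unchanged if the start method is forced earlier with multiprocessing.set_start_method("spawn"), which is one way to exercise the non-fork branch that the patch targets.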