From f493d0ad9cddeac17e98f4bb449fb9550ccf02a2 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 10 Jan 2024 14:49:46 +0100 Subject: [PATCH] send events via queue if not in main thread --- dlt/common/plugins/plugins.py | 3 ++- tests/pipeline/test_schema_contracts.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dlt/common/plugins/plugins.py b/dlt/common/plugins/plugins.py index af1d187694..784812c8b7 100644 --- a/dlt/common/plugins/plugins.py +++ b/dlt/common/plugins/plugins.py @@ -12,13 +12,14 @@ from dlt.common.configuration.specs.base_configuration import BaseConfiguration import multiprocessing as mp from functools import wraps +import threading def on_main_process(f: TFun) -> TFun: @wraps(f) def _wrap(self: "PluginsContext", *args: Any, **kwargs: Any) -> Any: # send message to shared queue if this is not the main instance - if not self._main: + if not self._main or not threading.main_thread(): self._queue.put((f.__name__, args, kwargs)) return None return f(self, *args, **kwargs) diff --git a/tests/pipeline/test_schema_contracts.py b/tests/pipeline/test_schema_contracts.py index bf2dc1ba31..62a75c8a9c 100644 --- a/tests/pipeline/test_schema_contracts.py +++ b/tests/pipeline/test_schema_contracts.py @@ -186,7 +186,7 @@ def first_row(self) -> Dict[str, Any]: data_item = ( call.data_item if not (isinstance(call.data_item, Iterable) and not isinstance(call.data_item, dict)) - else call.data_item[0] # type: ignore + else call.data_item[0] # type: ignore ) # arrow tables if hasattr(data_item, "num_rows"):