Skip to content

Commit

Permalink
send events via queue if not in main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 10, 2024
1 parent 6ceeb15 commit f493d0a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
3 changes: 2 additions & 1 deletion dlt/common/plugins/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
import multiprocessing as mp
from functools import wraps
import threading


def on_main_process(f: TFun) -> TFun:
@wraps(f)
def _wrap(self: "PluginsContext", *args: Any, **kwargs: Any) -> Any:
# send message to shared queue if this is not the main instance
if not self._main:
if not self._main or not threading.main_thread():
self._queue.put((f.__name__, args, kwargs))
return None
return f(self, *args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion tests/pipeline/test_schema_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def first_row(self) -> Dict[str, Any]:
data_item = (
call.data_item
if not (isinstance(call.data_item, Iterable) and not isinstance(call.data_item, dict))
else call.data_item[0] # type: ignore
else call.data_item[0] # type: ignore
)
# arrow tables
if hasattr(data_item, "num_rows"):
Expand Down

0 comments on commit f493d0a

Please sign in to comment.