diff --git a/pyproject.toml b/pyproject.toml index 27e63d53f6..332b2e4102 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "rich >= 13.5.0", "scipy >= 1.10.0", "typer >= 0.9.0", + "tblib >= 3.0.0", ] dynamic = ["version"] diff --git a/src/distilabel/__init__.py b/src/distilabel/__init__.py index 4284b47ca0..d79c100531 100644 --- a/src/distilabel/__init__.py +++ b/src/distilabel/__init__.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from rich.traceback import install +from rich import traceback as rich_traceback +from tblib import pickling_support __version__ = "1.0.0" -install(show_locals=True) +rich_traceback.install(show_locals=True) +pickling_support.install() diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py index 4770125fb4..f7909f481b 100644 --- a/src/distilabel/pipeline/local.py +++ b/src/distilabel/pipeline/local.py @@ -17,6 +17,7 @@ import signal import threading import time +import traceback from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast from distilabel.distiset import create_distiset @@ -49,6 +50,8 @@ _STEPS_FINISHED = set() _STEPS_FINISHED_LOCK = threading.Lock() +_SUBPROCESS_EXCEPTION: Union[Exception, None] = None + def _init_worker(queue: "Queue[Any]") -> None: signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -127,7 +130,7 @@ def run( stop_logging() raise RuntimeError( "Failed to load all the steps. Could not run pipeline." - ) + ) from _SUBPROCESS_EXCEPTION # Send the "first" batches to the steps so the batches starts flowing through # the input queues and output queue @@ -471,6 +474,8 @@ def _error_callback(self, e: BaseException) -> None: Args: e: The exception raised by the process. """ + global _SUBPROCESS_EXCEPTION + # First we check that the exception is a `_ProcessWrapperException`, otherwise, we # print it out and stop the pipeline, since some errors may be unhandled if not isinstance(e, _ProcessWrapperException): @@ -482,6 +487,7 @@ def _error_callback(self, e: BaseException) -> None: self._logger.error(f"❌ Failed to load step '{e.step.name}': {e.message}") with self.shared_info[_STEPS_LOADED_LOCK_KEY]: self.shared_info[_STEPS_LOADED_KEY] = [_STEPS_LOADED_ERROR_CODE] + _SUBPROCESS_EXCEPTION = e.subprocess_exception return # If the step is global, is not in the last trophic level and has no successors, @@ -494,11 +500,14 @@ def _error_callback(self, e: BaseException) -> None: self._logger.error( f"✋ An error occurred when running global step '{e.step.name}' with no" " successors and not in the last trophic level. Pipeline execution can" - f" continue. Error will be ignored: {e.message}" + f" continue. Error will be ignored." ) + self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}") return - self._logger.error(f"An error occurred in step '{e.step.name}': {e.message}") + # Global step with successors failed + self._logger.error(f"An error occurred in global step '{e.step.name}'") + self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}") self._cache() self._stop() @@ -575,27 +584,40 @@ class _ProcessWrapperException(Exception): message: The error message. step: The `Step` that raised the error. code: The error code. + subprocess_exception: The exception raised by the subprocess. Defaults to `None`. """ - def __init__(self, message: str, step: "Step", code: int) -> None: + def __init__( + self, + message: str, + step: "Step", + code: int, + subprocess_exception: Optional[Exception] = None, + ) -> None: self.message = message self.step = step self.code = code + self.subprocess_exception = subprocess_exception + self.formatted_traceback = traceback.format_exc() @classmethod def create_load_error( - cls, message: str, step: "Step" + cls, + message: str, + step: "Step", + subprocess_exception: Optional[Exception] = None, ) -> "_ProcessWrapperException": """Creates a `_ProcessWrapperException` for a load error. Args: message: The error message. step: The `Step` that raised the error. + subprocess_exception: The exception raised by the subprocess. Defaults to `None`. Returns: The `_ProcessWrapperException` instance. """ - return cls(message, step, 1) + return cls(message, step, 1, subprocess_exception) @property def is_load_error(self) -> bool: @@ -666,7 +688,9 @@ def run(self) -> str: self.step.load() self.step._logger.debug(f"Step '{self.step.name}' loaded!") except Exception as e: - raise _ProcessWrapperException.create_load_error(str(e), self.step) from e + raise _ProcessWrapperException.create_load_error( + str(e), self.step, e + ) from e self._notify_load() @@ -735,7 +759,7 @@ def _generator_step_process_loop(self) -> None: ) return except Exception as e: - raise _ProcessWrapperException(str(e), self.step, 2) from e + raise _ProcessWrapperException(str(e), self.step, 2, e) from e def _non_generator_process_loop(self) -> None: """Runs the process loop for a non-generator step. It will call the `process` @@ -771,13 +795,16 @@ def _non_generator_process_loop(self) -> None: result = next(self.step.process_applying_mappings(batch.data[0])) except Exception as e: if self.step.is_global: - raise _ProcessWrapperException(str(e), self.step, 2) from e + raise _ProcessWrapperException(str(e), self.step, 2, e) from e # if the step is not global then we can skip the batch which means sending # an empty batch to the output queue self.step._logger.warning( - f"⚠️ Processing batch {batch.seq_no} with step '{self.step.name}' failed:" - f" {e}. Sending empty batch..." + f"⚠️ Processing batch {batch.seq_no} with step '{self.step.name}' failed." + " Sending empty batch..." + ) + self.step._logger.warning( + f"Subprocess traceback:\n\n{traceback.format_exc()}" ) finally: batch.data = [result]