Skip to content

Commit

Permalink
Show information about subprocess exception (#532)
Browse files Browse the repository at this point in the history
* Show information about subprocess exception

* Update logging messages
  • Loading branch information
gabrielmbmb authored Apr 15, 2024
1 parent 5aeda69 commit 7233b1c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 13 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"rich >= 13.5.0",
"scipy >= 1.10.0",
"typer >= 0.9.0",
"tblib >= 3.0.0",
]
dynamic = ["version"]

Expand Down
6 changes: 4 additions & 2 deletions src/distilabel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rich.traceback import install
from rich import traceback as rich_traceback
from tblib import pickling_support

__version__ = "1.0.0"

install(show_locals=True)
rich_traceback.install(show_locals=True)
pickling_support.install()
49 changes: 38 additions & 11 deletions src/distilabel/pipeline/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import signal
import threading
import time
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast

from distilabel.distiset import create_distiset
Expand Down Expand Up @@ -49,6 +50,8 @@
_STEPS_FINISHED = set()
_STEPS_FINISHED_LOCK = threading.Lock()

_SUBPROCESS_EXCEPTION: Union[Exception, None] = None


def _init_worker(queue: "Queue[Any]") -> None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand Down Expand Up @@ -127,7 +130,7 @@ def run(
stop_logging()
raise RuntimeError(
"Failed to load all the steps. Could not run pipeline."
)
) from _SUBPROCESS_EXCEPTION

# Send the "first" batches to the steps so the batches starts flowing through
# the input queues and output queue
Expand Down Expand Up @@ -471,6 +474,8 @@ def _error_callback(self, e: BaseException) -> None:
Args:
e: The exception raised by the process.
"""
global _SUBPROCESS_EXCEPTION

# First we check that the exception is a `_ProcessWrapperException`, otherwise, we
# print it out and stop the pipeline, since some errors may be unhandled
if not isinstance(e, _ProcessWrapperException):
Expand All @@ -482,6 +487,7 @@ def _error_callback(self, e: BaseException) -> None:
self._logger.error(f"❌ Failed to load step '{e.step.name}': {e.message}")
with self.shared_info[_STEPS_LOADED_LOCK_KEY]:
self.shared_info[_STEPS_LOADED_KEY] = [_STEPS_LOADED_ERROR_CODE]
_SUBPROCESS_EXCEPTION = e.subprocess_exception
return

# If the step is global, is not in the last trophic level and has no successors,
Expand All @@ -494,11 +500,14 @@ def _error_callback(self, e: BaseException) -> None:
self._logger.error(
f"✋ An error occurred when running global step '{e.step.name}' with no"
" successors and not in the last trophic level. Pipeline execution can"
f" continue. Error will be ignored: {e.message}"
f" continue. Error will be ignored."
)
self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")
return

self._logger.error(f"An error occurred in step '{e.step.name}': {e.message}")
# Global step with successors failed
self._logger.error(f"An error occurred in global step '{e.step.name}'")
self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")
self._cache()
self._stop()

Expand Down Expand Up @@ -575,27 +584,40 @@ class _ProcessWrapperException(Exception):
message: The error message.
step: The `Step` that raised the error.
code: The error code.
subprocess_exception: The exception raised by the subprocess. Defaults to `None`.
"""

def __init__(self, message: str, step: "Step", code: int) -> None:
def __init__(
self,
message: str,
step: "Step",
code: int,
subprocess_exception: Optional[Exception] = None,
) -> None:
self.message = message
self.step = step
self.code = code
self.subprocess_exception = subprocess_exception
self.formatted_traceback = traceback.format_exc()

@classmethod
def create_load_error(
cls, message: str, step: "Step"
cls,
message: str,
step: "Step",
subprocess_exception: Optional[Exception] = None,
) -> "_ProcessWrapperException":
"""Creates a `_ProcessWrapperException` for a load error.
Args:
message: The error message.
step: The `Step` that raised the error.
subprocess_exception: The exception raised by the subprocess. Defaults to `None`.
Returns:
The `_ProcessWrapperException` instance.
"""
return cls(message, step, 1)
return cls(message, step, 1, subprocess_exception)

@property
def is_load_error(self) -> bool:
Expand Down Expand Up @@ -666,7 +688,9 @@ def run(self) -> str:
self.step.load()
self.step._logger.debug(f"Step '{self.step.name}' loaded!")
except Exception as e:
raise _ProcessWrapperException.create_load_error(str(e), self.step) from e
raise _ProcessWrapperException.create_load_error(
str(e), self.step, e
) from e

self._notify_load()

Expand Down Expand Up @@ -735,7 +759,7 @@ def _generator_step_process_loop(self) -> None:
)
return
except Exception as e:
raise _ProcessWrapperException(str(e), self.step, 2) from e
raise _ProcessWrapperException(str(e), self.step, 2, e) from e

def _non_generator_process_loop(self) -> None:
"""Runs the process loop for a non-generator step. It will call the `process`
Expand Down Expand Up @@ -771,13 +795,16 @@ def _non_generator_process_loop(self) -> None:
result = next(self.step.process_applying_mappings(batch.data[0]))
except Exception as e:
if self.step.is_global:
raise _ProcessWrapperException(str(e), self.step, 2) from e
raise _ProcessWrapperException(str(e), self.step, 2, e) from e

# if the step is not global then we can skip the batch which means sending
# an empty batch to the output queue
self.step._logger.warning(
f"⚠️ Processing batch {batch.seq_no} with step '{self.step.name}' failed:"
f" {e}. Sending empty batch..."
f"⚠️ Processing batch {batch.seq_no} with step '{self.step.name}' failed."
" Sending empty batch..."
)
self.step._logger.warning(
f"Subprocess traceback:\n\n{traceback.format_exc()}"
)
finally:
batch.data = [result]
Expand Down

0 comments on commit 7233b1c

Please sign in to comment.