diff --git a/kobo/worker/logger.py b/kobo/worker/logger.py index 5cad5c4..a5c3961 100644 --- a/kobo/worker/logger.py +++ b/kobo/worker/logger.py @@ -2,6 +2,7 @@ import threading import time +import os import six @@ -31,6 +32,7 @@ def __init__(self, hub, task_id, *args, **kwargs): self._running = True self._send_time = 0 self._send_data = b"" + self._timeout = int(os.environ.get("KOBO_LOGGING_THREAD_TIMEOUT", 600)) def read_queue(self): out = self._queue.get_nowait() @@ -68,15 +70,26 @@ def run(self): self._send_time = now self._send_data = b"" except Exception: - # Log all caught exceptions. + # Any exception other than an XML-RPC fault may be fatal. It is + # possible that we've encountered a retryable error, such as a + # temporary network disruption between worker and hub. Attempt + # to retry for a bit. + if now - self._send_time <= self._timeout: + continue + + # If the timemout has been exceeded, we can assume we've + # encountered a non-temporary, fatal exception. + # + # Since upload_task_log is apparently not working, we can't get + # this into the task logs, but it should at least be possible + # for this to get into the worker's local log file. if self._logger: msg = "\n".join([ - "Exception in LoggingThread:", + "Fatal error in LoggingThread", kobo.tback.Traceback().get_traceback(), ]) - self._logger.log_error(msg) - - continue + self._logger.log_critical(msg) + raise def write(self, data): """Add data to the queue and set the event for sending queue content.""" diff --git a/tests/test_logger.py b/tests/test_logger.py index d2a85e6..9ef6404 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -56,6 +56,33 @@ def test_upload_task_log_after_some_time(self): self.assertFalse(thread.is_alive()) self.assertFalse(thread._running) + # Following test intentionally kills a thread with an exception. + @pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning") + def test_logs_on_fatal_error(self): + # Set up a logger whose output we'll be able to inspect. + logs = StringIO() + logger = logging.getLogger('TestLoggingThread') + logger.addHandler(logging.StreamHandler(logs)) + kobo_logger = LoggingBase(logger) + + mock_hub = Mock() + mock_hub.upload_task_log.side_effect = RuntimeError("Simulated error") + + thread = LoggingThread(mock_hub, 9999, logger=kobo_logger) + thread.daemon = True + thread.start() + + thread.write('This is a log message!') + # Since we set up a fatal error, we expect the thread to die soon + # despite not calling stop(). + thread.join(10.0) + self.assertFalse(thread.is_alive()) + + # Before dying, it should have written something useful to the logs. + captured = logs.getvalue() + self.assertIn('Fatal error in LoggingThread', captured) + self.assertIn('RuntimeError: Simulated error', captured) + def test_logs_during_temporary_outage(self): # Messages written to the logging thread during a temporary # outage should be uploaded (and not discarded) after the hub