From 90fb1a66ee9d2e5b127c7a00d9b585c0917a4d40 Mon Sep 17 00:00:00 2001 From: Chi-Sheng Liu Date: Thu, 19 Dec 2024 01:06:47 +0800 Subject: [PATCH] [Fix][Core] Wait a while before stopping the ray_print_logs thread to prevent pending logs Closes: ray-project/ray#48701 Signed-off-by: Chi-Sheng Liu --- python/ray/_private/worker.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index f7bbd1555ba..1c7e7b88f64 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -948,11 +948,25 @@ def print_logs(self): # probably will not be able to consume the log messages as rapidly # as they are coming in. # This is meaningful only for GCS subscriber. - last_polling_batch_size = 0 + last_polling_batch_size = -1 + consecutive_zero_batch_cnt = 0 job_id_hex = self.current_job_id.hex() while True: - # Exit if we received a signal that we should stop. - if self.threads_stopped.is_set(): + if last_polling_batch_size == 0: + consecutive_zero_batch_cnt += 1 + # Sleep for a while to avoid consecutive_zero_batch_cnt + # increasing too fast. + time.sleep(0.5) + else: + consecutive_zero_batch_cnt = 0 + # Exit if we received a signal that we should stop or if main thread + # is dead. But wait until the polling batch size decreases to 0 and + # remains so for a while before returning; this ensures that there + # are no pending logs. + if ( + self.threads_stopped.is_set() + or not threading.main_thread().is_alive() + ) and consecutive_zero_batch_cnt >= 3: return data = subscriber.poll() @@ -961,11 +975,7 @@ def print_logs(self): last_polling_batch_size = 0 continue - if ( - self._filter_logs_by_job - and data["job"] - and data["job"] != job_id_hex - ): + if self._filter_logs_by_job and data["job"] != job_id_hex: last_polling_batch_size = 0 continue @@ -2529,7 +2539,6 @@ def connect( worker.logger_thread = threading.Thread( target=worker.print_logs, name="ray_print_logs" ) - worker.logger_thread.daemon = True worker.logger_thread.start() # Setup tracing here