Skip to content

Commit

Permalink
[Fix][Core] Wait a while before stopping the ray_print_logs thread to…
Browse files Browse the repository at this point in the history
… prevent pending logs

Closes: ray-project#48701
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Dec 18, 2024
1 parent 72a8c22 commit 2586dc1
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,11 +948,23 @@ def print_logs(self):
# probably will not be able to consume the log messages as rapidly
# as they are coming in.
# This is meaningful only for GCS subscriber.
last_polling_batch_size = 0
last_polling_batch_size = -1
consecutive_zero_batch_cnt = 0
job_id_hex = self.current_job_id.hex()
while True:
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
if last_polling_batch_size == 0:
consecutive_zero_batch_cnt += 1
# Sleep for a while to avoid consecutive_zero_batch_cnt increasing too fast.
time.sleep(0.5)
else:
consecutive_zero_batch_cnt = 0
# Only returns when main thread is dead.
# But wait until the polling batch size decreases to 0 and remains so
# for a while before returning; this ensures that there are no pending logs.
if (
not threading.main_thread().is_alive()
and consecutive_zero_batch_cnt >= 3
):
return

data = subscriber.poll()
Expand All @@ -961,11 +973,7 @@ def print_logs(self):
last_polling_batch_size = 0
continue

if (
self._filter_logs_by_job
and data["job"]
and data["job"] != job_id_hex
):
if self._filter_logs_by_job and data["job"] != job_id_hex:
last_polling_batch_size = 0
continue

Expand Down Expand Up @@ -2529,7 +2537,6 @@ def connect(
worker.logger_thread = threading.Thread(
target=worker.print_logs, name="ray_print_logs"
)
worker.logger_thread.daemon = True
worker.logger_thread.start()

# Setup tracing here
Expand Down

0 comments on commit 2586dc1

Please sign in to comment.