Skip to content

Commit

Permalink
[Fix][Core] Wait a while before stopping the ray_print_logs thread to…
Browse files Browse the repository at this point in the history
… prevent pending logs

Closes: #48701
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Dec 19, 2024
1 parent 5efdc5a commit 588c244
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 6 deletions.
11 changes: 8 additions & 3 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,8 +951,14 @@ def print_logs(self):
last_polling_batch_size = 0
job_id_hex = self.current_job_id.hex()
while True:
# Exit if we received a signal that we should stop.
if self.threads_stopped.is_set():
# Exit if we received a signal that we should stop or if main thread
# is dead. But wait until the message queue size decreases to lower
# than or equal to 1 to avoid missing logs.
if (
self.threads_stopped.is_set()
or not threading.main_thread().is_alive()
and subscriber.queue_size <= 1
):
return

data = subscriber.poll()
Expand Down Expand Up @@ -2529,7 +2535,6 @@ def connect(
worker.logger_thread = threading.Thread(
target=worker.print_logs, name="ray_print_logs"
)
worker.logger_thread.daemon = True
worker.logger_thread.start()

# Setup tracing here
Expand Down
5 changes: 5 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2804,6 +2804,11 @@ cdef class _GcsSubscriber:
"""
return self.inner.get().last_batch_size()

@property
def queue_size(self):
"""Size of the message queue of the subscriber."""
return self.inner.get().queue_size()

def close(self):
"""Closes the subscriber and its active subscription."""
with nogil:
Expand Down
4 changes: 3 additions & 1 deletion python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,9 @@ cdef extern from "ray/gcs/pubsub/gcs_pub_sub.h" nogil:

CRayStatus Subscribe()

int64_t last_batch_size()
int64_t last_batch_size() const

int64_t queue_size() const

CRayStatus PollError(
c_string* key_id, int64_t timeout_ms, CErrorTableData* data)
Expand Down
7 changes: 6 additions & 1 deletion src/ray/gcs/pubsub/gcs_pub_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,12 @@ Status PythonGcsSubscriber::Close() {
return Status::OK();
}

int64_t PythonGcsSubscriber::last_batch_size() {
int64_t PythonGcsSubscriber::last_batch_size() const {
absl::MutexLock lock(&mu_);
return last_batch_size_;
}

int64_t PythonGcsSubscriber::queue_size() const {
absl::MutexLock lock(&mu_);
return last_batch_size_;
}
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ class RAY_EXPORT PythonGcsSubscriber {
/// Closes the subscriber and its active subscription.
Status Close();

int64_t last_batch_size();
int64_t last_batch_size() const;

int64_t queue_size() const;

private:
Status DoPoll(int64_t timeout_ms, rpc::PubMessage *message);
Expand Down

0 comments on commit 588c244

Please sign in to comment.