From 337f3c598168b9e459752f6e3936264980f36f4a Mon Sep 17 00:00:00 2001 From: liqingping Date: Tue, 18 Feb 2025 16:37:19 +0800 Subject: [PATCH] feat: wait until connect to master or timeout --- gloo/rendezvous/tcp_store.cc | 25 ++++++++++++++++++++----- gloo/rendezvous/tcp_store.h | 6 ++++-- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/gloo/rendezvous/tcp_store.cc b/gloo/rendezvous/tcp_store.cc index aa3665ef0..5ed1cd5c6 100644 --- a/gloo/rendezvous/tcp_store.cc +++ b/gloo/rendezvous/tcp_store.cc @@ -16,6 +16,7 @@ #include #include #include +#include #ifndef _WIN32 #include @@ -209,11 +210,26 @@ namespace gloo GLOO_THROW(err); } - // connect to server - if (connect(new_server_fd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0) + const auto start = std::chrono::steady_clock::now(); + auto timeout = std::chrono::seconds(timeout_); + while (true) { - auto err = std::string("Connection to server failed: ") + strerror(errno); - GLOO_THROW(err); + // connect to server + if (connect(new_server_fd, (struct sockaddr *)&server_address, sizeof(server_address)) == 0) + { + break; + } + + // check timeout + const auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + if (timeout != kNoTimeout && elapsed > timeout) + { + GLOO_THROW_IO_EXCEPTION(GLOO_ERROR_MSG( + "Connection to master timeout for " + std::to_string(timeout_) + " seconds")); + } + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } return new_server_fd; @@ -358,7 +374,6 @@ namespace gloo } /* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10)); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } diff --git a/gloo/rendezvous/tcp_store.h b/gloo/rendezvous/tcp_store.h index cf2113705..25f83283a 100644 --- a/gloo/rendezvous/tcp_store.h +++ b/gloo/rendezvous/tcp_store.h @@ -16,6 +16,8 @@ #include #include +#define SOCKET_INIT_TIMEOUT_SECONDS 30 + namespace gloo { namespace rendezvous @@ -24,7 +26,7 @@ namespace gloo class TCPStore : public Store { public: - explicit TCPStore(const std::string &hostname, int port, int world_size, bool is_master, int timeout = 30); + explicit TCPStore(const std::string &hostname, int port, int world_size, bool is_master, int timeout = SOCKET_INIT_TIMEOUT_SECONDS); virtual ~TCPStore(); virtual void set(const std::string &key, const std::vector &data) @@ -64,7 +66,7 @@ namespace gloo std::mutex mtx; - int server_fd; + int server_fd = -1; std::map> data_; };