From dbe51d6d138c9f8ec1acd928916158ce09283718 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Fri, 16 Aug 2024 10:14:54 +0300 Subject: [PATCH] Status page in blockchain-explorer --- .../blockchain-explorer-query.cpp | 2 +- blockchain-explorer/blockchain-explorer.cpp | 66 +++++++++---- lite-client/ext-client.cpp | 94 +++++++++++++++---- lite-client/ext-client.h | 9 +- 4 files changed, 132 insertions(+), 39 deletions(-) diff --git a/blockchain-explorer/blockchain-explorer-query.cpp b/blockchain-explorer/blockchain-explorer-query.cpp index 1808a3c46..26a6787e1 100644 --- a/blockchain-explorer/blockchain-explorer-query.cpp +++ b/blockchain-explorer/blockchain-explorer-query.cpp @@ -1432,7 +1432,7 @@ void HttpQueryStatus::finish_query() { for (td::uint32 i = 0; i < results_.ips.size(); i++) { A << ""; if (results_.ips[i].is_valid()) { - A << "" << results_.ips[i] << ""; + A << "" << results_.ips[i].get_ip_str() << ":" << results_.ips[i].get_port() << ""; } else { A << "hidden"; } diff --git a/blockchain-explorer/blockchain-explorer.cpp b/blockchain-explorer/blockchain-explorer.cpp index 029c718d6..ca50d5266 100644 --- a/blockchain-explorer/blockchain-explorer.cpp +++ b/blockchain-explorer/blockchain-explorer.cpp @@ -155,8 +155,12 @@ class CoreActor : public CoreActorInterface { td::int32 attempt_ = 0; td::int32 waiting_ = 0; - //void run_queries(); - void got_result(td::uint32 idx, td::int32 attempt, td::Result data); + size_t n_servers_ = 0; + + void run_queries(); + void got_servers_ready(td::int32 attempt, std::vector ready); + void send_ping(td::uint32 idx); + void got_ping_result(td::uint32 idx, td::int32 attempt, td::Result data); void add_result() { if (new_result_) { @@ -175,7 +179,7 @@ class CoreActor : public CoreActorInterface { add_result(); } attempt_ = t; - //run_queries(); + run_queries(); alarm_timestamp() = td::Timestamp::at_unix((attempt_ + 1) * 60); } @@ -450,7 +454,8 @@ class CoreActor : public CoreActorInterface { addrs_.push_back(remote_addr_); servers.push_back(liteclient::LiteServerConfig{ton::adnl::AdnlNodeIdFull{remote_public_key_}, remote_addr_}); } - client_ = liteclient::ExtClient::create(std::move(servers), make_callback()); + n_servers_ = servers.size(); + client_ = liteclient::ExtClient::create(std::move(servers), make_callback(), true); daemon_ = MHD_start_daemon(MHD_USE_SELECT_INTERNALLY, static_cast(http_port_), nullptr, nullptr, &process_http_request, nullptr, MHD_OPTION_NOTIFY_COMPLETED, request_completed, nullptr, MHD_OPTION_THREAD_POOL_SIZE, 16, MHD_OPTION_END); @@ -458,7 +463,46 @@ class CoreActor : public CoreActorInterface { } }; -void CoreActor::got_result(td::uint32 idx, td::int32 attempt, td::Result R) { +void CoreActor::run_queries() { + waiting_ = 0; + new_result_ = std::make_shared(n_servers_, td::Timestamp::at_unix(attempt_ * 60)); + td::actor::send_closure(client_, &liteclient::ExtClient::get_servers_status, + [SelfId = actor_id(this), attempt = attempt_](td::Result> R) { + R.ensure(); + td::actor::send_closure(SelfId, &CoreActor::got_servers_ready, attempt, R.move_as_ok()); + }); +} + +void CoreActor::got_servers_ready(td::int32 attempt, std::vector ready) { + if (attempt != attempt_) { + return; + } + CHECK(ready.size() == n_servers_); + for (td::uint32 i = 0; i < n_servers_; i++) { + if (ready[i]) { + send_ping(i); + } + } + CHECK(waiting_ >= 0); + if (waiting_ == 0) { + add_result(); + } +} + +void CoreActor::send_ping(td::uint32 idx) { + waiting_++; + auto query = ton::create_tl_object(); + auto q = ton::create_tl_object(serialize_tl_object(query, true)); + + auto P = + td::PromiseCreator::lambda([SelfId = actor_id(this), idx, attempt = attempt_](td::Result R) { + td::actor::send_closure(SelfId, &CoreActor::got_ping_result, idx, attempt, std::move(R)); + }); + td::actor::send_closure(client_, &liteclient::ExtClient::send_query_to_server, "query", serialize_tl_object(q, true), + idx, td::Timestamp::in(10.0), std::move(P)); +} + +void CoreActor::got_ping_result(td::uint32 idx, td::int32 attempt, td::Result R) { if (attempt != attempt_) { return; } @@ -499,18 +543,6 @@ void CoreActor::got_result(td::uint32 idx, td::int32 attempt, td::Result(ready_.size(), td::Timestamp::at_unix(attempt_ * 60)); - for (td::uint32 i = 0; i < ready_.size(); i++) { - send_query(i); - } - CHECK(waiting_ >= 0); - if (waiting_ == 0) { - add_result(); - } -}*/ - void CoreActor::send_lite_query(td::BufferSlice query, td::Promise promise) { auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { diff --git a/lite-client/ext-client.cpp b/lite-client/ext-client.cpp index 0232c28f4..a0e48e64a 100644 --- a/lite-client/ext-client.cpp +++ b/lite-client/ext-client.cpp @@ -22,8 +22,8 @@ namespace liteclient { class ExtClientImpl : public ExtClient { public: - ExtClientImpl(std::vector liteservers, td::unique_ptr callback) - : callback_(std::move(callback)) { + ExtClientImpl(std::vector liteservers, td::unique_ptr callback, bool connect_to_all) + : callback_(std::move(callback)), connect_to_all_(connect_to_all) { CHECK(!liteservers.empty()); servers_.resize(liteservers.size()); for (size_t i = 0; i < servers_.size(); ++i) { @@ -36,15 +36,65 @@ class ExtClientImpl : public ExtClient { LOG(INFO) << "Started ext client, " << servers_.size() << " liteservers"; td::Random::Fast rnd; td::random_shuffle(td::as_mutable_span(servers_), rnd); + server_indices_.resize(servers_.size()); + for (size_t i = 0; i < servers_.size(); ++i) { + server_indices_[servers_[i].idx] = i; + } + + if (connect_to_all_) { + for (size_t i = 0; i < servers_.size(); ++i) { + prepare_server(i, nullptr); + } + } } void send_query(std::string name, td::BufferSlice data, td::Timestamp timeout, td::Promise promise) override { QueryInfo query_info = get_query_info(data); TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info)); + send_query_internal(std::move(name), std::move(data), std::move(query_info), server_idx, timeout, + std::move(promise)); + } + + void send_query_to_server(std::string name, td::BufferSlice data, size_t server_idx, td::Timestamp timeout, + td::Promise promise) override { + if (server_idx >= servers_.size()) { + promise.set_error(td::Status::Error(PSTRING() << "server idx " << server_idx << " is too big")); + return; + } + server_idx = server_indices_[server_idx]; + QueryInfo query_info = get_query_info(data); + prepare_server(server_idx, &query_info); + send_query_internal(std::move(name), std::move(data), std::move(query_info), server_idx, timeout, + std::move(promise)); + } + + void get_servers_status(td::Promise> promise) override { + std::vector status(servers_.size()); + for (const Server& s : servers_) { + status[s.idx] = s.alive; + } + promise.set_result(std::move(status)); + } + + void reset_servers() override { + LOG(INFO) << "Force resetting all liteservers"; + for (Server& server : servers_) { + server.alive = false; + server.timeout = {}; + server.ignore_until = {}; + server.client.reset(); + } + } + + private: + void send_query_internal(std::string name, td::BufferSlice data, QueryInfo query_info, size_t server_idx, + td::Timestamp timeout, td::Promise promise) { auto& server = servers_[server_idx]; CHECK(!server.client.empty()); - alarm_timestamp().relax(server.timeout = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT)); + if (!connect_to_all_) { + alarm_timestamp().relax(server.timeout = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT)); + } td::Promise P = [SelfId = actor_id(this), server_idx, promise = std::move(promise)](td::Result R) mutable { if (R.is_error() && @@ -59,17 +109,6 @@ class ExtClientImpl : public ExtClient { std::move(P)); } - void reset_servers() override { - LOG(INFO) << "Force resetting all liteservers"; - for (Server& server : servers_) { - server.alive = false; - server.timeout = {}; - server.ignore_until = {}; - server.client.reset(); - } - } - - private: td::Result select_server(const QueryInfo& query_info) { for (size_t i = 0; i < servers_.size(); ++i) { if (servers_[i].alive && servers_[i].config.accepts_query(query_info)) { @@ -101,12 +140,22 @@ class ExtClientImpl : public ExtClient { if (server_idx == servers_.size()) { return td::Status::Error(PSTRING() << "no liteserver for query " << query_info.to_str()); } + prepare_server(server_idx, &query_info); + return server_idx; + } + + void prepare_server(size_t server_idx, const QueryInfo* query_info) { Server& server = servers_[server_idx]; + if (server.alive) { + return; + } server.alive = true; server.ignore_until = {}; - alarm_timestamp().relax(server.timeout = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT)); + if (!connect_to_all_) { + alarm_timestamp().relax(server.timeout = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT)); + } if (!server.client.empty()) { - return server_idx; + return; } class Callback : public ton::adnl::AdnlExtClient::Callback { @@ -124,10 +173,9 @@ class ExtClientImpl : public ExtClient { size_t idx_; }; LOG(INFO) << "Connecting to liteserver #" << server.idx << " (" << server.config.addr.get_ip_str() << ":" - << server.config.addr.get_port() << ") for query " << query_info.to_str(); + << server.config.addr.get_port() << ") for query " << (query_info ? query_info->to_str() : "[none]"); server.client = ton::adnl::AdnlExtClient::create(server.config.adnl_id, server.config.addr, std::make_unique(actor_id(this), server_idx)); - return server_idx; } struct Server { @@ -139,12 +187,17 @@ class ExtClientImpl : public ExtClient { td::Timestamp ignore_until = td::Timestamp::never(); }; std::vector servers_; + std::vector server_indices_; td::unique_ptr callback_; + bool connect_to_all_ = false; static constexpr double MAX_NO_QUERIES_TIMEOUT = 100.0; static constexpr double BAD_SERVER_TIMEOUT = 30.0; void alarm() override { + if (connect_to_all_) { + return; + } for (Server& server : servers_) { if (server.timeout && server.timeout.is_in_past()) { LOG(INFO) << "Closing connection to liteserver #" << server.idx << " (" << server.config.addr.get_ip_str() @@ -168,7 +221,8 @@ td::actor::ActorOwn ExtClient::create(ton::adnl::AdnlNodeIdFull dst, } td::actor::ActorOwn ExtClient::create(std::vector liteservers, - td::unique_ptr callback) { - return td::actor::create_actor("ExtClient", std::move(liteservers), std::move(callback)); + td::unique_ptr callback, bool connect_to_all) { + return td::actor::create_actor("ExtClient", std::move(liteservers), std::move(callback), + connect_to_all); } } // namespace liteclient diff --git a/lite-client/ext-client.h b/lite-client/ext-client.h index adcff9bed..ef4523fd6 100644 --- a/lite-client/ext-client.h +++ b/lite-client/ext-client.h @@ -30,12 +30,19 @@ class ExtClient : public td::actor::Actor { virtual void send_query(std::string name, td::BufferSlice data, td::Timestamp timeout, td::Promise promise) = 0; + virtual void send_query_to_server(std::string name, td::BufferSlice data, size_t server_idx, td::Timestamp timeout, + td::Promise promise) { + promise.set_error(td::Status::Error("not supported")); + } + virtual void get_servers_status(td::Promise> promise) { + promise.set_error(td::Status::Error("not supported")); + } virtual void reset_servers() { } static td::actor::ActorOwn create(ton::adnl::AdnlNodeIdFull dst, td::IPAddress dst_addr, td::unique_ptr callback); static td::actor::ActorOwn create(std::vector liteservers, - td::unique_ptr callback); + td::unique_ptr callback, bool connect_to_all = false); }; } // namespace liteclient