diff --git a/utils/proxy-liteserver.cpp b/utils/proxy-liteserver.cpp index 66161fc45..a9baa7595 100644 --- a/utils/proxy-liteserver.cpp +++ b/utils/proxy-liteserver.cpp @@ -43,8 +43,12 @@ #if TD_DARWIN || TD_LINUX #include #endif +#include "td/utils/overloaded.h" + #include #include +#include +#include "td/utils/tl_storers.h" using namespace ton; @@ -252,26 +256,93 @@ class ProxyLiteserver : public td::actor::Actor { } void receive_query(td::BufferSlice data, td::Promise promise) { + // Like in ValidatorManagerImpl::run_ext_query + auto F = fetch_tl_object(data, true); + if (F.is_ok()) { + data = std::move(F.move_as_ok()->data_); + } else { + auto G = fetch_tl_prefix(data, true); + if (G.is_error()) { + promise.set_error(G.move_as_error()); + return; + } + } + + tl_object_ptr wait_mc_seqno_obj; + auto E = fetch_tl_prefix(data, true); + if (E.is_ok()) { + wait_mc_seqno_obj = E.move_as_ok(); + } liteclient::QueryInfo query_info = liteclient::get_query_info(data); ++ls_stats_[query_info.query_id]; - promise = [promise = std::move(promise), query_info, timer = td::Timer()](td::Result R) mutable { + promise = [promise = std::move(promise), query_info, timer = td::Timer(), + wait_mc_seqno = + (wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0)](td::Result R) mutable { if (R.is_ok()) { - LOG(INFO) << "Query " << query_info.to_str() << ": OK, time=" << timer.elapsed() - << ", response_size=" << R.ok().size(); + LOG(INFO) << "Query " << query_info.to_str() + << (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "") + << ": OK, time=" << timer.elapsed() << ", response_size=" << R.ok().size(); promise.set_value(R.move_as_ok()); return; } - LOG(INFO) << "Query " << query_info.to_str() << ": " << R.error(); + LOG(INFO) << "Query " << query_info.to_str() + << (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "") << ": " << R.error(); promise.set_value(create_serialize_tl_object( R.error().code(), "Gateway error: " + R.error().message().str())); }; - TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info)); + TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info)); Server& server = servers_[server_idx]; - LOG(INFO) << "Sending query " << query_info.to_str() << ", size=" << data.size() << ", to server #" << server_idx - << " (" << server.config.addr.get_ip_str() << ":" << server.config.addr.get_port() << ")"; + LOG(INFO) << "Sending query " << query_info.to_str() + << (wait_mc_seqno_obj ? PSTRING() << " (wait seqno " << wait_mc_seqno_obj->seqno_ << ")" : "") + << ", size=" << data.size() << ", to server #" << server_idx << " (" << server.config.addr.get_ip_str() + << ":" << server.config.addr.get_port() << ")"; + + BlockSeqno wait_mc_seqno = wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0; + wait_mc_seqno = std::max(wait_mc_seqno, last_known_masterchain_seqno_); + if (server.last_known_masterchain_seqno < wait_mc_seqno) { + int timeout_ms = wait_mc_seqno_obj ? wait_mc_seqno_obj->timeout_ms_ : 8000; + data = serialize_tl_object(create_tl_object(wait_mc_seqno, timeout_ms), + true, std::move(data)); + } + data = create_serialize_tl_object(std::move(data)); td::actor::send_closure(server.client, &adnl::AdnlExtClient::send_query, "q", std::move(data), - td::Timestamp::in(8.0), std::move(promise)); + td::Timestamp::in(8.0), + [SelfId = actor_id(this), promise = std::move(promise), server_idx, + wait_mc_seqno](td::Result R) mutable { + if (R.is_ok()) { + td::actor::send_closure(SelfId, &ProxyLiteserver::process_query_response, + R.ok().clone(), server_idx, wait_mc_seqno); + } + promise.set_result(std::move(R)); + }); + } + + void process_query_response(td::BufferSlice data, size_t server_idx, BlockSeqno wait_mc_seqno) { + auto F = fetch_tl_object(data, true); + if (F.is_error() || F.ok()->get_id() == lite_api::liteServer_error::ID) { + return; + } + BlockSeqno new_seqno = wait_mc_seqno; + lite_api::downcast_call(*F.ok_ref(), td::overloaded( + [&](lite_api::liteServer_masterchainInfo& f) { + new_seqno = std::max(new_seqno, f.last_->seqno_); + }, + [&](lite_api::liteServer_masterchainInfoExt& f) { + new_seqno = std::max(new_seqno, f.last_->seqno_); + }, + [&](lite_api::liteServer_accountState& f) { + if (f.id_->workchain_ == masterchainId) { + new_seqno = std::max(new_seqno, f.id_->seqno_); + } + }, + [&](auto& obj) {})); + servers_[server_idx].last_known_masterchain_seqno = + std::max(servers_[server_idx].last_known_masterchain_seqno, new_seqno); + if (new_seqno > last_known_masterchain_seqno_) { + last_known_masterchain_seqno_ = new_seqno; + LOG(INFO) << "Last known masterchain seqno = " << new_seqno; + } } void alarm() override { @@ -307,11 +378,15 @@ class ProxyLiteserver : public td::actor::Actor { liteclient::LiteServerConfig config; td::actor::ActorOwn client; bool alive = false; + BlockSeqno last_known_masterchain_seqno = 0; }; std::vector servers_; std::map ls_stats_; // lite_api ID -> count, 0 for unknown + BlockSeqno last_known_masterchain_seqno_ = 0; + tl_object_ptr last_masterchain_info_; + std::string config_file() const { return db_root_ + "/config.json"; }