Skip to content

Commit

Permalink
Merge branch 'accelerator' into accelerator-test
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Sep 10, 2024
2 parents 36a09f3 + 9b4a3a8 commit 6b113e3
Showing 1 changed file with 83 additions and 8 deletions.
91 changes: 83 additions & 8 deletions utils/proxy-liteserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@
#if TD_DARWIN || TD_LINUX
#include <unistd.h>
#endif
#include "td/utils/overloaded.h"

#include <iostream>
#include <map>
#include <auto/tl/lite_api.hpp>
#include "td/utils/tl_storers.h"

using namespace ton;

Expand Down Expand Up @@ -252,26 +256,93 @@ class ProxyLiteserver : public td::actor::Actor {
}

void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
// Like in ValidatorManagerImpl::run_ext_query
auto F = fetch_tl_object<lite_api::liteServer_query>(data, true);
if (F.is_ok()) {
data = std::move(F.move_as_ok()->data_);
} else {
auto G = fetch_tl_prefix<lite_api::liteServer_queryPrefix>(data, true);
if (G.is_error()) {
promise.set_error(G.move_as_error());
return;
}
}

tl_object_ptr<lite_api::liteServer_waitMasterchainSeqno> wait_mc_seqno_obj;
auto E = fetch_tl_prefix<lite_api::liteServer_waitMasterchainSeqno>(data, true);
if (E.is_ok()) {
wait_mc_seqno_obj = E.move_as_ok();
}
liteclient::QueryInfo query_info = liteclient::get_query_info(data);
++ls_stats_[query_info.query_id];
promise = [promise = std::move(promise), query_info, timer = td::Timer()](td::Result<td::BufferSlice> R) mutable {
promise = [promise = std::move(promise), query_info, timer = td::Timer(),
wait_mc_seqno =
(wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0)](td::Result<td::BufferSlice> R) mutable {
if (R.is_ok()) {
LOG(INFO) << "Query " << query_info.to_str() << ": OK, time=" << timer.elapsed()
<< ", response_size=" << R.ok().size();
LOG(INFO) << "Query " << query_info.to_str()
<< (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "")
<< ": OK, time=" << timer.elapsed() << ", response_size=" << R.ok().size();
promise.set_value(R.move_as_ok());
return;
}
LOG(INFO) << "Query " << query_info.to_str() << ": " << R.error();
LOG(INFO) << "Query " << query_info.to_str()
<< (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "") << ": " << R.error();
promise.set_value(create_serialize_tl_object<lite_api::liteServer_error>(
R.error().code(), "Gateway error: " + R.error().message().str()));
};
TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info));

TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info));
Server& server = servers_[server_idx];
LOG(INFO) << "Sending query " << query_info.to_str() << ", size=" << data.size() << ", to server #" << server_idx
<< " (" << server.config.addr.get_ip_str() << ":" << server.config.addr.get_port() << ")";
LOG(INFO) << "Sending query " << query_info.to_str()
<< (wait_mc_seqno_obj ? PSTRING() << " (wait seqno " << wait_mc_seqno_obj->seqno_ << ")" : "")
<< ", size=" << data.size() << ", to server #" << server_idx << " (" << server.config.addr.get_ip_str()
<< ":" << server.config.addr.get_port() << ")";

BlockSeqno wait_mc_seqno = wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0;
wait_mc_seqno = std::max(wait_mc_seqno, last_known_masterchain_seqno_);
if (server.last_known_masterchain_seqno < wait_mc_seqno) {
int timeout_ms = wait_mc_seqno_obj ? wait_mc_seqno_obj->timeout_ms_ : 8000;
data = serialize_tl_object(create_tl_object<lite_api::liteServer_waitMasterchainSeqno>(wait_mc_seqno, timeout_ms),
true, std::move(data));
}
data = create_serialize_tl_object<lite_api::liteServer_query>(std::move(data));
td::actor::send_closure(server.client, &adnl::AdnlExtClient::send_query, "q", std::move(data),
td::Timestamp::in(8.0), std::move(promise));
td::Timestamp::in(8.0),
[SelfId = actor_id(this), promise = std::move(promise), server_idx,
wait_mc_seqno](td::Result<td::BufferSlice> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(SelfId, &ProxyLiteserver::process_query_response,
R.ok().clone(), server_idx, wait_mc_seqno);
}
promise.set_result(std::move(R));
});
}

void process_query_response(td::BufferSlice data, size_t server_idx, BlockSeqno wait_mc_seqno) {
auto F = fetch_tl_object<lite_api::Object>(data, true);
if (F.is_error() || F.ok()->get_id() == lite_api::liteServer_error::ID) {
return;
}
BlockSeqno new_seqno = wait_mc_seqno;
lite_api::downcast_call(*F.ok_ref(), td::overloaded(
[&](lite_api::liteServer_masterchainInfo& f) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.last_->seqno_);
},
[&](lite_api::liteServer_masterchainInfoExt& f) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.last_->seqno_);
},
[&](lite_api::liteServer_accountState& f) {
if (f.id_->workchain_ == masterchainId) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.id_->seqno_);
}
},
[&](auto& obj) {}));
servers_[server_idx].last_known_masterchain_seqno =
std::max(servers_[server_idx].last_known_masterchain_seqno, new_seqno);
if (new_seqno > last_known_masterchain_seqno_) {
last_known_masterchain_seqno_ = new_seqno;
LOG(INFO) << "Last known masterchain seqno = " << new_seqno;
}
}

void alarm() override {
Expand Down Expand Up @@ -307,11 +378,15 @@ class ProxyLiteserver : public td::actor::Actor {
liteclient::LiteServerConfig config;
td::actor::ActorOwn<adnl::AdnlExtClient> client;
bool alive = false;
BlockSeqno last_known_masterchain_seqno = 0;
};
std::vector<Server> servers_;

std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown

BlockSeqno last_known_masterchain_seqno_ = 0;
tl_object_ptr<lite_api::liteServer_masterchainInfoExt> last_masterchain_info_;

std::string config_file() const {
return db_root_ + "/config.json";
}
Expand Down

0 comments on commit 6b113e3

Please sign in to comment.