Skip to content

Commit

Permalink
Add ShardClientDetector constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
tvorogme committed Feb 13, 2024
1 parent fff179d commit 0464f6b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 10 deletions.
49 changes: 40 additions & 9 deletions validator/manager-disk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ namespace ton {

namespace validator {

void ShardClientDetector::start_up() {
// alarm_timestamp() = td::Timestamp::in(0.2);
}

void ShardClientDetector::increase_wait(BlockIdExt blkid) {
mc_shards_waits_[blkid] += 1;
}

void ShardClientDetector::receive_result(BlockIdExt mc_blkid, BlockIdExt shard_blkid, td::Result<BlockHandle> R) {
if (R.is_ok()) {
mc_shards_waits_[mc_blkid] -= 1;
if (mc_shards_waits_[mc_blkid] == 0) {
mc_shards_waits_.erase(mc_blkid);
LOG(WARNING) << "New shard client available: " << mc_blkid;
}
} else {
LOG(INFO) << "Cant get shard for: " << mc_blkid << " shard: " << shard_blkid;
}
}

void ShardClientDetector::alarm() {
// alarm_timestamp() = td::Timestamp::in(0.2);
}

void ValidatorManagerImpl::validate_block_is_next_proof(BlockIdExt prev_block_id, BlockIdExt next_block_id,
td::BufferSlice proof, td::Promise<td::Unit> promise) {
UNREACHABLE();
Expand Down Expand Up @@ -1177,30 +1201,37 @@ void ValidatorManagerImpl::receiveLastBlock(td::Result<td::Ref<BlockData>> block
std::string shards_idents;
int total_shards{0};

auto parseShards = [&shards_idents, SelfId = actor_id(this), mc_id = last_masterchain_block_id_,
&total_shards](McShardHash &ms) {
auto parseShards = [&shards_idents, SelfId = actor_id(this), DetectorId = shardclientdetector_.get(),
mc_id = last_masterchain_block_id_, &total_shards](McShardHash &ms) {
auto shard_seqno = ms.top_block_id().id.seqno;
auto shard_shard = ms.top_block_id().id.shard;
auto shard_workchain = ms.shard().workchain;

shards_idents += ", " + std::to_string(shard_workchain) + ":" + std::to_string(shard_shard) + ":" +
std::to_string(shard_seqno);
shards_idents += shards_idents.empty() ? ""
: ", " + std::to_string(shard_workchain) + ":" +
std::to_string(shard_shard) + ":" + std::to_string(shard_seqno);
total_shards += 1;
//
// td::actor::send_closure_later(SelfId, &IndexerWorker::start_parse_shards, shard_seqno, shard_shard,
// shard_workchain, mc_handle);

td::actor::send_closure_later(DetectorId, &ShardClientDetector::increase_wait, mc_id);

auto P_cb = td::PromiseCreator::lambda([DetectorId, mc_id, my_id = ms.top_block_id()](td::Result<BlockHandle> R) {
td::actor::send_closure_later(DetectorId, &ShardClientDetector::receive_result, mc_id, my_id, std::move(R));
});

td::actor::send_closure_later(SelfId, &ValidatorManagerImpl::get_block_handle, ms.top_block_id(), false,
std::move(P_cb));
return 1;
};

mc_shards_waits_[last_masterchain_block_id_] = total_shards;
// mc_shards_waits_[last_masterchain_block_id_] = total_shards;
shards.process_shard_hashes(parseShards);

last_masterchain_time_ = info.gen_utime;

// update DB if needed
td::actor::send_closure(db_, &Db::reinit);
LOG(INFO) << "New MC block: " << last_masterchain_block_id_
<< " at: " << time_to_human(last_masterchain_time_) + "shards: " << shards_idents;
<< " at: " << time_to_human(last_masterchain_time_) + ", shards: " << shards_idents;

// Handle wait_block
while (!shard_client_waiters_.empty()) {
Expand Down
19 changes: 18 additions & 1 deletion validator/manager-disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ class WaitZeroState;
class WaitShardState;
class WaitBlockDataDisk;

class ShardClientDetector : public td::actor::Actor {
public:
ShardClientDetector(td::actor::ActorId<ValidatorManager> manager) {
manager_ = std::move(manager);
}
void start_up() override;
void alarm() override;
void increase_wait(BlockIdExt blkid);
void receive_result(BlockIdExt mc_blkid, BlockIdExt shard_blkid, td::Result<BlockHandle> R);

private:
std::map<BlockIdExt, int> mc_shards_waits_;
td::actor::ActorId<ValidatorManager> manager_;
};

class ValidatorManagerImpl : public ValidatorManager {
private:
std::vector<td::Ref<ExtMessage>> ext_messages_;
Expand Down Expand Up @@ -70,7 +85,6 @@ class ValidatorManagerImpl : public ValidatorManager {
BlockSeqno last_masterchain_seqno_ = 0;
bool started_ = false;
td::Ref<MasterchainState> last_masterchain_state_;
std::map<BlockIdExt, int> mc_shards_waits_;

unsigned int last_masterchain_time_;
//BlockHandle last_masterchain_block_;
Expand Down Expand Up @@ -343,6 +357,8 @@ class ValidatorManagerImpl : public ValidatorManager {
, db_root_(std::move(db_root))
, shard_to_generate_(shard_id)
, block_to_generate_(shard_to_block_id) {
// Need to detect last mc block with shards
shardclientdetector_ = td::actor::create_actor<ShardClientDetector>("ShardClientDetector", actor_id(this));
}

public:
Expand Down Expand Up @@ -504,6 +520,7 @@ class ValidatorManagerImpl : public ValidatorManager {
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<liteserver::LiteServerLimiter> lslimiter_;
td::actor::ActorOwn<ShardClientDetector> shardclientdetector_;

std::string db_root_;
ShardIdFull shard_to_generate_;
Expand Down

0 comments on commit 0464f6b

Please sign in to comment.