diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index d4bf101ef..ccf169191 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -70,6 +70,30 @@ namespace ton { namespace validator { +void ShardClientDetector::start_up() { + // alarm_timestamp() = td::Timestamp::in(0.2); +} + +void ShardClientDetector::increase_wait(BlockIdExt blkid) { + mc_shards_waits_[blkid] += 1; +} + +void ShardClientDetector::receive_result(BlockIdExt mc_blkid, BlockIdExt shard_blkid, td::Result R) { + if (R.is_ok()) { + mc_shards_waits_[mc_blkid] -= 1; + if (mc_shards_waits_[mc_blkid] == 0) { + mc_shards_waits_.erase(mc_blkid); + LOG(WARNING) << "New shard client available: " << mc_blkid; + } + } else { + LOG(INFO) << "Cant get shard for: " << mc_blkid << " shard: " << shard_blkid; + } +} + +void ShardClientDetector::alarm() { + // alarm_timestamp() = td::Timestamp::in(0.2); +} + void ValidatorManagerImpl::validate_block_is_next_proof(BlockIdExt prev_block_id, BlockIdExt next_block_id, td::BufferSlice proof, td::Promise promise) { UNREACHABLE(); @@ -1177,22 +1201,29 @@ void ValidatorManagerImpl::receiveLastBlock(td::Result> block std::string shards_idents; int total_shards{0}; - auto parseShards = [&shards_idents, SelfId = actor_id(this), mc_id = last_masterchain_block_id_, - &total_shards](McShardHash &ms) { + auto parseShards = [&shards_idents, SelfId = actor_id(this), DetectorId = shardclientdetector_.get(), + mc_id = last_masterchain_block_id_, &total_shards](McShardHash &ms) { auto shard_seqno = ms.top_block_id().id.seqno; auto shard_shard = ms.top_block_id().id.shard; auto shard_workchain = ms.shard().workchain; - shards_idents += ", " + std::to_string(shard_workchain) + ":" + std::to_string(shard_shard) + ":" + - std::to_string(shard_seqno); + shards_idents += shards_idents.empty() ? "" + : ", " + std::to_string(shard_workchain) + ":" + + std::to_string(shard_shard) + ":" + std::to_string(shard_seqno); total_shards += 1; - // - // td::actor::send_closure_later(SelfId, &IndexerWorker::start_parse_shards, shard_seqno, shard_shard, - // shard_workchain, mc_handle); + + td::actor::send_closure_later(DetectorId, &ShardClientDetector::increase_wait, mc_id); + + auto P_cb = td::PromiseCreator::lambda([DetectorId, mc_id, my_id = ms.top_block_id()](td::Result R) { + td::actor::send_closure_later(DetectorId, &ShardClientDetector::receive_result, mc_id, my_id, std::move(R)); + }); + + td::actor::send_closure_later(SelfId, &ValidatorManagerImpl::get_block_handle, ms.top_block_id(), false, + std::move(P_cb)); return 1; }; - mc_shards_waits_[last_masterchain_block_id_] = total_shards; + // mc_shards_waits_[last_masterchain_block_id_] = total_shards; shards.process_shard_hashes(parseShards); last_masterchain_time_ = info.gen_utime; @@ -1200,7 +1231,7 @@ void ValidatorManagerImpl::receiveLastBlock(td::Result> block // update DB if needed td::actor::send_closure(db_, &Db::reinit); LOG(INFO) << "New MC block: " << last_masterchain_block_id_ - << " at: " << time_to_human(last_masterchain_time_) + "shards: " << shards_idents; + << " at: " << time_to_human(last_masterchain_time_) + ", shards: " << shards_idents; // Handle wait_block while (!shard_client_waiters_.empty()) { diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index abe99c869..f890b7a9c 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -40,6 +40,21 @@ class WaitZeroState; class WaitShardState; class WaitBlockDataDisk; +class ShardClientDetector : public td::actor::Actor { + public: + ShardClientDetector(td::actor::ActorId manager) { + manager_ = std::move(manager); + } + void start_up() override; + void alarm() override; + void increase_wait(BlockIdExt blkid); + void receive_result(BlockIdExt mc_blkid, BlockIdExt shard_blkid, td::Result R); + + private: + std::map mc_shards_waits_; + td::actor::ActorId manager_; +}; + class ValidatorManagerImpl : public ValidatorManager { private: std::vector> ext_messages_; @@ -70,7 +85,6 @@ class ValidatorManagerImpl : public ValidatorManager { BlockSeqno last_masterchain_seqno_ = 0; bool started_ = false; td::Ref last_masterchain_state_; - std::map mc_shards_waits_; unsigned int last_masterchain_time_; //BlockHandle last_masterchain_block_; @@ -343,6 +357,8 @@ class ValidatorManagerImpl : public ValidatorManager { , db_root_(std::move(db_root)) , shard_to_generate_(shard_id) , block_to_generate_(shard_to_block_id) { + // Need to detect last mc block with shards + shardclientdetector_ = td::actor::create_actor("ShardClientDetector", actor_id(this)); } public: @@ -504,6 +520,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::actor::ActorId rldp_; td::actor::ActorId overlays_; td::actor::ActorId lslimiter_; + td::actor::ActorOwn shardclientdetector_; std::string db_root_; ShardIdFull shard_to_generate_;