Skip to content

Commit

Permalink
Cancel collation when it is not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Sep 30, 2024
1 parent e0f1d9e commit a314737
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 21 deletions.
5 changes: 4 additions & 1 deletion validator/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace ton {

namespace validator {

// Bit flags accepted by the `mode` argument of run_collate_query().
enum CollateMode {
  // +1: skip storing the block candidate to disk.
  skip_store_candidate = 1
};

td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_,
td::Ref<ValidatorManagerOptions> opts);
td::actor::ActorOwn<LiteServerCache> create_liteserver_cache_actor(td::actor::ActorId<ValidatorManager> manager,
Expand Down Expand Up @@ -81,7 +83,8 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set,
td::Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
td::Timestamp timeout, td::Promise<BlockCandidate> promise, int attempt_idx = 0);
td::Timestamp timeout, td::Promise<BlockCandidate> promise,
td::CancellationToken cancellation_token, unsigned mode, int attempt_idx = 0);
void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise);
Expand Down
7 changes: 6 additions & 1 deletion validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Collator final : public td::actor::Actor {
td::Timestamp timeout;
td::Timestamp queue_cleanup_timeout_, soft_timeout_, medium_timeout_;
td::Promise<BlockCandidate> main_promise;
unsigned mode_ = 0;
int attempt_idx_;
bool allow_repeat_collation_ = false;
ton::BlockSeqno last_block_seqno{0};
Expand All @@ -93,7 +94,7 @@ class Collator final : public td::actor::Actor {
Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id, Ref<CollatorOptions> collator_opts,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout, td::Promise<BlockCandidate> promise,
int attempt_idx);
td::CancellationToken cancellation_token, unsigned mode, int attempt_idx);
~Collator() override = default;
bool is_busy() const {
return busy_;
Expand Down Expand Up @@ -352,10 +353,14 @@ class Collator final : public td::actor::Actor {
bool create_block();
Ref<vm::Cell> collate_shard_block_descr_set();
bool create_collated_data();

bool create_block_candidate();
void return_block_candidate(td::Result<td::Unit> saved);
bool update_last_proc_int_msg(const std::pair<ton::LogicalTime, ton::Bits256>& new_lt_hash);

td::CancellationToken cancellation_token_;
bool check_cancelled();

public:
static td::uint32 get_skip_externals_queue_size();

Expand Down
70 changes: 56 additions & 14 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,15 @@ static inline bool dbg(int c) {
* @param manager The ActorId of the ValidatorManager.
* @param timeout The timeout for the collator.
* @param promise The promise to return the result.
* @param cancellation_token Token to cancel collation.
* @param mode +1 - skip storing candidate to disk.
* @param attempt_idx The index of the attempt, starting from 0. On later attempts collator decreases block limits and skips some steps.
*/
Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id,
std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
td::Timestamp timeout, td::Promise<BlockCandidate> promise, int attempt_idx)
td::Timestamp timeout, td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token,
unsigned mode, int attempt_idx)
: shard_(shard)
, is_hardfork_(is_hardfork)
, min_mc_block_id{min_masterchain_block_id}
Expand All @@ -96,10 +99,13 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
, soft_timeout_(td::Timestamp::at(timeout.at() - 3.0))
, medium_timeout_(td::Timestamp::at(timeout.at() - 1.5))
, main_promise(std::move(promise))
, mode_(mode)
, attempt_idx_(attempt_idx)
, perf_timer_("collate", 0.1, [manager](double duration) {
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
}) {
, perf_timer_("collate", 0.1,
[manager](double duration) {
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
})
, cancellation_token_(std::move(cancellation_token)) {
}

/**
Expand All @@ -113,6 +119,9 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
void Collator::start_up() {
LOG(WARNING) << "Collator for shard " << shard_.to_str() << " started"
<< (attempt_idx_ ? PSTRING() << " (attempt #" << attempt_idx_ << ")" : "");
if (!check_cancelled()) {
return;
}
LOG(DEBUG) << "Previous block #1 is " << prev_blocks.at(0).to_str();
if (prev_blocks.size() > 1) {
LOG(DEBUG) << "Previous block #2 is " << prev_blocks.at(1).to_str();
Expand Down Expand Up @@ -345,10 +354,12 @@ bool Collator::fatal_error(td::Status error) {
error.ensure_error();
LOG(ERROR) << "cannot generate block candidate for " << show_shard(shard_) << " : " << error.to_string();
if (busy_) {
if (allow_repeat_collation_ && attempt_idx_ + 1 < MAX_ATTEMPTS && !is_hardfork_ && !timeout.is_in_past()) {
if (allow_repeat_collation_ && error.code() != ErrorCode::cancelled && attempt_idx_ + 1 < MAX_ATTEMPTS &&
!is_hardfork_ && !timeout.is_in_past()) {
LOG(WARNING) << "Repeating collation (attempt #" << attempt_idx_ + 1 << ")";
run_collate_query(shard_, min_mc_block_id, prev_blocks, created_by_, validator_set_, collator_opts_, manager,
td::Timestamp::in(10.0), std::move(main_promise), attempt_idx_ + 1);
td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_,
attempt_idx_ + 1);
} else {
main_promise(std::move(error));
}
Expand Down Expand Up @@ -393,6 +404,9 @@ bool Collator::fatal_error(std::string err_msg, int err_code) {
*/
void Collator::check_pending() {
// LOG(DEBUG) << "pending = " << pending;
if (!check_cancelled()) {
return;
}
if (!pending) {
step = 2;
try {
Expand Down Expand Up @@ -2354,6 +2368,9 @@ bool Collator::out_msg_queue_cleanup() {
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
break;
}
if (!check_cancelled()) {
return false;
}
if (i == queue_parts.size()) {
i = 0;
}
Expand Down Expand Up @@ -3553,6 +3570,9 @@ bool Collator::process_inbound_internal_messages() {
stats_.limits_log += PSTRING() << "INBOUND_INT_MESSAGES: timeout\n";
break;
}
if (!check_cancelled()) {
return false;
}
auto kv = nb_out_msgs_->extract_cur();
CHECK(kv && kv->msg.not_null());
LOG(DEBUG) << "processing inbound message with (lt,hash)=(" << kv->lt << "," << kv->key.to_hex()
Expand Down Expand Up @@ -3611,6 +3631,9 @@ bool Collator::process_inbound_external_messages() {
stats_.limits_log += PSTRING() << "INBOUND_EXT_MESSAGES: timeout\n";
break;
}
if (!check_cancelled()) {
return false;
}
auto ext_msg = ext_msg_struct.cell;
ton::Bits256 hash{ext_msg->get_hash().bits()};
int r = process_external_message(std::move(ext_msg));
Expand Down Expand Up @@ -4156,6 +4179,9 @@ bool Collator::process_new_messages(bool enqueue_only) {
stats_.limits_log += PSTRING() << "NEW_MESSAGES: "
<< block_full_comment(*block_limit_status_, block::ParamLimits::cl_normal) << "\n";
}
if (!check_cancelled()) {
return false;
}
LOG(DEBUG) << "have message with lt=" << msg.lt;
int res = process_one_new_message(std::move(msg), enqueue_only);
if (res < 0) {
Expand Down Expand Up @@ -5532,14 +5558,18 @@ bool Collator::create_block_candidate() {
<< consensus_config.max_collated_data_size << ")");
}
// 4. save block candidate
LOG(INFO) << "saving new BlockCandidate";
td::actor::send_closure_later(
manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(),
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
[self = get_self()](td::Result<td::Unit> saved) -> void {
LOG(DEBUG) << "got answer to set_block_candidate";
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved));
});
if (mode_ & CollateMode::skip_store_candidate) {
td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit());
} else {
LOG(INFO) << "saving new BlockCandidate";
td::actor::send_closure_later(
manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(),
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
[self = get_self()](td::Result<td::Unit> saved) -> void {
LOG(DEBUG) << "got answer to set_block_candidate";
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved));
});
}
// 5. communicate about bad and delayed external messages
if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) {
LOG(INFO) << "sending complete_external_messages() to Manager";
Expand Down Expand Up @@ -5683,6 +5713,18 @@ void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<
check_pending();
}

/**
 * Polls the cancellation token held by this collator.
 *
 * When the token reports cancellation, the collation is aborted through
 * fatal_error() with ErrorCode::cancelled and that call's result is returned.
 *
 * @returns false if the collation was cancelled, true otherwise
 */
bool Collator::check_cancelled() {
  // Common path: the token is not cancelled, so collation may continue.
  if (!cancellation_token_) {
    return true;
  }
  return fatal_error(td::Status::Error(ErrorCode::cancelled, "cancelled"));
}

/**
 * Exposes the SKIP_EXTERNALS_QUEUE_SIZE constant to callers outside the collator.
 *
 * @returns The value of SKIP_EXTERNALS_QUEUE_SIZE converted to td::uint32.
 */
td::uint32 Collator::get_skip_externals_queue_size() {
  const td::uint32 queue_size = SKIP_EXTERNALS_QUEUE_SIZE;
  return queue_size;
}
Expand Down
7 changes: 4 additions & 3 deletions validator/impl/fabric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set,
td::Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
td::Timestamp timeout, td::Promise<BlockCandidate> promise, int attempt_idx) {
td::Timestamp timeout, td::Promise<BlockCandidate> promise,
td::CancellationToken cancellation_token, unsigned mode, int attempt_idx) {
BlockSeqno seqno = 0;
for (auto& p : prev) {
if (p.seqno() > seqno) {
Expand All @@ -225,7 +226,7 @@ void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_bloc
<< (attempt_idx ? "_" + td::to_string(attempt_idx) : ""),
shard, false, min_masterchain_block_id, std::move(prev), std::move(validator_set),
creator, std::move(collator_opts), std::move(manager), timeout, std::move(promise),
attempt_idx)
std::move(cancellation_token), mode, attempt_idx)
.release();
}

Expand All @@ -241,7 +242,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b
td::actor::create_actor<Collator>(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true,
min_masterchain_block_id, std::move(prev), td::Ref<ValidatorSet>{},
Ed25519_PublicKey{Bits256::zero()}, td::Ref<CollatorOptions>{true},
std::move(manager), timeout, std::move(promise), 0)
std::move(manager), timeout, std::move(promise), td::CancellationToken{}, 0, 0)
.release();
}

Expand Down
2 changes: 1 addition & 1 deletion validator/manager-disk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void ValidatorManagerImpl::sync_complete(td::Promise<td::Unit> promise) {
Ed25519_PublicKey created_by{td::Bits256::zero()};
td::as<td::uint32>(created_by.as_bits256().data() + 32 - 4) = ((unsigned)std::time(nullptr) >> 8);
run_collate_query(shard_id, last_masterchain_block_id_, prev, created_by, val_set, td::Ref<CollatorOptions>{true},
actor_id(this), td::Timestamp::in(10.0), std::move(P));
actor_id(this), td::Timestamp::in(10.0), std::move(P), td::CancellationToken{}, 0);
}

void ValidatorManagerImpl::validate_fake(BlockCandidate candidate, std::vector<BlockIdExt> prev, BlockIdExt last,
Expand Down
4 changes: 3 additions & 1 deletion validator/validator-group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void ValidatorGroup::generate_block_candidate(
validator_set_, opts_->get_collator_options(), manager_, td::Timestamp::in(10.0),
[SelfId = actor_id(this), cache = cached_collated_block_](td::Result<BlockCandidate> R) {
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R));
});
}, cancellation_token_source_.get_cancellation_token(), /* mode = */ 0);
}

void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R) {
Expand Down Expand Up @@ -188,6 +188,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
cached_collated_block_ = nullptr;
approved_candidates_cache_.clear();
cancellation_token_source_.cancel();
}

void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block,
Expand Down Expand Up @@ -433,6 +434,7 @@ void ValidatorGroup::destroy() {
delay_action([ses]() mutable { td::actor::send_closure(ses, &validatorsession::ValidatorSession::destroy); },
td::Timestamp::in(10.0));
}
cancellation_token_source_.cancel();
stop();
}

Expand Down
1 change: 1 addition & 0 deletions validator/validator-group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class ValidatorGroup : public td::actor::Actor {
std::vector<td::Promise<BlockCandidate>> promises;
};
std::shared_ptr<CachedCollatedBlock> cached_collated_block_;
td::CancellationTokenSource cancellation_token_source_;

void generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R);

Expand Down

0 comments on commit a314737

Please sign in to comment.