Skip to content

Commit

Permalink
Changes in deferred messages
Browse files Browse the repository at this point in the history
* Process deferred messages via new_msgs in collator
* Rework setting deferred_lt, bring back check_message_processing_order, check order of deferred_lt in validator
  • Loading branch information
SpyCheese committed Jun 17, 2024
1 parent d7a3351 commit 49288e2
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 115 deletions.
1 change: 1 addition & 0 deletions crypto/block/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct NewOutMsg {
Ref<vm::Cell> trans;
unsigned msg_idx;
td::optional<MsgMetadata> metadata;
td::Ref<vm::Cell> msg_env_from_dispatch_queue; // Not null if from dispatch queue; in this case lt is deferred_lt
NewOutMsg(ton::LogicalTime _lt, Ref<vm::Cell> _msg, Ref<vm::Cell> _trans, unsigned _msg_idx)
: lt(_lt), msg(std::move(_msg)), trans(std::move(_trans)), msg_idx(_msg_idx) {
}
Expand Down
7 changes: 4 additions & 3 deletions validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class Collator final : public td::actor::Actor {
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
std::map<StdSmcAddress, size_t> unprocessed_deferred_messages_; // number of messages from dispatch queue in new_msgs
td::uint64 out_msg_queue_size_ = 0;
bool have_out_msg_queue_size_in_state_ = false;
std::unique_ptr<vm::Dictionary> ihr_pending;
Expand All @@ -208,7 +209,7 @@ class Collator final : public td::actor::Actor {
std::unique_ptr<vm::AugmentedDictionary> dispatch_queue_;
std::map<StdSmcAddress, td::uint32> sender_generated_messages_count_;
unsigned dispatch_queue_ops_{0};
std::map<StdSmcAddress, LogicalTime> last_enqueued_deferred_lt_;
std::map<StdSmcAddress, LogicalTime> last_deferred_lt_;
bool have_unprocessed_account_dispatch_queue_ = true;

bool msg_metadata_enabled_ = false;
Expand Down Expand Up @@ -301,8 +302,8 @@ class Collator final : public td::actor::Actor {
int process_external_message(Ref<vm::Cell> msg);
bool process_dispatch_queue();
bool process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddress src_addr, LogicalTime lt);
bool enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, ton::LogicalTime enqueued_lt,
StdSmcAddress src_addr, bool defer = false);
bool enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, StdSmcAddress src_addr,
bool defer = false);
bool enqueue_transit_message(Ref<vm::Cell> msg, Ref<vm::Cell> old_msg_env, ton::AccountIdPrefixFull prev_prefix,
ton::AccountIdPrefixFull cur_prefix, ton::AccountIdPrefixFull dest_prefix,
td::RefInt256 fwd_fee_remaining, td::optional<block::MsgMetadata> msg_metadata,
Expand Down
184 changes: 101 additions & 83 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
This file is part of TON Blockchain Library.
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
Expand Down Expand Up @@ -2100,6 +2100,11 @@ bool Collator::do_collate() {
if (!init_value_create()) {
return fatal_error("cannot compute the value to be created / minted / recovered");
}
// 2-. take messages from dispatch queue
LOG(INFO) << "process dispatch queue";
if (!process_dispatch_queue()) {
return fatal_error("cannot process dispatch queue");
}
// 2. tick transactions
LOG(INFO) << "create tick transactions";
if (!create_ticktock_transactions(2)) {
Expand All @@ -2114,11 +2119,6 @@ bool Collator::do_collate() {
// TODO: implement merge prepare/install transactions for "large" smart contracts
// ...
}
// 4-. take messages from dispatch queue
LOG(INFO) << "process dispatch queue";
if (!process_dispatch_queue()) {
return fatal_error("cannot process dispatch queue");
}
// 4. import inbound internal messages, process or transit
LOG(INFO) << "process inbound internal messages";
if (!process_inbound_internal_messages()) {
Expand Down Expand Up @@ -2654,8 +2654,8 @@ bool Collator::create_ticktock_transaction(const ton::StdSmcAddress& smc_addr, t
return true;
}
req_start_lt = std::max(req_start_lt, start_lt + 1);
auto it = last_enqueued_deferred_lt_.find(acc->addr);
if (it != last_enqueued_deferred_lt_.end()) {
auto it = last_deferred_lt_.find(acc->addr);
if (it != last_deferred_lt_.end()) {
req_start_lt = std::max(req_start_lt, it->second + 1);
}
if (acc->last_trans_end_lt_ >= start_lt && acc->transactions.empty()) {
Expand Down Expand Up @@ -2755,10 +2755,13 @@ Ref<vm::Cell> Collator::create_ordinary_transaction(Ref<vm::Cell> msg_root,
block::Account* acc = acc_res.move_as_ok();
assert(acc);

LogicalTime after_lt = last_proc_int_msg_.first;
auto it = last_enqueued_deferred_lt_.find(acc->addr);
if (it != last_enqueued_deferred_lt_.end()) {
after_lt = std::max(after_lt, it->second);
LogicalTime after_lt = 0;
if (external) {
after_lt = last_proc_int_msg_.first;
}
auto it = last_deferred_lt_.find(acc->addr);
if (it != last_deferred_lt_.end()) {
after_lt = it->second;
}
auto res = impl_create_ordinary_transaction(msg_root, acc, now_, start_lt, &storage_phase_cfg_, &compute_phase_cfg_,
&action_phase_cfg_, external, after_lt);
Expand Down Expand Up @@ -2828,13 +2831,12 @@ td::Result<std::unique_ptr<block::transaction::Transaction>> Collator::impl_crea
<< ":" << acc->addr.to_hex() << " is too large");
}
auto trans_min_lt = lt;
if (external) {
// transactions processing external messages must have lt larger than all processed internal messages
trans_min_lt = std::max(trans_min_lt, after_lt);
}
// transactions processing external messages must have lt larger than all processed internal messages
// if account has deferred message processed in this block, the next transaction should have lt > deferred_lt
trans_min_lt = std::max(trans_min_lt, after_lt);

std::unique_ptr<block::transaction::Transaction> trans =
std::make_unique<block::transaction::Transaction>(*acc, block::transaction::Transaction::tr_ord, trans_min_lt + 1, utime, msg_root);
std::unique_ptr<block::transaction::Transaction> trans = std::make_unique<block::transaction::Transaction>(
*acc, block::transaction::Transaction::tr_ord, trans_min_lt + 1, utime, msg_root);
bool ihr_delivered = false; // FIXME
if (!trans->unpack_input_msg(ihr_delivered, action_phase_cfg)) {
if (external) {
Expand Down Expand Up @@ -2998,6 +3000,7 @@ bool Collator::is_our_address(const ton::StdSmcAddress& addr) const {
* -1 - error occured.
*/
int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, Ref<vm::Cell>* is_special) {
bool from_dispatch_queue = msg.msg_env_from_dispatch_queue.not_null();
Ref<vm::CellSlice> src, dest;
bool enqueue, external;
auto cs = load_cell_slice(msg.msg);
Expand All @@ -3009,7 +3012,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
if (!tlb::unpack(cs, info)) {
return -1;
}
CHECK(info.created_lt == msg.lt && info.created_at == now_);
CHECK(info.created_lt == msg.lt && info.created_at == now_ && !from_dispatch_queue);
src = std::move(info.src);
enqueue = external = true;
break;
Expand All @@ -3019,7 +3022,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
if (!tlb::unpack(cs, info)) {
return -1;
}
CHECK(info.created_lt == msg.lt && info.created_at == now_);
CHECK(from_dispatch_queue || (info.created_lt == msg.lt && info.created_at == now_));
src = std::move(info.src);
dest = std::move(info.dest);
fwd_fees = block::tlb::t_Grams.as_integer(info.fwd_fee);
Expand All @@ -3046,22 +3049,41 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R

WorkchainId src_wc;
StdSmcAddress src_addr;
CHECK(block::tlb::t_MsgAddressInt.extract_std_address(std::move(src), src_wc, src_addr));
CHECK(block::tlb::t_MsgAddressInt.extract_std_address(src, src_wc, src_addr));
CHECK(src_wc == workchain());
bool is_special_account = is_masterchain() && config_->is_special_smartcontract(src_addr);
bool defer = false;
if (deferring_messages_enabled_ && !is_special && !is_special_account && msg.msg_idx != 0) {
if (++sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER) {
if (!from_dispatch_queue) {
if (deferring_messages_enabled_ && !is_special && !is_special_account && msg.msg_idx != 0) {
if (++sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER) {
defer = true;
}
}
if (dispatch_queue_->lookup(src_addr).not_null() || unprocessed_deferred_messages_.count(src_addr)) {
defer = true;
}
}
if (dispatch_queue_->lookup(src_addr).not_null()) {
defer = true;
} else {
auto &x = unprocessed_deferred_messages_[src_addr];
CHECK(x > 0);
if (--x == 0) {
unprocessed_deferred_messages_.erase(src_addr);
}
}

if (enqueue || defer) {
auto lt = msg.lt;
bool ok = enqueue_message(std::move(msg), std::move(fwd_fees), lt, src_addr, defer);
bool ok;
if (from_dispatch_queue) {
auto msg_env = msg.msg_env_from_dispatch_queue;
block::tlb::MsgEnvelope::Record_std env;
CHECK(block::tlb::unpack_cell(msg_env, env));
auto src_prefix = block::tlb::MsgAddressInt::get_prefix(src);
auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(dest);
CHECK(env.deferred_lt && env.deferred_lt.value() == msg.lt);
ok = enqueue_transit_message(std::move(msg.msg), std::move(msg_env), src_prefix, src_prefix, dest_prefix,
std::move(env.fwd_fee_remaining), std::move(env.metadata), msg.lt);
} else {
ok = enqueue_message(std::move(msg), std::move(fwd_fees), src_addr, defer);
}
return ok ? 0 : -1;
}
// process message by a transaction in this block:
Expand Down Expand Up @@ -3092,10 +3114,21 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
}
// 3. create InMsg, referring to this MsgEnvelope and this Transaction
vm::CellBuilder cb;
CHECK(cb.store_long_bool(3, 3) // msg_import_imm$011
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees)); // fwd_fee:Grams
if (from_dispatch_queue) {
auto msg_env = msg.msg_env_from_dispatch_queue;
block::tlb::MsgEnvelope::Record_std env;
CHECK(block::tlb::unpack_cell(msg_env, env));
CHECK(env.deferred_lt && env.deferred_lt.value() == msg.lt);
CHECK(cb.store_long_bool(0b00100, 5) // msg_import_deferred_fin$00100
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
&& block::tlb::t_Grams.store_integer_ref(cb, env.fwd_fee_remaining)); // fwd_fee:Grams
} else {
CHECK(cb.store_long_bool(3, 3) // msg_import_imm$011
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees)); // fwd_fee:Grams
}
// 4. insert InMsg into InMsgDescr
Ref<vm::Cell> in_msg = cb.finalize();
if (!insert_in_msg(in_msg)) {
Expand All @@ -3106,14 +3139,16 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
*is_special = in_msg;
return 1;
}
// 5. create OutMsg, referring to this MsgEnvelope and InMsg
CHECK(cb.store_long_bool(2, 3) // msg_export_imm$010
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
&& cb.store_ref_bool(msg.trans) // transaction:^Transaction
&& cb.store_ref_bool(in_msg)); // reimport:^InMsg
// 6. insert OutMsg into OutMsgDescr
if (!insert_out_msg(cb.finalize())) {
return -1;
if (!from_dispatch_queue) {
// 5. create OutMsg, referring to this MsgEnvelope and InMsg
CHECK(cb.store_long_bool(2, 3) // msg_export_imm$010
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
&& cb.store_ref_bool(msg.trans) // transaction:^Transaction
&& cb.store_ref_bool(in_msg)); // reimport:^InMsg
// 6. insert OutMsg into OutMsgDescr
if (!insert_out_msg(cb.finalize())) {
return -1;
}
}
// 7. check whether the block is full now
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
Expand Down Expand Up @@ -3642,7 +3677,6 @@ bool Collator::process_dispatch_queue() {
<< src_addr.to_hex() << ", lt=" << lt);
}

LOG(INFO) << "delivering deferred message from account " << src_addr.to_hex() << ", lt=" << lt;
if (!process_deferred_message(std::move(enqueued_msg), src_addr, lt)) {
return fatal_error(PSTRING() << "error processing internal message from dispatch queue: account="
<< src_addr.to_hex() << ", lt=" << lt);
Expand Down Expand Up @@ -3681,8 +3715,7 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
++sender_generated_messages_count_[src_addr];

LogicalTime enqueued_lt = 0;
if (enq_msg.is_null() || enq_msg->size_ext() != 0x10040 ||
(enqueued_lt = enq_msg->prefetch_ulong(64)) < /* 0 */ 1 * lt) {
if (enq_msg.is_null() || enq_msg->size_ext() != 0x10040 || (enqueued_lt = enq_msg->prefetch_ulong(64)) != lt) {
if (enq_msg.not_null()) {
block::gen::t_EnqueuedMsg.print(std::cerr, *enq_msg);
}
Expand Down Expand Up @@ -3750,44 +3783,28 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
LOG(ERROR) << "internal message in DispatchQueue is expected to have zero cur_addr and next_addr";
return false;
}
// 5. decide what to do with the message
bool to_us = ton::shard_contains(shard_, dest_prefix);
if (!to_us) {
LogicalTime deferred_lt = std::max(start_lt, last_enqueued_deferred_lt_[src_addr] + 1);
auto it = accounts.find(src_addr);
if (it != accounts.end()) {
deferred_lt = std::max(deferred_lt, it->second->last_trans_end_lt_ + 1);
}
// destination is outside our shard, relay transit message
// (very similar to enqueue_message())
if (!enqueue_transit_message(std::move(env.msg), std::move(msg_env), src_prefix, src_prefix, dest_prefix,
std::move(env.fwd_fee_remaining), std::move(env.metadata), deferred_lt)) {
return fatal_error(PSTRING() << "cannot enqueue transit internal message from dispatchqueue from address "
<< src_addr.to_hex() << ", lt=" << lt);
}
last_enqueued_deferred_lt_[src_addr] = deferred_lt;
update_max_lt(deferred_lt + 1);
return true;
// 5. calculate deferred_lt
LogicalTime deferred_lt = std::max(start_lt, last_deferred_lt_[src_addr] + 1);
auto it = accounts.find(src_addr);
if (it != accounts.end()) {
deferred_lt = std::max(deferred_lt, it->second->last_trans_end_lt_ + 1);
}
// destination is in our shard
// process the message by an ordinary transaction similarly to process_one_new_message()
//
// 6. create a transaction processing this message
auto trans_root = create_ordinary_transaction(env.msg, env.metadata);
if (trans_root.is_null()) {
return fatal_error("cannot create transaction for processing inbound message");
}
// 7. create InMsg, referring to this MsgEnvelope and this Transaction
vm::CellBuilder cb;
CHECK(cb.store_long_bool(0b00100, 5) // msg_import_deferred_fin$00100
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
&& block::tlb::t_Grams.store_integer_ref(cb, env.fwd_fee_remaining)); // fwd_fee:Grams
Ref<vm::Cell> in_msg = cb.finalize();
// 8. insert InMsg into InMsgDescr
if (!insert_in_msg(std::move(in_msg))) {
return fatal_error("cannot insert InMsg into InMsgDescr");
last_deferred_lt_[src_addr] = deferred_lt;
update_max_lt(deferred_lt + 1);

env.deferred_lt = deferred_lt;
if (!block::tlb::pack_cell(msg_env, env)) {
return fatal_error("cannot pack msg envelope");
}

// 6. create NewOutMsg
block::NewOutMsg new_msg{deferred_lt, env.msg, {}, 0};
new_msg.metadata = env.metadata;
new_msg.msg_env_from_dispatch_queue = msg_env;
++unprocessed_deferred_messages_[src_addr];
LOG(INFO) << "delivering deferred message from account " << src_addr.to_hex() << ", lt=" << lt
<< ", deferred_lt=" << deferred_lt;
register_new_msg(std::move(new_msg));
return true;
}

Expand Down Expand Up @@ -3891,14 +3908,15 @@ bool Collator::insert_out_msg(Ref<vm::Cell> out_msg, td::ConstBitPtr msg_hash) {
*
* @param msg The new outbound message to enqueue.
* @param fwd_fees_remaining The remaining forward fees for the message.
* @param enqueued_lt The logical time at which the message is enqueued.
* @param src_addr 256-bit address of the sender
* @param defer Put the message to DispatchQueue
*
* @returns True if the message was successfully enqueued, false otherwise.
*/
bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, ton::LogicalTime enqueued_lt,
StdSmcAddress src_addr, bool defer) {
bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, StdSmcAddress src_addr,
bool defer) {
LogicalTime enqueued_lt = msg.lt;
CHECK(msg.msg_env_from_dispatch_queue.is_null());
// 0. unpack src_addr and dest_addr
block::gen::CommonMsgInfo::Record_int_msg_info info;
if (!tlb::unpack_cell_inexact(msg.msg, info)) {
Expand Down
Loading

0 comments on commit 49288e2

Please sign in to comment.