Skip to content

Commit

Permalink
Deferred messages and msg metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed May 17, 2024
1 parent 7a74888 commit 77f8db6
Show file tree
Hide file tree
Showing 11 changed files with 1,348 additions and 220 deletions.
243 changes: 203 additions & 40 deletions crypto/block/block-parse.cpp

Large diffs are not rendered by default.

41 changes: 36 additions & 5 deletions crypto/block/block-parse.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "td/utils/bits.h"
#include "td/utils/StringBuilder.h"
#include "ton/ton-types.h"
#include "block-auto.h"

namespace block {

Expand Down Expand Up @@ -469,11 +470,17 @@ struct MsgEnvelope final : TLB_Complex {
int cur_addr, next_addr;
td::RefInt256 fwd_fee_remaining;
Ref<vm::Cell> msg;
td::optional<ton::LogicalTime> deferred_lt;
td::optional<MsgMetadata> metadata;
};
bool unpack(vm::CellSlice& cs, Record& data) const;
bool unpack(vm::CellSlice& cs, Record_std& data) const;
bool unpack_std(vm::CellSlice& cs, int& cur_a, int& nhop_a, Ref<vm::Cell>& msg) const;
bool pack(vm::CellBuilder& cb, const Record_std& data) const;
bool pack_cell(td::Ref<vm::Cell>& cell, const Record_std& data) const;
bool get_created_lt(const vm::CellSlice& cs, unsigned long long& created_lt) const;
int get_tag(const vm::CellSlice& cs) const override {
return (int)cs.prefetch_ulong(4);
}
};

extern const MsgEnvelope t_MsgEnvelope;
Expand Down Expand Up @@ -801,12 +808,18 @@ struct InMsg final : TLB_Complex {
msg_import_fin = 4,
msg_import_tr = 5,
msg_discard_fin = 6,
msg_discard_tr = 7
msg_discard_tr = 7,
msg_import_deferred_fin = 8,
msg_import_deferred_tr = 9
};
bool skip(vm::CellSlice& cs) const override;
bool validate_skip(int* ops, vm::CellSlice& cs, bool weak = false) const override;
int get_tag(const vm::CellSlice& cs) const override {
return (int)cs.prefetch_ulong(3);
int tag = (int)cs.prefetch_ulong(3);
if (tag != 1) {
return tag;
}
return (int)cs.prefetch_ulong(5) - 0b00100 + 8;
}
bool get_import_fees(vm::CellBuilder& cb, vm::CellSlice& cs) const;
};
Expand All @@ -822,13 +835,21 @@ struct OutMsg final : TLB_Complex {
msg_export_deq_imm = 4,
msg_export_deq = 12,
msg_export_deq_short = 13,
msg_export_tr_req = 7
msg_export_tr_req = 7,
msg_export_new_defer = 20, // 0b10100
msg_export_deferred_tr = 21 // 0b10101
};
bool skip(vm::CellSlice& cs) const override;
bool validate_skip(int* ops, vm::CellSlice& cs, bool weak = false) const override;
int get_tag(const vm::CellSlice& cs) const override {
int t = (int)cs.prefetch_ulong(3);
return t != 6 ? t : (int)cs.prefetch_ulong(4);
if (t == 6) {
return (int)cs.prefetch_ulong(4);
}
if (t == 5) {
return (int)cs.prefetch_ulong(5);
}
return t;
}
bool get_export_value(vm::CellBuilder& cb, vm::CellSlice& cs) const;
bool get_created_lt(vm::CellSlice& cs, unsigned long long& created_lt) const;
Expand Down Expand Up @@ -909,6 +930,16 @@ struct Aug_OutMsgQueue final : AugmentationCheckData {

extern const Aug_OutMsgQueue aug_OutMsgQueue;

struct Aug_DispatchQueue final : AugmentationCheckData {
Aug_DispatchQueue() : AugmentationCheckData(gen::t_AccountDispatchQueue, t_uint64) {
}
bool eval_fork(vm::CellBuilder& cb, vm::CellSlice& left_cs, vm::CellSlice& right_cs) const override;
bool eval_empty(vm::CellBuilder& cb) const override;
bool eval_leaf(vm::CellBuilder& cb, vm::CellSlice& cs) const override;
};

extern const Aug_DispatchQueue aug_DispatchQueue;

struct OutMsgQueue final : TLB_Complex {
HashmapAugE dict_type;
OutMsgQueue() : dict_type(32 + 64 + 256, aug_OutMsgQueue){};
Expand Down
164 changes: 155 additions & 9 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "td/utils/tl_storers.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "vm/fmt.hpp"

namespace block {
using namespace std::literals::string_literals;
Expand Down Expand Up @@ -642,7 +643,11 @@ bool EnqueuedMsgDescr::unpack(vm::CellSlice& cs) {
}
cur_prefix_ = interpolate_addr(src_prefix_, dest_prefix_, env.cur_addr);
next_prefix_ = interpolate_addr(src_prefix_, dest_prefix_, env.next_addr);
lt_ = info.created_lt;
unsigned long long lt;
if (!tlb::t_MsgEnvelope.get_created_lt(vm::load_cell_slice(enq.out_msg), lt)) {
return invalidate();
}
lt_ = lt;
enqueued_lt_ = enq.enqueued_lt;
hash_ = env.msg->get_hash().bits();
msg_ = std::move(env.msg);
Expand Down Expand Up @@ -858,12 +863,17 @@ td::Status ShardState::unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_inf
return td::Status::Error(
-666, "ProcessedInfo in the state of "s + id_.to_str() + " is invalid according to automated validity checks");
}
if (!block::gen::t_IhrPendingInfo.validate_csr(1024, qinfo.ihr_pending)) {
return td::Status::Error(
-666, "IhrPendingInfo in the state of "s + id_.to_str() + " is invalid according to automated validity checks");
}
processed_upto_ = block::MsgProcessedUptoCollection::unpack(ton::ShardIdFull(id_), std::move(qinfo.proc_info));
ihr_pending_ = std::make_unique<vm::Dictionary>(std::move(qinfo.ihr_pending), 320);
ihr_pending_ = std::make_unique<vm::Dictionary>(320);
if (qinfo.extra.write().fetch_long(1)) {
block::gen::OutMsgQueueExtra::Record extra;
if (!block::tlb::csr_unpack(qinfo.extra, extra)) {
return td::Status::Error(-666, "cannot unpack OutMsgQueueExtre in the state of "s + id_.to_str());
}
dispatch_queue_ = std::make_unique<vm::AugmentedDictionary>(extra.dispatch_queue, 256, tlb::aug_DispatchQueue);
} else {
dispatch_queue_ = std::make_unique<vm::AugmentedDictionary>(256, tlb::aug_DispatchQueue);
}
auto shard1 = id_.shard_full();
td::BitArray<64> pfx{(long long)shard1.shard};
int pfx_len = shard_prefix_length(shard1);
Expand Down Expand Up @@ -994,6 +1004,11 @@ td::Status ShardState::merge_with(ShardState& sib) {
underload_history_ = overload_history_ = 0;
// 10. compute vert_seqno
vert_seqno_ = std::max(vert_seqno_, sib.vert_seqno_);
// 11. merge dispatch_queue (same as account dict)
if (!dispatch_queue_->combine_with(*sib.dispatch_queue_)) {
return td::Status::Error(-666, "cannot merge dispatch queues of the two ancestors");
}
sib.dispatch_queue_.reset();
// Anything else? add here
// ...

Expand Down Expand Up @@ -1091,6 +1106,11 @@ td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size)
// NB: if total_fees_extra will be allowed to be non-empty, split it here too
// 7. reset overload/underload history
overload_history_ = underload_history_ = 0;
// 8. split dispatch_queue (same as account dict)
LOG(DEBUG) << "splitting dispatch_queue";
CHECK(dispatch_queue_);
CHECK(dispatch_queue_->cut_prefix_subdict(pfx.bits(), pfx_len));
CHECK(dispatch_queue_->has_common_prefix(pfx.bits(), pfx_len));
// 999. anything else?
id_.id.shard = subshard.shard;
id_.file_hash.set_zero();
Expand Down Expand Up @@ -1390,7 +1410,7 @@ bool ValueFlow::store(vm::CellBuilder& cb) const {
&& exported.store(cb2) // exported:CurrencyCollection
&& cb.store_ref_bool(cb2.finalize()) // ]
&& fees_collected.store(cb) // fees_collected:CurrencyCollection
&& (burned.is_zero() || burned.store(cb)) // fees_burned:CurrencyCollection
&& (burned.is_zero() || burned.store(cb)) // fees_burned:CurrencyCollection
&& fees_imported.store(cb2) // ^[ fees_imported:CurrencyCollection
&& recovered.store(cb2) // recovered:CurrencyCollection
&& created.store(cb2) // created:CurrencyCollection
Expand Down Expand Up @@ -1419,8 +1439,7 @@ bool ValueFlow::fetch(vm::CellSlice& cs) {
from_prev_blk.validate_unpack(std::move(f2.r1.from_prev_blk)) &&
to_next_blk.validate_unpack(std::move(f2.r1.to_next_blk)) &&
imported.validate_unpack(std::move(f2.r1.imported)) && exported.validate_unpack(std::move(f2.r1.exported)) &&
fees_collected.validate_unpack(std::move(f2.fees_collected)) &&
burned.validate_unpack(std::move(f2.burned)) &&
fees_collected.validate_unpack(std::move(f2.fees_collected)) && burned.validate_unpack(std::move(f2.burned)) &&
fees_imported.validate_unpack(std::move(f2.r2.fees_imported)) &&
recovered.validate_unpack(std::move(f2.r2.recovered)) && created.validate_unpack(std::move(f2.r2.created)) &&
minted.validate_unpack(std::move(f2.r2.minted))) {
Expand Down Expand Up @@ -2305,4 +2324,131 @@ bool parse_block_id_ext(td::Slice str, ton::BlockIdExt& blkid) {
return parse_block_id_ext(str.begin(), str.end(), blkid);
}

bool unpack_account_dispatch_queue(Ref<vm::CellSlice> csr, vm::Dictionary& dict, td::uint64& dict_size) {
if (csr.not_null()) {
block::gen::AccountDispatchQueue::Record rec;
if (!block::tlb::csr_unpack(std::move(csr), rec)) {
return false;
}
dict = vm::Dictionary{rec.messages, 64};
dict_size = rec.count;
if (dict_size == 0 || dict.is_empty()) {
return false;
}
} else {
dict = vm::Dictionary{64};
dict_size = 0;
}
return true;
}

Ref<vm::CellSlice> pack_account_dispatch_queue(const vm::Dictionary& dict, td::uint64 dict_size) {
if (dict_size == 0) {
return {};
}
// _ messages:(HashmapE 64 EnqueuedMsg) count:uint48 = AccountDispatchQueue;
vm::CellBuilder cb;
CHECK(dict.append_dict_to_bool(cb));
cb.store_long(dict_size, 48);
return cb.as_cellslice_ref();
}

Ref<vm::CellSlice> get_dispatch_queue_min_lt_account(const vm::AugmentedDictionary& dispatch_queue,
ton::StdSmcAddress& addr) {
// TODO: This can be done more effectively
vm::AugmentedDictionary queue{dispatch_queue.get_root(), 256, tlb::aug_DispatchQueue};
if (queue.is_empty()) {
return {};
}
auto root_extra = queue.get_root_extra();
if (root_extra.is_null()) {
return {};
}
ton::LogicalTime min_lt = root_extra->prefetch_long(64);
while (true) {
td::Bits256 key;
int pfx_len = queue.get_common_prefix(key.bits(), 256);
if (pfx_len < 0) {
return {};
}
if (pfx_len == 256) {
addr = key;
return queue.lookup(key);
}
key[pfx_len] = false;
vm::AugmentedDictionary queue_cut{queue.get_root(), 256, tlb::aug_DispatchQueue};
if (!queue_cut.cut_prefix_subdict(key.bits(), pfx_len + 1)) {
return {};
}
root_extra = queue_cut.get_root_extra();
if (root_extra.is_null()) {
return {};
}
ton::LogicalTime cut_min_lt = root_extra->prefetch_long(64);
if (cut_min_lt != min_lt) {
key[pfx_len] = true;
}
if (!queue.cut_prefix_subdict(key.bits(), pfx_len + 1)) {
return {};
}
}
}

bool remove_dispatch_queue_entry(vm::AugmentedDictionary& dispatch_queue, const ton::StdSmcAddress& addr,
ton::LogicalTime lt) {
auto account_dispatch_queue = dispatch_queue.lookup(addr);
if (account_dispatch_queue.is_null()) {
return false;
}
vm::Dictionary dict{64};
size_t dict_size;
if (!unpack_account_dispatch_queue(std::move(account_dispatch_queue), dict, dict_size)) {
return false;
}
td::BitArray<64> key;
key.store_ulong(lt);
auto entry = dict.lookup_delete(key);
if (entry.is_null()) {
return false;
}
--dict_size;
account_dispatch_queue = pack_account_dispatch_queue(dict, dict_size);
if (account_dispatch_queue.not_null()) {
dispatch_queue.set(addr, account_dispatch_queue);
} else {
dispatch_queue.lookup_delete(addr);
}
return true;
}

bool MsgMetadata::unpack(vm::CellSlice& cs) {
// msg_metadata#01 depth:uint32 initiator_addr:MsgAddressInt initiator_lt:uint64 = MsgMetadata;
int tag;
return cs.fetch_int_to(4, tag) && tag == 0 && cs.fetch_uint_to(32, depth) &&
tlb::t_MsgAddressInt.extract_std_address(cs, initiator_wc, initiator_addr) &&
cs.fetch_uint_to(64, initiator_lt);
}

bool MsgMetadata::pack(vm::CellBuilder& cb) const {
// msg_metadata#0 depth:uint32 initiator_addr:MsgAddressInt initiator_lt:uint64 = MsgMetadata;
return cb.store_long_bool(0, 4) && cb.store_long_bool(depth, 32) &&
tlb::t_MsgAddressInt.store_std_address(cb, initiator_wc, initiator_addr) &&
cb.store_long_bool(initiator_lt, 64);
}

std::string MsgMetadata::to_str() const {
return PSTRING() << "[ depth=" << depth << " init=" << initiator_wc << ":" << initiator_addr.to_hex() << ":"
<< initiator_lt << " ]";
}

bool MsgMetadata::operator==(const MsgMetadata& other) const {
return depth == other.depth && initiator_wc == other.initiator_wc && initiator_addr == other.initiator_addr &&
initiator_lt == other.initiator_lt;
}

bool MsgMetadata::operator!=(const MsgMetadata& other) const {
return !(*this == other);
}


} // namespace block
22 changes: 22 additions & 0 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ struct ShardState {
std::unique_ptr<vm::Dictionary> ihr_pending_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_;
std::unique_ptr<vm::AugmentedDictionary> dispatch_queue_;

bool is_valid() const {
return id_.is_valid();
Expand Down Expand Up @@ -749,4 +750,25 @@ bool parse_hex_hash(td::Slice str, td::Bits256& hash);
bool parse_block_id_ext(const char* str, const char* end, ton::BlockIdExt& blkid);
bool parse_block_id_ext(td::Slice str, ton::BlockIdExt& blkid);

bool unpack_account_dispatch_queue(Ref<vm::CellSlice> csr, vm::Dictionary& dict, td::uint64& dict_size);
Ref<vm::CellSlice> pack_account_dispatch_queue(const vm::Dictionary& dict, td::uint64 dict_size);
Ref<vm::CellSlice> get_dispatch_queue_min_lt_account(const vm::AugmentedDictionary& dispatch_queue,
ton::StdSmcAddress& addr);
bool remove_dispatch_queue_entry(vm::AugmentedDictionary& dispatch_queue, const ton::StdSmcAddress& addr,
ton::LogicalTime lt);

struct MsgMetadata {
td::uint32 depth;
ton::WorkchainId initiator_wc;
ton::StdSmcAddress initiator_addr;
ton::LogicalTime initiator_lt;

bool unpack(vm::CellSlice& cs);
bool pack(vm::CellBuilder& cb) const;
std::string to_str() const;

bool operator==(const MsgMetadata& other) const;
bool operator!=(const MsgMetadata& other) const;
};

} // namespace block
22 changes: 21 additions & 1 deletion crypto/block/block.tlb
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ interm_addr_ext$11 workchain_id:int32 addr_pfx:uint64
msg_envelope#4 cur_addr:IntermediateAddress
next_addr:IntermediateAddress fwd_fee_remaining:Grams
msg:^(Message Any) = MsgEnvelope;
msg_metadata#0 depth:uint32 initiator_addr:MsgAddressInt initiator_lt:uint64 = MsgMetadata;
msg_envelope_v2#5 cur_addr:IntermediateAddress
next_addr:IntermediateAddress fwd_fee_remaining:Grams
msg:^(Message Any)
deferred_lt:(Maybe uint64)
metadata:(Maybe MsgMetadata) = MsgEnvelope;
//
msg_import_ext$000 msg:^(Message Any) transaction:^Transaction
= InMsg;
Expand All @@ -187,6 +193,9 @@ msg_discard_fin$110 in_msg:^MsgEnvelope transaction_id:uint64
fwd_fee:Grams = InMsg;
msg_discard_tr$111 in_msg:^MsgEnvelope transaction_id:uint64
fwd_fee:Grams proof_delivered:^Cell = InMsg;
msg_import_deferred_fin$00100 in_msg:^MsgEnvelope
transaction:^Transaction fwd_fee:Grams = InMsg;
msg_import_deferred_tr$00101 in_msg:^MsgEnvelope out_msg:^MsgEnvelope = InMsg;
//
import_fees$_ fees_collected:Grams
value_imported:CurrencyCollection = ImportFees;
Expand All @@ -210,6 +219,10 @@ msg_export_tr_req$111 out_msg:^MsgEnvelope
imported:^InMsg = OutMsg;
msg_export_deq_imm$100 out_msg:^MsgEnvelope
reimport:^InMsg = OutMsg;
msg_export_new_defer$10100 out_msg:^MsgEnvelope
transaction:^Transaction = OutMsg;
msg_export_deferred_tr$10101 out_msg:^MsgEnvelope
imported:^InMsg = OutMsg;

_ enqueued_lt:uint64 out_msg:^MsgEnvelope = EnqueuedMsg;

Expand All @@ -224,8 +237,15 @@ _ (HashmapE 96 ProcessedUpto) = ProcessedInfo;
ihr_pending$_ import_lt:uint64 = IhrPendingSince;
_ (HashmapE 320 IhrPendingSince) = IhrPendingInfo;

// key - created_lt
_ messages:(HashmapE 64 EnqueuedMsg) count:uint48 = AccountDispatchQueue;
// key - sender address, aug - min created_lt
_ (HashmapAugE 256 AccountDispatchQueue uint64) = DispatchQueue;

out_msg_queue_extra#0 dispatch_queue:DispatchQueue out_queue_size:(Maybe uint48) = OutMsgQueueExtra;

_ out_queue:OutMsgQueue proc_info:ProcessedInfo
ihr_pending:IhrPendingInfo = OutMsgQueueInfo;
extra:(Maybe OutMsgQueueExtra) = OutMsgQueueInfo;
//
storage_used$_ cells:(VarUInteger 7) bits:(VarUInteger 7)
public_cells:(VarUInteger 7) = StorageUsed;
Expand Down
Loading

0 comments on commit 77f8db6

Please sign in to comment.