Skip to content

Commit

Permalink
Improve state serializer
Browse files Browse the repository at this point in the history
Use previous persistent state to speed up reading
  • Loading branch information
SpyCheese committed Jul 17, 2024
1 parent 8e1d628 commit 9982a77
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 80 deletions.
3 changes: 3 additions & 0 deletions crypto/vm/cells/DataCell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ td::Result<Ref<DataCell>> DataCell::create(td::ConstBitPtr data, unsigned bits,
if (bits != 8 + hash_bytes * 8) {
return td::Status::Error("Not enouch data for a Library special cell");
}
if (!refs.empty()) {
return td::Status::Error("Library special cell has a cell reference");
}
break;
}

Expand Down
23 changes: 17 additions & 6 deletions tdutils/td/utils/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,14 @@ Result<T> read_file_impl(CSlice path, int64 size, int64 offset) {
return Status::Error("Failed to read file: invalid size");
}
auto content = create_empty<T>(narrow_cast<size_t>(size));
TRY_RESULT(got_size, from_file.pread(as_mutable_slice(content), offset));
if (got_size != static_cast<size_t>(size)) {
return Status::Error("Failed to read file");
MutableSlice slice = as_mutable_slice(content);
while (!slice.empty()) {
TRY_RESULT(got_size, from_file.pread(slice, offset));
if (got_size == 0) {
return Status::Error("Failed to read file");
}
offset += got_size;
slice.remove_prefix(got_size);
}
from_file.close();
return std::move(content);
Expand Down Expand Up @@ -103,9 +108,15 @@ Status write_file(CSlice to, Slice data, WriteFileOptions options) {
TRY_STATUS(to_file.lock(FileFd::LockFlags::Write, to.str(), 10));
TRY_STATUS(to_file.truncate_to_current_position(0));
}
TRY_RESULT(written, to_file.write(data));
if (written != size) {
return Status::Error(PSLICE() << "Failed to write file: written " << written << " bytes instead of " << size);
size_t total_written = 0;
while (!data.empty()) {
TRY_RESULT(written, to_file.write(data));
if (written == 0) {
return Status::Error(PSLICE() << "Failed to write file: written " << total_written << " bytes instead of "
<< size);
}
total_written += written;
data.remove_prefix(written);
}
if (options.need_sync) {
TRY_STATUS(to_file.sync());
Expand Down
106 changes: 73 additions & 33 deletions validator/db/archive-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,17 @@ void ArchiveManager::get_file(ConstBlockHandle handle, FileReference ref_id, td:
get_file_short_cont(std::move(ref_id), get_max_temp_file_desc_idx(), std::move(promise));
}

void ArchiveManager::written_perm_state(FileReferenceShort id) {
perm_states_.emplace(id.hash(), id);
void ArchiveManager::register_perm_state(FileReferenceShort id) {
BlockSeqno masterchain_seqno = 0;
id.ref().visit(td::overloaded(
[&](const fileref::PersistentStateShort &x) { masterchain_seqno = x.masterchain_seqno; }, [&](const auto &) {}));
perm_states_[{masterchain_seqno, id.hash()}] = id;
}

void ArchiveManager::add_zero_state(BlockIdExt block_id, td::BufferSlice data, td::Promise<td::Unit> promise) {
auto id = FileReference{fileref::ZeroState{block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) != perm_states_.end()) {
if (perm_states_.find({0, hash}) != perm_states_.end()) {
promise.set_value(td::Unit());
return;
}
Expand All @@ -328,7 +331,7 @@ void ArchiveManager::add_zero_state(BlockIdExt block_id, td::BufferSlice data, t
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveManager::written_perm_state, id);
td::actor::send_closure(SelfId, &ArchiveManager::register_perm_state, id);
promise.set_value(td::Unit());
}
});
Expand Down Expand Up @@ -357,12 +360,13 @@ void ArchiveManager::add_persistent_state_gen(BlockIdExt block_id, BlockIdExt ma
add_persistent_state_impl(block_id, masterchain_block_id, std::move(promise), std::move(create_writer));
}

void ArchiveManager::add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::Unit> promise,
std::function<void(std::string, td::Promise<std::string>)> create_writer) {
void ArchiveManager::add_persistent_state_impl(
BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise,
std::function<void(std::string, td::Promise<std::string>)> create_writer) {
auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}};
BlockSeqno masterchain_seqno = masterchain_block_id.seqno();
auto hash = id.hash();
if (perm_states_.find(hash) != perm_states_.end()) {
if (perm_states_.find({masterchain_seqno, hash}) != perm_states_.end()) {
promise.set_value(td::Unit());
return;
}
Expand All @@ -373,7 +377,7 @@ void ArchiveManager::add_persistent_state_impl(BlockIdExt block_id, BlockIdExt m
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveManager::written_perm_state, id);
td::actor::send_closure(SelfId, &ArchiveManager::register_perm_state, id);
promise.set_value(td::Unit());
}
});
Expand All @@ -383,7 +387,7 @@ void ArchiveManager::add_persistent_state_impl(BlockIdExt block_id, BlockIdExt m
void ArchiveManager::get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) {
auto id = FileReference{fileref::ZeroState{block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) == perm_states_.end()) {
if (perm_states_.find({0, hash}) == perm_states_.end()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "zerostate not in db"));
return;
}
Expand All @@ -395,18 +399,38 @@ void ArchiveManager::get_zero_state(BlockIdExt block_id, td::Promise<td::BufferS
void ArchiveManager::check_zero_state(BlockIdExt block_id, td::Promise<bool> promise) {
auto id = FileReference{fileref::ZeroState{block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) == perm_states_.end()) {
if (perm_states_.find({0, hash}) == perm_states_.end()) {
promise.set_result(false);
return;
}
promise.set_result(true);
}

void ArchiveManager::get_previous_persistent_state_files(
BlockSeqno cur_mc_seqno, td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise) {
auto it = perm_states_.lower_bound({cur_mc_seqno, FileHash::zero()});
if (it == perm_states_.begin()) {
promise.set_value({});
return;
}
--it;
BlockSeqno mc_seqno = it->first.first;
std::vector<std::pair<std::string, ShardIdFull>> files;
while (it->first.first == mc_seqno) {
files.emplace_back(db_root_ + "/archive/states/" + it->second.filename_short(), it->second.shard());
if (it == perm_states_.begin()) {
break;
}
--it;
}
promise.set_value(std::move(files));
}

void ArchiveManager::get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::BufferSlice> promise) {
auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) == perm_states_.end()) {
if (perm_states_.find({masterchain_block_id.seqno(), hash}) == perm_states_.end()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "state file not in db"));
return;
}
Expand All @@ -419,7 +443,7 @@ void ArchiveManager::get_persistent_state_slice(BlockIdExt block_id, BlockIdExt
td::int64 max_size, td::Promise<td::BufferSlice> promise) {
auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) == perm_states_.end()) {
if (perm_states_.find({masterchain_block_id.seqno(), hash}) == perm_states_.end()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "state file not in db"));
return;
}
Expand All @@ -432,7 +456,7 @@ void ArchiveManager::check_persistent_state(BlockIdExt block_id, BlockIdExt mast
td::Promise<bool> promise) {
auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}};
auto hash = id.hash();
if (perm_states_.find(hash) == perm_states_.end()) {
if (perm_states_.find({masterchain_block_id.seqno(), hash}) == perm_states_.end()) {
promise.set_result(false);
return;
}
Expand Down Expand Up @@ -884,13 +908,11 @@ void ArchiveManager::start_up() {
R = FileReferenceShort::create(newfname);
R.ensure();
}
auto f = R.move_as_ok();
auto hash = f.hash();
perm_states_[hash] = std::move(f);
register_perm_state(R.move_as_ok());
}
}).ensure();

persistent_state_gc(FileHash::zero());
persistent_state_gc({0, FileHash::zero()});

double open_since = td::Clocks::system() - opts_->get_archive_preload_period();
for (auto it = files_.rbegin(); it != files_.rend(); ++it) {
Expand Down Expand Up @@ -976,11 +998,12 @@ void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl
}
}

void ArchiveManager::persistent_state_gc(FileHash last) {
if (perm_states_.size() == 0) {
void ArchiveManager::persistent_state_gc(std::pair<BlockSeqno, FileHash> last) {
if (perm_states_.empty()) {
delay_action(
[hash = FileHash::zero(), SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, hash);
[SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc,
std::pair<BlockSeqno, FileHash>{0, FileHash::zero()});
},
td::Timestamp::in(1.0));
return;
Expand All @@ -993,12 +1016,12 @@ void ArchiveManager::persistent_state_gc(FileHash last) {
it = perm_states_.begin();
}

auto key = it->first;
auto &F = it->second;
auto hash = F.hash();

int res = 0;
BlockSeqno seqno = 0;
F.ref().visit(td::overloaded([&](const fileref::ZeroStateShort &x) { res = 1; },
F.ref().visit(td::overloaded([&](const fileref::ZeroStateShort &) { res = 1; },
[&](const fileref::PersistentStateShort &x) {
res = 0;
seqno = x.masterchain_seqno;
Expand All @@ -1010,40 +1033,57 @@ void ArchiveManager::persistent_state_gc(FileHash last) {
perm_states_.erase(it);
}
if (res != 0) {
delay_action([hash, SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, hash); },
delay_action([key, SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, key); },
td::Timestamp::in(1.0));
return;
}
CHECK(seqno == key.first);

// Do not delete the most recent fully serialized state
bool allow_delete = false;
auto it2 = perm_states_.lower_bound({seqno + 1, FileHash::zero()});
if (it2 != perm_states_.end()) {
it2 = perm_states_.lower_bound({it2->first.first + 1, FileHash::zero()});
if (it2 != perm_states_.end()) {
allow_delete = true;
}
}
if (!allow_delete) {
delay_action([key, SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, key); },
td::Timestamp::in(1.0));
return;
}

auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), hash](td::Result<ConstBlockHandle> R) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), key](td::Result<ConstBlockHandle> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &ArchiveManager::got_gc_masterchain_handle, nullptr, hash);
td::actor::send_closure(SelfId, &ArchiveManager::got_gc_masterchain_handle, nullptr, key);
} else {
td::actor::send_closure(SelfId, &ArchiveManager::got_gc_masterchain_handle, R.move_as_ok(), hash);
td::actor::send_closure(SelfId, &ArchiveManager::got_gc_masterchain_handle, R.move_as_ok(), key);
}
});

get_block_by_seqno(AccountIdPrefixFull{masterchainId, 0}, seqno, std::move(P));
}

void ArchiveManager::got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash) {
void ArchiveManager::got_gc_masterchain_handle(ConstBlockHandle handle, std::pair<BlockSeqno, FileHash> key) {
bool to_del = false;
if (!handle || !handle->inited_unix_time() || !handle->unix_time()) {
to_del = true;
} else {
auto ttl = ValidatorManager::persistent_state_ttl(handle->unix_time());
to_del = ttl < td::Clocks::system();
}
auto it = perm_states_.find(hash);
auto it = perm_states_.find(key);
CHECK(it != perm_states_.end());
auto &F = it->second;
if (to_del) {
td::unlink(db_root_ + "/archive/states/" + F.filename_short()).ignore();
perm_states_.erase(it);
}
delay_action([hash, SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, hash); },
delay_action([key, SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &ArchiveManager::persistent_state_gc, key); },
td::Timestamp::in(1.0));
}

Expand Down
10 changes: 6 additions & 4 deletions validator/db/archive-manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ArchiveManager : public td::actor::Actor {
td::int64 max_size, td::Promise<td::BufferSlice> promise);
void check_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<bool> promise);
void check_zero_state(BlockIdExt block_id, td::Promise<bool> promise);
void get_previous_persistent_state_files(BlockSeqno cur_mc_seqno,
td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise);

void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise);
//void truncate_continue(BlockSeqno masterchain_seqno, td::Promise<td::Unit> promise);
Expand Down Expand Up @@ -180,7 +182,7 @@ class ArchiveManager : public td::actor::Actor {
return p.key ? key_files_ : p.temp ? temp_files_ : files_;
}

std::map<FileHash, FileReferenceShort> perm_states_;
std::map<std::pair<BlockSeqno, FileHash>, FileReferenceShort> perm_states_; // Mc block seqno, hash -> state

void load_package(PackageId seqno);
void delete_package(PackageId seqno, td::Promise<td::Unit> promise);
Expand All @@ -207,10 +209,10 @@ class ArchiveManager : public td::actor::Actor {

void add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise,
std::function<void(std::string, td::Promise<std::string>)> create_writer);
void written_perm_state(FileReferenceShort id);
void register_perm_state(FileReferenceShort id);

void persistent_state_gc(FileHash last);
void got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash);
void persistent_state_gc(std::pair<BlockSeqno, FileHash> last);
void got_gc_masterchain_handle(ConstBlockHandle handle, std::pair<BlockSeqno, FileHash> key);

std::string db_root_;
td::Ref<ValidatorManagerOptions> opts_;
Expand Down
6 changes: 6 additions & 0 deletions validator/db/rootdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ void RootDb::check_zero_state_file_exists(BlockIdExt block_id, td::Promise<bool>
td::actor::send_closure(archive_db_, &ArchiveManager::check_zero_state, block_id, std::move(promise));
}

void RootDb::get_previous_persistent_state_files(
BlockSeqno cur_mc_seqno, td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise) {
td::actor::send_closure(archive_db_, &ArchiveManager::get_previous_persistent_state_files, cur_mc_seqno,
std::move(promise));
}

void RootDb::store_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
td::actor::send_closure(archive_db_, &ArchiveManager::update_handle, std::move(handle), std::move(promise));
}
Expand Down
2 changes: 2 additions & 0 deletions validator/db/rootdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class RootDb : public Db {
void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override;
void get_zero_state_file(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
void check_zero_state_file_exists(BlockIdExt block_id, td::Promise<bool> promise) override;
void get_previous_persistent_state_files(
BlockSeqno cur_mc_seqno, td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise) override;

void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;

Expand Down
10 changes: 4 additions & 6 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,7 @@ void FullNodeShardImpl::download_persistent_state(BlockIdExt id, BlockIdExt mast
auto &b = choose_neighbour();
td::actor::create_actor<DownloadState>(PSTRING() << "downloadstatereq" << id.id.to_str(), id, masterchain_block_id,
adnl_id_, overlay_id_, b.adnl_id, priority, timeout, validator_manager_,
b.use_rldp2() ? (td::actor::ActorId<adnl::AdnlSenderInterface>)rldp2_ : rldp_,
overlays_, adnl_, client_, std::move(promise))
rldp2_, overlays_, adnl_, client_, std::move(promise))
.release();
}

Expand Down Expand Up @@ -867,10 +866,9 @@ void FullNodeShardImpl::get_next_key_blocks(BlockIdExt block_id, td::Timestamp t
void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) {
auto &b = choose_neighbour();
td::actor::create_actor<DownloadArchiveSlice>(
"archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout, validator_manager_,
b.use_rldp2() ? (td::actor::ActorId<adnl::AdnlSenderInterface>)rldp2_ : rldp_, overlays_, adnl_, client_,
create_neighbour_promise(b, std::move(promise)))
td::actor::create_actor<DownloadArchiveSlice>("archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_,
b.adnl_id, timeout, validator_manager_, rldp2_, overlays_, adnl_,
client_, create_neighbour_promise(b, std::move(promise)))
.release();
}

Expand Down
4 changes: 0 additions & 4 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ struct Neighbour {
void query_failed();
void update_roundtrip(double t);

bool use_rldp2() const {
return std::make_pair(proto_version, capabilities) >= std::make_pair<td::uint32, td::uint64>(2, 2);
}

static Neighbour zero;
};

Expand Down
2 changes: 2 additions & 0 deletions validator/interfaces/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Db : public td::actor::Actor {
virtual void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) = 0;
virtual void get_zero_state_file(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) = 0;
virtual void check_zero_state_file_exists(BlockIdExt block_id, td::Promise<bool> promise) = 0;
virtual void get_previous_persistent_state_files(
BlockSeqno cur_mc_seqno, td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise) = 0;

virtual void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) = 0;

Expand Down
4 changes: 4 additions & 0 deletions validator/manager-disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class ValidatorManagerImpl : public ValidatorManager {
td::int64 max_length, td::Promise<td::BufferSlice> promise) override {
UNREACHABLE();
}
void get_previous_persistent_state_files(
BlockSeqno cur_mc_seqno, td::Promise<std::vector<std::pair<std::string, ShardIdFull>>> promise) override {
UNREACHABLE();
}
void get_block_proof(BlockHandle handle, td::Promise<td::BufferSlice> promise) override;
void get_block_proof_link(BlockHandle block_id, td::Promise<td::BufferSlice> promise) override {
UNREACHABLE();
Expand Down
Loading

0 comments on commit 9982a77

Please sign in to comment.