diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index f7dfb93fa..a4956780c 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -456,6 +456,7 @@ tonNode.downloadKeyBlockProof block:tonNode.blockIdExt = tonNode.Data; tonNode.downloadBlockProofLink block:tonNode.blockIdExt = tonNode.Data; tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data; tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo; +tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo; tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data; tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt) limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 83f7d32f9..9ffd9fd82 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index b87d04f78..88da8dc8c 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -600,8 +600,8 @@ void ArchiveManager::load_package(PackageId id) { } } - desc.file = - td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get(), statistics_); + desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, 0, db_root_, + archive_lru_.get(), statistics_); m.emplace(id, std::move(desc)); update_permanent_slices(); @@ -635,8 +635,9 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull FileDescription new_desc{id, false}; td::mkdir(db_root_ + id.path()).ensure(); std::string prefix = PSTRING() << db_root_ << id.path() << id.name(); - new_desc.file = - td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get(), statistics_); + new_desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, + id.key || id.temp ? 0 : cur_shard_split_depth_, db_root_, + archive_lru_.get(), statistics_); const FileDescription &desc = f.emplace(id, std::move(new_desc)); if (!id.temp) { update_desc(f, desc, shard, seqno, ts, lt); @@ -1091,14 +1092,16 @@ PackageId ArchiveManager::get_package_id_force(BlockSeqno masterchain_seqno, Sha return it->first; } -void ArchiveManager::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { +void ArchiveManager::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) { auto F = get_file_desc_by_seqno(ShardIdFull{masterchainId}, masterchain_seqno, false); if (!F) { promise.set_error(td::Status::Error(ErrorCode::notready, "archive not found")); return; } - td::actor::send_closure(F->file_actor_id(), &ArchiveSlice::get_archive_id, masterchain_seqno, std::move(promise)); + td::actor::send_closure(F->file_actor_id(), &ArchiveSlice::get_archive_id, masterchain_seqno, shard_prefix, + std::move(promise)); } void ArchiveManager::get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index a1ed97022..e260b8dc7 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -65,7 +65,7 @@ class ArchiveManager : public td::actor::Actor { void get_block_by_lt(AccountIdPrefixFull account_id, LogicalTime lt, td::Promise promise); void get_block_by_seqno(AccountIdPrefixFull account_id, BlockSeqno seqno, td::Promise promise); - void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise); void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise); @@ -75,6 +75,10 @@ class ArchiveManager : public td::actor::Actor { void commit_transaction(); void set_async_mode(bool mode, td::Promise promise); + void set_current_shard_split_depth(td::uint32 value) { + cur_shard_split_depth_ = value; + } + static constexpr td::uint32 archive_size() { return 20000; } @@ -173,6 +177,7 @@ class ArchiveManager : public td::actor::Actor { bool async_mode_ = false; bool huge_transaction_started_ = false; td::uint32 huge_transaction_size_ = 0; + td::uint32 cur_shard_split_depth_ = 0; DbStatistics statistics_; diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index d392431a9..0fcdf7924 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -39,7 +39,7 @@ class PackageStatistics { void record_close(uint64_t count = 1) { close_count.fetch_add(count, std::memory_order_relaxed); } - + void record_read(double time, uint64_t bytes) { read_bytes.fetch_add(bytes, std::memory_order_relaxed); std::lock_guard guard(read_mutex); @@ -56,10 +56,10 @@ class PackageStatistics { std::stringstream ss; ss.setf(std::ios::fixed); ss.precision(6); - + ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n"; ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n"; - + ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n"; ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n"; @@ -118,7 +118,7 @@ void PackageWriter::append(std::string filename, td::BufferSlice data, return; } start = td::Timestamp::now(); - offset = p->append(std::move(filename), std::move(data), !async_mode_); + offset = p->append(std::move(filename), std::move(data), !async_mode_); end = td::Timestamp::now(); size = p->size(); } @@ -152,6 +152,21 @@ class PackageReader : public td::actor::Actor { std::shared_ptr statistics_; }; +static std::string get_package_file_name(PackageId p_id, ShardIdFull shard_prefix) { + td::StringBuilder sb; + sb << p_id.name(); + if (!shard_prefix.is_masterchain()) { + sb << "."; + sb << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard); + } + sb << ".pack"; + return sb.as_cslice().str(); +} + +static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefix) { + return PSTRING() << seqno << "." << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard); +} + void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) { if (destroyed_) { promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); @@ -271,7 +286,8 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer TRY_RESULT_PROMISE( promise, p, choose_package( - handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, true)); + handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, + handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, true)); std::string value; auto R = kv_->get(ref_id.hash().to_hex(), value); R.ensure(); @@ -376,7 +392,8 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P TRY_RESULT_PROMISE( promise, p, choose_package( - handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false)); + handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, + handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, false)); promise = begin_async_query(std::move(promise)); auto P = td::PromiseCreator::lambda( [promise = std::move(promise)](td::Result> R) mutable { @@ -536,18 +553,32 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3 } before_query(); auto value = static_cast(archive_id >> 32); - TRY_RESULT_PROMISE(promise, p, choose_package(value, false)); + PackageInfo *p; + if (shard_split_depth_ == 0) { + TRY_RESULT_PROMISE_ASSIGN(promise, p, choose_package(value, ShardIdFull{masterchainId}, false)); + } else { + if (value >= packages_.size()) { + promise.set_error(td::Status::Error(ErrorCode::notready, "no such package")); + return; + } + p = &packages_[value]; + } promise = begin_async_query(std::move(promise)); td::actor::create_actor("readfile", p->path, offset, limit, 0, std::move(promise)).release(); } -void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { +void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) { before_query(); if (!sliced_mode_) { promise.set_result(archive_id_); } else { - TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, false)); - promise.set_result(p->id * (1ull << 32) + archive_id_); + TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, shard_prefix, false)); + if (shard_split_depth_ == 0) { + promise.set_result(p->seqno * (1ull << 32) + archive_id_); + } else { + promise.set_result(p->idx * (1ull << 32) + archive_id_); + } } } @@ -573,9 +604,18 @@ void ArchiveSlice::before_query() { R2.ensure(); slice_size_ = td::to_integer(value); CHECK(slice_size_ > 0); + R2 = kv_->get("shard_split_depth", value); + R2.ensure(); + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + shard_split_depth_ = td::to_integer(value); + CHECK(shard_split_depth_ <= 60); + } else { + shard_split_depth_ = 0; + } for (td::uint32 i = 0; i < tot; i++) { R2 = kv_->get(PSTRING() << "status." << i, value); R2.ensure(); + CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok); auto len = td::to_integer(value); R2 = kv_->get(PSTRING() << "version." << i, value); R2.ensure(); @@ -583,12 +623,24 @@ void ArchiveSlice::before_query() { if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { ver = td::to_integer(value); } - auto v = archive_id_ + slice_size_ * i; - add_package(v, len, ver); + td::uint32 seqno; + ShardIdFull shard_prefix; + if (shard_split_depth_ == 0) { + seqno = archive_id_ + slice_size_ * i; + shard_prefix = ShardIdFull{masterchainId}; + } else { + R2 = kv_->get(PSTRING() << "info." << i, value); + R2.ensure(); + CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok); + unsigned long long shard; + CHECK(sscanf(value.c_str(), "%u.%d:%016llx", &seqno, &shard_prefix.workchain, &shard) == 3); + shard_prefix.shard = shard; + } + add_package(seqno, shard_prefix, len, ver); } } else { auto len = td::to_integer(value); - add_package(archive_id_, len, 0); + add_package(archive_id_, ShardIdFull{masterchainId}, len, 0); } } else { if (!temp_ && !key_blocks_only_) { @@ -599,13 +651,17 @@ void ArchiveSlice::before_query() { kv_->set("slice_size", td::to_string(slice_size_)).ensure(); kv_->set("status.0", "0").ensure(); kv_->set("version.0", td::to_string(default_package_version())).ensure(); + if (shard_split_depth_ > 0) { + kv_->set("info.0", package_info_to_str(archive_id_, ShardIdFull{masterchainId})).ensure(); + kv_->set("shard_split_depth", td::to_string(shard_split_depth_)).ensure(); + } kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, default_package_version()); + add_package(archive_id_, ShardIdFull{masterchainId}, 0, default_package_version()); } else { kv_->begin_transaction().ensure(); kv_->set("status", "0").ensure(); kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, 0); + add_package(archive_id_, ShardIdFull{masterchainId}, 0, 0); } } } @@ -642,6 +698,7 @@ void ArchiveSlice::do_close() { statistics_.pack_statistics->record_close(packages_.size()); } packages_.clear(); + id_to_package_.clear(); } template @@ -697,48 +754,61 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { } } -ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, +ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, + td::uint32 shard_split_depth, std::string db_root, td::actor::ActorId archive_lru, DbStatistics statistics) : archive_id_(archive_id) , key_blocks_only_(key_blocks_only) , temp_(temp) , finalized_(finalized) , p_id_(archive_id_, key_blocks_only_, temp_) + , shard_split_depth_(temp || key_blocks_only ? 0 : shard_split_depth) , db_root_(std::move(db_root)) , archive_lru_(std::move(archive_lru)) , statistics_(statistics) { db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index"; } -td::Result ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) { +td::Result ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, + ShardIdFull shard_prefix, bool force) { if (temp_ || key_blocks_only_ || !sliced_mode_) { return &packages_[0]; } if (masterchain_seqno < archive_id_) { return td::Status::Error(ErrorCode::notready, "too small masterchain seqno"); } - auto v = (masterchain_seqno - archive_id_) / slice_size_; - if (v >= packages_.size()) { + masterchain_seqno -= (masterchain_seqno - archive_id_) % slice_size_; + CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0); + if (shard_split_depth_ == 0) { + shard_prefix = ShardIdFull{masterchainId}; + } else if (!shard_prefix.is_masterchain()) { + shard_prefix.shard |= 1; // In case length is < split depth + shard_prefix = ton::shard_prefix(shard_prefix, shard_split_depth_); + } + auto it = id_to_package_.find({masterchain_seqno, shard_prefix}); + if (it == id_to_package_.end()) { if (!force) { - return td::Status::Error(ErrorCode::notready, "too big masterchain seqno"); + return td::Status::Error(ErrorCode::notready, "no such package"); } - CHECK(v == packages_.size()); begin_transaction(); + size_t v = packages_.size(); kv_->set("slices", td::to_string(v + 1)).ensure(); kv_->set(PSTRING() << "status." << v, "0").ensure(); kv_->set(PSTRING() << "version." << v, td::to_string(default_package_version())).ensure(); + if (shard_split_depth_ > 0) { + kv_->set(PSTRING() << "info." << v, package_info_to_str(masterchain_seqno, shard_prefix)).ensure(); + } commit_transaction(); - CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0); - add_package(masterchain_seqno, 0, default_package_version()); + add_package(masterchain_seqno, shard_prefix, 0, default_package_version()); return &packages_[v]; } else { - return &packages_[v]; + return &packages_[it->second]; } } -void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 version) { +void ArchiveSlice::add_package(td::uint32 seqno, ShardIdFull shard_prefix, td::uint64 size, td::uint32 version) { PackageId p_id{seqno, key_blocks_only_, temp_}; - std::string path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".pack"; + std::string path = PSTRING() << db_root_ << p_id.path() << get_package_file_name(p_id, shard_prefix); auto R = Package::open(path, false, true); if (R.is_error()) { LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error(); @@ -748,8 +818,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver statistics_.pack_statistics->record_open(); } auto idx = td::narrow_cast(packages_.size()); + id_to_package_[{seqno, shard_prefix}] = idx; if (finalized_) { - packages_.emplace_back(nullptr, td::actor::ActorOwn(), seqno, path, idx, version); + packages_.emplace_back(nullptr, td::actor::ActorOwn(), seqno, shard_prefix, path, idx, version); return; } auto pack = std::make_shared(R.move_as_ok()); @@ -757,7 +828,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver pack->truncate(size).ensure(); } auto writer = td::actor::create_actor("writer", pack, async_mode_, statistics_.pack_statistics); - packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version); + packages_.emplace_back(std::move(pack), std::move(writer), seqno, shard_prefix, path, idx, version); } namespace { @@ -793,6 +864,7 @@ void ArchiveSlice::destroy(td::Promise promise) { statistics_.pack_statistics->record_close(packages_.size()); } packages_.clear(); + id_to_package_.clear(); kv_ = nullptr; PackageId p_id{archive_id_, key_blocks_only_, temp_}; @@ -866,7 +938,7 @@ void ArchiveSlice::move_handle(ConstBlockHandle handle, Package *old_pack, Packa move_file(fileref::Block{handle->id()}, old_pack, pack); } -bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_idx, +bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_seqno, Package *pack) { std::string value; auto R = kv_->get(get_db_key_block_info(block_id), value); @@ -881,18 +953,18 @@ bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block return false; } - auto S = choose_package(seqno, false); + auto S = choose_package(seqno, block_id.shard_full(), false); S.ensure(); auto p = S.move_as_ok(); - CHECK(p->idx <= cutoff_idx); - if (p->idx == cutoff_idx) { + CHECK(p->seqno <= cutoff_seqno); + if (p->seqno == cutoff_seqno) { move_handle(std::move(handle), p->package.get(), pack); } return true; } -void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_idx, +void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_seqno, Package *pack) { auto key = get_db_key_lt_desc(shard); std::string value; @@ -918,7 +990,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar E.ensure(); auto e = E.move_as_ok(); - if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_idx, pack)) { + if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_seqno, pack)) { CHECK(new_last_idx == i); new_last_idx = i + 1; } @@ -930,7 +1002,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar } } -void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise) { +void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td::Promise promise) { if (temp_ || archive_id_ > masterchain_seqno) { destroy(std::move(promise)); return; @@ -943,15 +1015,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl return; } - auto cutoff = choose_package(masterchain_seqno, false); - cutoff.ensure(); - auto pack = cutoff.move_as_ok(); - CHECK(pack); - - auto pack_r = Package::open(pack->path + ".new", false, true); - pack_r.ensure(); - auto new_package = std::make_shared(pack_r.move_as_ok()); - new_package->truncate(0).ensure(); + std::map old_packages; + std::map> new_packages; std::string value; auto status_key = create_serialize_tl_object(); @@ -972,38 +1037,71 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl auto G = fetch_tl_object(value, true); G.ensure(); auto g = G.move_as_ok(); + ShardIdFull shard{g->workchain_, static_cast(g->shard_)}; + + auto package_r = choose_package(masterchain_seqno, shard, false); + if (package_r.is_error()) { + continue; + } + auto package = package_r.move_as_ok(); + CHECK(package); + if (!old_packages.count(package->shard_prefix)) { + old_packages[package->shard_prefix] = package; + auto new_package_r = Package::open(package->path + ".new", false, true); + new_package_r.ensure(); + auto new_package = std::make_shared(new_package_r.move_as_ok()); + new_package->truncate(0).ensure(); + new_packages[package->shard_prefix] = std::move(new_package); + } + truncate_shard(masterchain_seqno, shard, package->seqno, new_packages[package->shard_prefix].get()); + } - truncate_shard(masterchain_seqno, ShardIdFull{g->workchain_, static_cast(g->shard_)}, pack->idx, - new_package.get()); + for (auto& [shard_prefix, package] : old_packages) { + auto new_package = new_packages[shard_prefix]; + CHECK(new_package); + package->package = new_package; + package->writer.reset(); + td::unlink(package->path).ensure(); + td::rename(package->path + ".new", package->path).ensure(); + package->writer = td::actor::create_actor("writer", new_package, async_mode_); } + std::vector new_packages_info; + if (!sliced_mode_) { - kv_->set("status", td::to_string(new_package->size())).ensure(); + kv_->set("status", td::to_string(packages_.at(0).package->size())).ensure(); } else { - kv_->set(PSTRING() << "status." << pack->idx, td::to_string(new_package->size())).ensure(); - for (size_t i = pack->idx + 1; i < packages_.size(); i++) { + for (PackageInfo &package : packages_) { + if (package.seqno <= masterchain_seqno) { + new_packages_info.push_back(std::move(package)); + } else { + td::unlink(package.path).ensure(); + } + } + id_to_package_.clear(); + for (td::uint32 i = 0; i < new_packages_info.size(); ++i) { + PackageInfo &package = new_packages_info[i]; + package.idx = i; + kv_->set(PSTRING() << "status." << i, td::to_string(package.package->size())).ensure(); + kv_->set(PSTRING() << "version." << i, td::to_string(package.version)).ensure(); + if (shard_split_depth_ > 0) { + kv_->set(PSTRING() << "info." << i, package_info_to_str(package.seqno, package.shard_prefix)).ensure(); + } + id_to_package_[{package.seqno, package.shard_prefix}] = i; + } + for (size_t i = new_packages_info.size(); i < packages_.size(); i++) { kv_->erase(PSTRING() << "status." << i); kv_->erase(PSTRING() << "version." << i); + kv_->erase(PSTRING() << "info." << i); } - kv_->set("slices", td::to_string(pack->idx + 1)); - } - - pack->package = new_package; - pack->writer.reset(); - td::unlink(pack->path).ensure(); - td::rename(pack->path + ".new", pack->path).ensure(); - pack->writer = td::actor::create_actor("writer", new_package, async_mode_); - - for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { - td::unlink(packages_[idx].path).ensure(); - } - if (statistics_.pack_statistics) { - statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + kv_->set("slices", td::to_string(new_packages_info.size())); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size() - new_packages_info.size()); + } + packages_ = std::move(new_packages_info); } - packages_.erase(packages_.begin() + pack->idx + 1, packages_.end()); kv_->commit_transaction().ensure(); - promise.set_value(td::Unit()); } diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index faec2fb83..a027ec0ff 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -96,10 +96,10 @@ class ArchiveLru; class ArchiveSlice : public td::actor::Actor { public: - ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, - td::actor::ActorId archive_lru, DbStatistics statistics = {}); + ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, td::uint32 shard_split_depth, + std::string db_root, td::actor::ActorId archive_lru, DbStatistics statistics = {}); - void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise); void add_handle(BlockHandle handle, td::Promise promise); void update_handle(BlockHandle handle, td::Promise promise); @@ -159,6 +159,7 @@ class ArchiveSlice : public td::actor::Actor { bool sliced_mode_{false}; td::uint32 huge_transaction_size_ = 0; td::uint32 slice_size_{100}; + td::uint32 shard_split_depth_ = 0; enum Status { st_closed, st_open, st_want_close @@ -171,28 +172,31 @@ class ArchiveSlice : public td::actor::Actor { std::unique_ptr kv_; struct PackageInfo { - PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, BlockSeqno id, + PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, BlockSeqno seqno, ShardIdFull shard_prefix, std::string path, td::uint32 idx, td::uint32 version) : package(std::move(package)) , writer(std ::move(writer)) - , id(id) + , seqno(seqno) + , shard_prefix(shard_prefix) , path(std::move(path)) , idx(idx) , version(version) { } std::shared_ptr package; td::actor::ActorOwn writer; - BlockSeqno id; + BlockSeqno seqno; + ShardIdFull shard_prefix; std::string path; td::uint32 idx; td::uint32 version; }; std::vector packages_; + std::map, td::uint32> id_to_package_; - td::Result choose_package(BlockSeqno masterchain_seqno, bool force); - void add_package(BlockSeqno masterchain_seqno, td::uint64 size, td::uint32 version); - void truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_idx, Package *pack); - bool truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_idx, Package *pack); + td::Result choose_package(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, bool force); + void add_package(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::uint64 size, td::uint32 version); + void truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_seqno, Package *pack); + bool truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_seqno, Package *pack); void delete_handle(ConstBlockHandle handle); void delete_file(FileReference ref_id); diff --git a/validator/db/archiver.cpp b/validator/db/archiver.cpp index c8cbd7b97..93ba18ca2 100644 --- a/validator/db/archiver.cpp +++ b/validator/db/archiver.cpp @@ -25,11 +25,27 @@ namespace ton { namespace validator { BlockArchiver::BlockArchiver(BlockHandle handle, td::actor::ActorId archive_db, - td::Promise promise) - : handle_(std::move(handle)), archive_(archive_db), promise_(std::move(promise)) { + td::actor::ActorId db, td::Promise promise) + : handle_(std::move(handle)), archive_(archive_db), db_(std::move(db)), promise_(std::move(promise)) { } void BlockArchiver::start_up() { + if (handle_->id().is_masterchain()) { + td::actor::send_closure(db_, &Db::get_block_state, handle_, + [SelfId = actor_id(this), archive = archive_](td::Result> R) { + R.ensure(); + td::Ref state{R.move_as_ok()}; + td::uint32 monitor_min_split = state->monitor_min_split_depth(basechainId); + td::actor::send_closure(archive, &ArchiveManager::set_current_shard_split_depth, + monitor_min_split); + td::actor::send_closure(SelfId, &BlockArchiver::move_handle); + }); + } else { + move_handle(); + } +} + +void BlockArchiver::move_handle() { if (handle_->handle_moved_to_archive()) { moved_handle(); } else { diff --git a/validator/db/archiver.hpp b/validator/db/archiver.hpp index 859f269cd..9498977fd 100644 --- a/validator/db/archiver.hpp +++ b/validator/db/archiver.hpp @@ -33,11 +33,13 @@ class FileDb; class BlockArchiver : public td::actor::Actor { public: - BlockArchiver(BlockHandle handle, td::actor::ActorId archive_db, td::Promise promise); + BlockArchiver(BlockHandle handle, td::actor::ActorId archive_db, td::actor::ActorId db, + td::Promise promise); void abort_query(td::Status error); void start_up() override; + void move_handle(); void moved_handle(); void got_proof(td::BufferSlice data); void written_proof(); @@ -50,6 +52,7 @@ class BlockArchiver : public td::actor::Actor { private: BlockHandle handle_; td::actor::ActorId archive_; + td::actor::ActorId db_; td::Promise promise_; }; diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index 89868f723..d1c1c2433 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -330,7 +330,8 @@ void RootDb::try_get_static_file(FileHash file_hash, td::Promise promise) { - td::actor::create_actor("archiver", std::move(handle), archive_db_.get(), std::move(promise)) + td::actor::create_actor("archiver", std::move(handle), archive_db_.get(), actor_id(this), + std::move(promise)) .release(); } @@ -404,7 +405,8 @@ void RootDb::start_up() { } void RootDb::archive(BlockHandle handle, td::Promise promise) { - td::actor::create_actor("archiveblock", std::move(handle), archive_db_.get(), std::move(promise)) + td::actor::create_actor("archiveblock", std::move(handle), archive_db_.get(), actor_id(this), + std::move(promise)) .release(); } @@ -483,8 +485,9 @@ void RootDb::check_key_block_proof_link_exists(BlockIdExt block_id, td::Promise< std::move(P)); } -void RootDb::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { - td::actor::send_closure(archive_db_, &ArchiveManager::get_archive_id, masterchain_seqno, std::move(promise)); +void RootDb::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise) { + td::actor::send_closure(archive_db_, &ArchiveManager::get_archive_id, masterchain_seqno, shard_prefix, + std::move(promise)); } void RootDb::get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 63509371a..dc94c600d 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -129,7 +129,7 @@ class RootDb : public Db { void check_key_block_proof_exists(BlockIdExt block_id, td::Promise promise) override; void check_key_block_proof_link_exists(BlockIdExt block_id, td::Promise promise) override; - void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) override; + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise) override; void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) override; void set_async_mode(bool mode, td::Promise promise) override; diff --git a/validator/full-node-master.cpp b/validator/full-node-master.cpp index 8dca60d45..da49f0e2e 100644 --- a/validator/full-node-master.cpp +++ b/validator/full-node-master.cpp @@ -386,7 +386,7 @@ void FullNodeMasterImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNo } }); td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_archive_id, query.masterchain_seqno_, - std::move(P)); + ShardIdFull{masterchainId}, std::move(P)); } void FullNodeMasterImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query, diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 1284cf979..d0dbf1a64 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -643,7 +643,24 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod }); VLOG(FULL_NODE_DEBUG) << "Got query getArchiveInfo " << query.masterchain_seqno_ << " from " << src; td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_archive_id, query.masterchain_seqno_, - std::move(P)); + ShardIdFull{masterchainId}, std::move(P)); +} + +void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getShardArchiveInfo &query, + td::Promise promise) { + auto P = td::PromiseCreator::lambda( + [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_serialize_tl_object()); + } else { + promise.set_value(create_serialize_tl_object(R.move_as_ok())); + } + }); + ShardIdFull shard_prefix = create_shard_id(query.shard_prefix_); + VLOG(FULL_NODE_DEBUG) << "Got query getShardArchiveInfo " << query.masterchain_seqno_ << " " << shard_prefix.to_str() + << " from " << src; + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_archive_id, query.masterchain_seqno_, + shard_prefix, std::move(P)); } void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query, @@ -965,13 +982,13 @@ void FullNodeShardImpl::get_next_key_blocks(BlockIdExt block_id, td::Timestamp t .release(); } -void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) { +void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) { auto &b = choose_neighbour(true); td::actor::create_actor( - "archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout, validator_manager_, - b.use_rldp2() ? (td::actor::ActorId)rldp2_ : rldp_, overlays_, adnl_, client_, - create_neighbour_promise(b, std::move(promise))) + "archive", masterchain_seqno, shard_prefix, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout, + validator_manager_, b.use_rldp2() ? (td::actor::ActorId)rldp2_ : rldp_, overlays_, + adnl_, client_, create_neighbour_promise(b, std::move(promise))) .release(); } diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index 4856989a0..235e30514 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -70,8 +70,8 @@ class FullNodeShard : public td::actor::Actor { td::Promise promise) = 0; virtual void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) = 0; + virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) = 0; virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, td::Promise>> promise) = 0; diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index 393ee269e..472abb194 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -152,6 +152,8 @@ class FullNodeShardImpl : public FullNodeShard { td::Promise promise); void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveInfo &query, td::Promise promise); + void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getShardArchiveInfo &query, + td::Promise promise); void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query, td::Promise promise); void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query, @@ -198,8 +200,8 @@ class FullNodeShardImpl : public FullNodeShard { td::Promise promise) override; void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise) override; - void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) override; + void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) override; void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, td::Promise>> promise) override; diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 9f5fbf698..335578cfb 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -468,12 +468,17 @@ void FullNodeImpl::get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeou td::actor::send_closure(shard, &FullNodeShard::get_next_key_blocks, block_id, timeout, std::move(promise)); } -void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) { - auto shard = get_shard(ShardIdFull{masterchainId}); +void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) { + auto shard = get_shard(shard_prefix); + if (shard.empty()) { + VLOG(FULL_NODE_WARNING) << "dropping download archive query to unknown shard"; + promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready")); + return; + } CHECK(!shard.empty()); - td::actor::send_closure(shard, &FullNodeShard::download_archive, masterchain_seqno, std::move(tmp_dir), timeout, - std::move(promise)); + td::actor::send_closure(shard, &FullNodeShard::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir), + timeout, std::move(promise)); } void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, @@ -689,10 +694,10 @@ void FullNodeImpl::start_up() { td::Promise> promise) override { td::actor::send_closure(id_, &FullNodeImpl::get_next_key_blocks, block_id, timeout, std::move(promise)); } - void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) override { - td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout, - std::move(promise)); + void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) override { + td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir), + timeout, std::move(promise)); } void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 76daf1159..33ccf2cd1 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -82,8 +82,8 @@ class FullNodeImpl : public FullNode { void download_block_proof_link(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise); - void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise); + void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise); void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, td::Promise>> promise); diff --git a/validator/import-db-slice.cpp b/validator/import-db-slice.cpp index a93fb05be..b701e6bb6 100644 --- a/validator/import-db-slice.cpp +++ b/validator/import-db-slice.cpp @@ -17,6 +17,7 @@ Copyright 2019-2020 Telegram Systems LLP */ #include "import-db-slice.hpp" + #include "validator/db/fileref.hpp" #include "td/utils/overloaded.h" #include "validator/fabric.h" @@ -26,35 +27,84 @@ #include "ton/ton-io.hpp" #include "downloaders/download-state.hpp" +#include + namespace ton { namespace validator { -ArchiveImporter::ArchiveImporter(std::string path, td::Ref state, BlockSeqno shard_client_seqno, +ArchiveImporter::ArchiveImporter(std::string db_root, td::Ref state, BlockSeqno shard_client_seqno, td::Ref opts, td::actor::ActorId manager, - td::Promise> promise) - : path_(std::move(path)) - , state_(std::move(state)) + std::vector to_import_files, + td::Promise> promise) + : db_root_(std::move(db_root)) + , last_masterchain_state_(std::move(state)) , shard_client_seqno_(shard_client_seqno) + , start_import_seqno_(shard_client_seqno + 1) , opts_(std::move(opts)) , manager_(manager) + , to_import_files_(std::move(to_import_files)) + , use_imported_files_(!to_import_files_.empty()) , promise_(std::move(promise)) { } void ArchiveImporter::start_up() { - auto R = Package::open(path_, false, false); - if (R.is_error()) { - abort_query(R.move_as_error()); + if (use_imported_files_) { + for (const std::string& path : to_import_files_) { + td::Status S = process_package(path, true); + if (S.is_error()) { + LOG(INFO) << "Error processing package " << path << ": " << S; + } + } + files_to_cleanup_.clear(); + processed_mc_archive(); + return; + } + td::actor::send_closure(manager_, &ValidatorManager::send_download_archive_request, start_import_seqno_, + ShardIdFull{masterchainId}, db_root_ + "/tmp/", td::Timestamp::in(3600.0), + [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error()); + } else { + td::actor::send_closure(SelfId, &ArchiveImporter::downloaded_mc_archive, R.move_as_ok()); + } + }); +} + +void ArchiveImporter::downloaded_mc_archive(std::string path) { + td::Status S = process_package(path, true); + if (S.is_error()) { + abort_query(std::move(S)); return; } - package_ = std::make_shared(R.move_as_ok()); +} + +void ArchiveImporter::processed_mc_archive() { + if (masterchain_blocks_.empty()) { + last_masterchain_seqno_ = last_masterchain_state_->get_seqno(); + checked_all_masterchain_blocks(); + return; + } + + auto seqno = masterchain_blocks_.begin()->first; + if (seqno > last_masterchain_state_->get_seqno() + 1) { + abort_query(td::Status::Error(ErrorCode::notready, "too big first masterchain seqno")); + return; + } + + check_masterchain_block(seqno); +} + +td::Status ArchiveImporter::process_package(std::string path, bool with_masterchain) { + files_to_cleanup_.push_back(path); + TRY_RESULT(p, Package::open(path, false, false)); + auto package = std::make_shared(std::move(p)); - bool fail = false; - package_->iterate([&](std::string filename, td::BufferSlice data, td::uint64 offset) -> bool { + td::Status S = td::Status::OK(); + package->iterate([&](std::string filename, td::BufferSlice, td::uint64 offset) -> bool { auto F = FileReference::create(filename); if (F.is_error()) { - abort_query(F.move_as_error()); - fail = true; + S = F.move_as_error(); return false; } auto f = F.move_as_ok(); @@ -79,33 +129,24 @@ void ArchiveImporter::start_up() { ignore = false; is_proof = false; }, - [&](const auto &p) { ignore = true; })); - - if (!ignore) { - blocks_[b][is_proof ? 0 : 1] = offset; + [&](const auto &) { ignore = true; })); + + if (!ignore && (with_masterchain || !b.is_masterchain())) { + if (is_proof) { + blocks_[b].proof_pkg = package; + blocks_[b].proof_offset = offset; + } else { + blocks_[b].data_pkg = package; + blocks_[b].data_offset = offset; + } if (b.is_masterchain()) { masterchain_blocks_[b.seqno()] = b; + last_masterchain_seqno_ = std::max(last_masterchain_seqno_, b.seqno()); } } return true; }); - - if (fail) { - return; - } - - if (masterchain_blocks_.size() == 0) { - abort_query(td::Status::Error(ErrorCode::notready, "archive does not contain any masterchain blocks")); - return; - } - - auto seqno = masterchain_blocks_.begin()->first; - if (seqno > state_->get_seqno() + 1) { - abort_query(td::Status::Error(ErrorCode::notready, "too big first masterchain seqno")); - return; - } - - check_masterchain_block(seqno); + return S; } void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { @@ -115,17 +156,17 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { abort_query(td::Status::Error(ErrorCode::notready, "no new blocks")); return; } - checked_all_masterchain_blocks(seqno - 1); + checked_all_masterchain_blocks(); return; } - while (seqno <= state_->get_block_id().seqno()) { - if (seqno < state_->get_block_id().seqno()) { - if (!state_->check_old_mc_block_id(it->second)) { + while (seqno <= last_masterchain_state_->get_block_id().seqno()) { + if (seqno < last_masterchain_state_->get_block_id().seqno()) { + if (!last_masterchain_state_->check_old_mc_block_id(it->second)) { abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id")); return; } } else { - if (state_->get_block_id() != it->second) { + if (last_masterchain_state_->get_block_id() != it->second) { abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id")); return; } @@ -133,18 +174,26 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { seqno++; it = masterchain_blocks_.find(seqno); if (it == masterchain_blocks_.end()) { - checked_all_masterchain_blocks(seqno - 1); + checked_all_masterchain_blocks(); return; } } - if (seqno != state_->get_block_id().seqno() + 1) { + if (seqno != last_masterchain_state_->get_block_id().seqno() + 1) { abort_query(td::Status::Error(ErrorCode::protoviolation, "hole in masterchain seqno")); return; } auto it2 = blocks_.find(it->second); CHECK(it2 != blocks_.end()); + if (!it2->second.proof_pkg) { + abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block proof")); + return; + } + if (!it2->second.data_pkg) { + abort_query(td::Status::Error(ErrorCode::protoviolation, "no masterchain block data")); + return; + } - auto R1 = package_->read(it2->second[0]); + auto R1 = it2->second.proof_pkg->read(it2->second.proof_offset); if (R1.is_error()) { abort_query(R1.move_as_error()); return; @@ -156,7 +205,7 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { return; } - auto R2 = package_->read(it2->second[1]); + auto R2 = it2->second.data_pkg->read(it2->second.data_offset); if (R2.is_error()) { abort_query(R2.move_as_error()); return; @@ -175,7 +224,7 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { auto proof = proofR.move_as_ok(); auto data = dataR.move_as_ok(); - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), id = state_->get_block_id(), + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), id = last_masterchain_state_->get_block_id(), data](td::Result R) mutable { if (R.is_error()) { td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error()); @@ -191,8 +240,8 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) { td::actor::send_closure(SelfId, &ArchiveImporter::checked_masterchain_proof, std::move(handle), std::move(data)); }); - run_check_proof_query(it->second, std::move(proof), manager_, td::Timestamp::in(2.0), std::move(P), state_, - opts_->is_hardfork(it->second)); + run_check_proof_query(it->second, std::move(proof), manager_, td::Timestamp::in(2.0), std::move(P), + last_masterchain_state_, opts_->is_hardfork(it->second)); } void ArchiveImporter::checked_masterchain_proof(BlockHandle handle, td::Ref data) { @@ -214,22 +263,79 @@ void ArchiveImporter::applied_masterchain_block(BlockHandle handle) { } void ArchiveImporter::got_new_materchain_state(td::Ref state) { - state_ = std::move(state); - check_masterchain_block(state_->get_block_id().seqno() + 1); + last_masterchain_state_ = std::move(state); + check_masterchain_block(last_masterchain_state_->get_block_id().seqno() + 1); +} + +void ArchiveImporter::checked_all_masterchain_blocks() { + if (start_import_seqno_ > last_masterchain_state_->get_seqno()) { + abort_query(td::Status::Error("no new masterchain blocks were imported")); + return; + } + BlockIdExt block_id; + CHECK(last_masterchain_state_->get_old_mc_block_id(start_import_seqno_, block_id)); + td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db_short, block_id, + [SelfId = actor_id(this)](td::Result> R) { + R.ensure(); + td::Ref state{R.move_as_ok()}; + td::actor::send_closure(SelfId, &ArchiveImporter::download_shard_archives, + std::move(state)); + }); } -void ArchiveImporter::checked_all_masterchain_blocks(BlockSeqno seqno) { - check_next_shard_client_seqno(shard_client_seqno_ + 1); +void ArchiveImporter::download_shard_archives(td::Ref start_state) { + start_state_ = start_state; + td::uint32 monitor_min_split = start_state->monitor_min_split_depth(basechainId); + // If monitor_min_split == 0, we use the old archive format (packages are not separated by shard) + if (monitor_min_split > 0 && !use_imported_files_) { + for (td::uint64 i = 0; i < (1ULL << monitor_min_split); ++i) { + ShardIdFull shard_prefix{basechainId, (i * 2 + 1) << (64 - monitor_min_split - 1)}; + if (opts_->need_monitor(shard_prefix, start_state)) { + ++pending_shard_archives_; + download_shard_archive(shard_prefix); + } + } + } + if (pending_shard_archives_ == 0) { + check_next_shard_client_seqno(shard_client_seqno_ + 1); + } +} + +void ArchiveImporter::download_shard_archive(ShardIdFull shard_prefix) { + td::actor::send_closure( + manager_, &ValidatorManager::send_download_archive_request, start_import_seqno_, shard_prefix, db_root_ + "/tmp/", + td::Timestamp::in(3600.0), + [SelfId = actor_id(this), seqno = start_import_seqno_, shard_prefix](td::Result R) { + if (R.is_error()) { + LOG(WARNING) << "Failed to download archive slice #" << seqno << " for shard " << shard_prefix.to_str(); + delay_action( + [=]() { td::actor::send_closure(SelfId, &ArchiveImporter::download_shard_archive, shard_prefix); }, + td::Timestamp::in(2.0)); + } else { + td::actor::send_closure(SelfId, &ArchiveImporter::downloaded_shard_archive, R.move_as_ok()); + } + }); +} + +void ArchiveImporter::downloaded_shard_archive(std::string path) { + td::Status S = process_package(path, false); + if (S.is_error()) { + LOG(INFO) << "Error processing package: " << S; + } + --pending_shard_archives_; + if (pending_shard_archives_ == 0) { + check_next_shard_client_seqno(shard_client_seqno_ + 1); + } } void ArchiveImporter::check_next_shard_client_seqno(BlockSeqno seqno) { - if (seqno > state_->get_seqno()) { + if (seqno > last_masterchain_state_->get_seqno() || seqno > last_masterchain_seqno_) { finish_query(); - } else if (seqno == state_->get_seqno()) { - got_masterchain_state(state_); + } else if (seqno == last_masterchain_state_->get_seqno()) { + got_masterchain_state(last_masterchain_state_); } else { BlockIdExt b; - bool f = state_->get_old_mc_block_id(seqno, b); + bool f = last_masterchain_state_->get_old_mc_block_id(seqno, b); CHECK(f); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { R.ensure(); @@ -241,23 +347,25 @@ void ArchiveImporter::check_next_shard_client_seqno(BlockSeqno seqno) { } void ArchiveImporter::got_masterchain_state(td::Ref state) { + if (state->get_seqno() != start_import_seqno_ && state->is_key_state()) { + finish_query(); + return; + } auto s = state->get_shards(); - - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), seqno = state->get_seqno()](td::Result R) { + td::MultiPromise mp; + auto ig = mp.init_guard(); + for (auto &shard : s) { + if (opts_->need_monitor(shard->shard(), state)) { + apply_shard_block(shard->top_block_id(), state->get_block_id(), ig.get_promise()); + } + } + ig.add_promise([SelfId = actor_id(this), seqno = state->get_seqno()](td::Result R) { if (R.is_error()) { td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error()); } else { td::actor::send_closure(SelfId, &ArchiveImporter::checked_shard_client_seqno, seqno); } }); - - td::MultiPromise mp; - auto ig = mp.init_guard(); - ig.add_promise(std::move(P)); - - for (auto &shard : s) { - apply_shard_block(shard->top_block_id(), state->get_block_id(), ig.get_promise()); - } } void ArchiveImporter::checked_shard_client_seqno(BlockSeqno seqno) { @@ -286,7 +394,7 @@ void ArchiveImporter::apply_shard_block_cont1(BlockHandle handle, BlockIdExt mas if (handle->id().seqno() == 0) { auto P = td::PromiseCreator::lambda( - [promise = std::move(promise)](td::Result> R) mutable { promise.set_value(td::Unit()); }); + [promise = std::move(promise)](td::Result>) mutable { promise.set_value(td::Unit()); }); td::actor::create_actor("downloadstate", handle->id(), masterchain_block_id, 2, manager_, td::Timestamp::in(3600), std::move(P)) .release(); @@ -294,12 +402,13 @@ void ArchiveImporter::apply_shard_block_cont1(BlockHandle handle, BlockIdExt mas } auto it = blocks_.find(handle->id()); - if (it == blocks_.end()) { - promise.set_error(td::Status::Error(ErrorCode::notready, PSTRING() << "no proof for shard block " << handle->id())); + if (it == blocks_.end() || !it->second.proof_pkg || !it->second.data_pkg) { + promise.set_error( + td::Status::Error(ErrorCode::notready, PSTRING() << "no data/proof for shard block " << handle->id())); return; } - TRY_RESULT_PROMISE(promise, data, package_->read(it->second[0])); - TRY_RESULT_PROMISE(promise, proof, create_proof_link(handle->id(), std::move(data.second))); + TRY_RESULT_PROMISE(promise, proof_data, it->second.proof_pkg->read(it->second.proof_offset)); + TRY_RESULT_PROMISE(promise, proof, create_proof_link(handle->id(), std::move(proof_data.second))); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle, masterchain_block_id, promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { @@ -345,8 +454,8 @@ void ArchiveImporter::apply_shard_block_cont2(BlockHandle handle, BlockIdExt mas void ArchiveImporter::apply_shard_block_cont3(BlockHandle handle, BlockIdExt masterchain_block_id, td::Promise promise) { auto it = blocks_.find(handle->id()); - CHECK(it != blocks_.end()); - TRY_RESULT_PROMISE(promise, data, package_->read(it->second[1])); + CHECK(it != blocks_.end() && it->second.data_pkg); + TRY_RESULT_PROMISE(promise, data, it->second.data_pkg->read(it->second.data_offset)); if (sha256_bits256(data.second.as_slice()) != handle->id().file_hash) { promise.set_error(td::Status::Error(ErrorCode::protoviolation, "bad block file hash")); return; @@ -375,13 +484,17 @@ void ArchiveImporter::check_shard_block_applied(BlockIdExt block_id, td::Promise } void ArchiveImporter::abort_query(td::Status error) { - LOG(INFO) << error; + LOG(INFO) << "Archive import: " << error; finish_query(); } + void ArchiveImporter::finish_query() { + for (const std::string &f : files_to_cleanup_) { + td::unlink(f).ignore(); + } if (promise_) { - promise_.set_value( - std::vector{state_->get_seqno(), std::min(state_->get_seqno(), shard_client_seqno_)}); + promise_.set_value({last_masterchain_state_->get_seqno(), + std::min(last_masterchain_state_->get_seqno(), shard_client_seqno_)}); } stop(); } diff --git a/validator/import-db-slice.hpp b/validator/import-db-slice.hpp index 0993d4bba..2d990d1a0 100644 --- a/validator/import-db-slice.hpp +++ b/validator/import-db-slice.hpp @@ -19,6 +19,7 @@ #pragma once #include "td/actor/actor.h" +#include "td/utils/port/path.h" #include "validator/interfaces/validator-manager.h" #include "validator/db/package.hpp" @@ -28,19 +29,27 @@ namespace validator { class ArchiveImporter : public td::actor::Actor { public: - ArchiveImporter(std::string path, td::Ref state, BlockSeqno shard_client_seqno, + ArchiveImporter(std::string db_root, td::Ref state, BlockSeqno shard_client_seqno, td::Ref opts, td::actor::ActorId manager, - td::Promise> promise); + std::vector to_import_files, td::Promise> promise); void start_up() override; void abort_query(td::Status error); void finish_query(); + void downloaded_mc_archive(std::string path); + td::Status process_package(std::string path, bool with_masterchain); + + void processed_mc_archive(); void check_masterchain_block(BlockSeqno seqno); void checked_masterchain_proof(BlockHandle handle, td::Ref data); void applied_masterchain_block(BlockHandle handle); void got_new_materchain_state(td::Ref state); - void checked_all_masterchain_blocks(BlockSeqno seqno); + + void checked_all_masterchain_blocks(); + void download_shard_archives(td::Ref start_state); + void download_shard_archive(ShardIdFull shard_prefix); + void downloaded_shard_archive(std::string path); void check_next_shard_client_seqno(BlockSeqno seqno); void checked_shard_client_seqno(BlockSeqno seqno); @@ -52,19 +61,34 @@ class ArchiveImporter : public td::actor::Actor { void check_shard_block_applied(BlockIdExt block_id, td::Promise promise); private: - std::string path_; - td::Ref state_; + std::string db_root_; + td::Ref last_masterchain_state_; BlockSeqno shard_client_seqno_; + BlockSeqno start_import_seqno_; td::Ref opts_; - std::shared_ptr package_; - td::actor::ActorId manager_; - td::Promise> promise_; + + std::vector to_import_files_; + bool use_imported_files_; + td::Promise> promise_; std::map masterchain_blocks_; - std::map> blocks_; + BlockSeqno last_masterchain_seqno_ = 0; + + struct BlockInfo { + std::shared_ptr data_pkg; + td::uint64 data_offset = 0; + std::shared_ptr proof_pkg; + td::uint64 proof_offset = 0; + }; + std::map blocks_; + + td::Ref start_state_; + size_t pending_shard_archives_ = 0; + + std::vector files_to_cleanup_; }; } // namespace validator diff --git a/validator/interfaces/db.h b/validator/interfaces/db.h index 66c1f0761..2d8566462 100644 --- a/validator/interfaces/db.h +++ b/validator/interfaces/db.h @@ -114,7 +114,8 @@ class Db : public td::actor::Actor { virtual void check_key_block_proof_exists(BlockIdExt block_id, td::Promise promise) = 0; virtual void check_key_block_proof_link_exists(BlockIdExt block_id, td::Promise promise) = 0; - virtual void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) = 0; + virtual void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) = 0; virtual void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) = 0; virtual void set_async_mode(bool mode, td::Promise promise) = 0; diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 5e567eeeb..3b44e894f 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -135,6 +135,8 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Promise>> promise) = 0; + virtual void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) = 0; virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) = 0; virtual void get_shard_client_state(bool from_db, td::Promise promise) = 0; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 695ac566f..509e743d1 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -265,6 +265,10 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise>> promise) override { UNREACHABLE(); } + void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) override { + UNREACHABLE(); + } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; void get_shard_client_state(bool from_db, td::Promise promise) override; @@ -285,7 +289,8 @@ class ValidatorManagerImpl : public ValidatorManager { promise.set_error(td::Status::Error(ErrorCode::error, "download disabled")); } - void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) override { + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) override { UNREACHABLE(); } void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index f85b2f641..516a74017 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -332,6 +332,10 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise>> promise) override { UNREACHABLE(); } + void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) override { + UNREACHABLE(); + } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override { UNREACHABLE(); diff --git a/validator/manager.cpp b/validator/manager.cpp index d79267eaf..64c85dcd2 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1729,6 +1729,12 @@ void ValidatorManagerImpl::send_get_out_msg_queue_proof_request( std::move(promise)); } +void ValidatorManagerImpl::send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, + std::string tmp_dir, td::Timestamp timeout, + td::Promise promise) { + callback_->download_archive(mc_seqno, shard_prefix, std::move(tmp_dir), timeout, std::move(promise)); +} + void ValidatorManagerImpl::start_up() { db_ = create_db_actor(actor_id(this), db_root_, opts_); lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_); @@ -1763,7 +1769,6 @@ void ValidatorManagerImpl::start_up() { if (fname.substr(fname.size() - 5) != ".pack") { return; } - fname = fname.substr(0, fname.size() - 5); if (fname.substr(0, 8) != "archive.") { return; } @@ -1772,13 +1777,18 @@ void ValidatorManagerImpl::start_up() { while (fname.size() > 1 && fname[0] == '0') { fname.remove_prefix(1); } + auto i = fname.find('.'); + if (i == td::Slice::npos) { + return; + } + fname = fname.substr(0, i); auto v = td::to_integer_safe(fname); if (v.is_error()) { return; } - auto pos = v.move_as_ok(); - LOG(INFO) << "found archive slice '" << cfname << "' for position " << pos; - to_import_[pos] = std::make_pair(cfname.str(), true); + auto seqno = v.move_as_ok(); + LOG(INFO) << "found archive slice '" << cfname << "' for seqno " << seqno; + to_import_[seqno].push_back(cfname.str()); } }); if (S.is_error()) { @@ -1926,8 +1936,7 @@ bool ValidatorManagerImpl::out_of_sync() { void ValidatorManagerImpl::prestart_sync() { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { R.ensure(); - // Don't download archives - td::actor::send_closure(SelfId, &ValidatorManagerImpl::finish_prestart_sync); + td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); }); td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P)); } @@ -1939,61 +1948,36 @@ void ValidatorManagerImpl::download_next_archive() { } auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno()); + // TODO: importing archives from disk + std::vector to_import_files; auto it = to_import_.upper_bound(seqno + 1); if (it != to_import_.begin()) { - it--; - if (it->second.second) { - it->second.second = false; - downloaded_archive_slice(it->second.first, false); - return; - } + --it; + to_import_files = std::move(it->second); + it->second.clear(); } - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { - if (R.is_error()) { - LOG(INFO) << "failed to download archive slice: " << R.error(); - delay_action([SelfId]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); }, - td::Timestamp::in(2.0)); - } else { - td::actor::send_closure(SelfId, &ValidatorManagerImpl::downloaded_archive_slice, R.move_as_ok(), true); - } - }); - callback_->download_archive(seqno + 1, db_root_ + "/tmp/", td::Timestamp::in(36000.0), std::move(P)); -} - -void ValidatorManagerImpl::downloaded_archive_slice(std::string name, bool is_tmp) { - LOG(INFO) << "downloaded archive slice: " << name; - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), name, is_tmp](td::Result> R) { - if (is_tmp) { - td::unlink(name).ensure(); - } + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { - LOG(INFO) << "failed to check downloaded archive slice: " << R.error(); + LOG(INFO) << "failed to download and import archive slice: " << R.error(); delay_action([SelfId]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive); }, td::Timestamp::in(2.0)); } else { - td::actor::send_closure(SelfId, &ValidatorManagerImpl::checked_archive_slice, R.move_as_ok()); + td::actor::send_closure(SelfId, &ValidatorManagerImpl::checked_archive_slice, R.ok().first, R.ok().second); } }); - - auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno()); - - td::actor::create_actor("archiveimport", name, last_masterchain_state_, seqno, opts_, actor_id(this), - std::move(P)) + td::actor::create_actor("archiveimport", db_root_, last_masterchain_state_, seqno, opts_, + actor_id(this), std::move(to_import_files), std::move(P)) .release(); } -void ValidatorManagerImpl::checked_archive_slice(std::vector seqno) { - CHECK(seqno.size() == 2); - LOG(INFO) << "checked downloaded archive slice: mc_top_seqno=" << seqno[0] << " shard_top_seqno_=" << seqno[1]; - CHECK(seqno[0] <= last_masterchain_seqno_); - CHECK(seqno[1] <= last_masterchain_seqno_); +void ValidatorManagerImpl::checked_archive_slice(BlockSeqno new_last_mc_seqno, BlockSeqno new_shard_client_seqno) { + LOG(INFO) << "checked downloaded archive slice: mc_top_seqno=" << new_last_mc_seqno + << " shard_top_seqno_=" << new_shard_client_seqno; + CHECK(new_last_mc_seqno <= last_masterchain_seqno_); + CHECK(new_shard_client_seqno <= last_masterchain_seqno_); - BlockIdExt b; - if (seqno[1] < last_masterchain_seqno_) { - CHECK(last_masterchain_state_->get_old_mc_block_id(seqno[1], b)); - } else { - b = last_masterchain_block_id_; - } + BlockIdExt shard_client_block_id; + CHECK(last_masterchain_state_->get_old_mc_block_id(new_shard_client_seqno, shard_client_block_id)); auto P = td::PromiseCreator::lambda( [SelfId = actor_id(this), db = db_.get(), client = shard_client_.get()](td::Result R) { @@ -2009,7 +1993,7 @@ void ValidatorManagerImpl::checked_archive_slice(std::vector seqno) }); td::actor::send_closure(db, &Db::get_block_state, std::move(handle), std::move(P)); }); - get_block_handle(b, true, std::move(P)); + get_block_handle(shard_client_block_id, true, std::move(P)); } void ValidatorManagerImpl::finish_prestart_sync() { @@ -2844,12 +2828,13 @@ void ValidatorManagerImpl::try_get_static_file(FileHash file_hash, td::Promise promise) { +void ValidatorManagerImpl::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) { if (masterchain_seqno > last_masterchain_seqno_) { promise.set_error(td::Status::Error(ErrorCode::notready, "masterchain seqno too big")); return; } - td::actor::send_closure(db_, &Db::get_archive_id, masterchain_seqno, std::move(promise)); + td::actor::send_closure(db_, &Db::get_archive_id, masterchain_seqno, shard_prefix, std::move(promise)); } void ValidatorManagerImpl::get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, diff --git a/validator/manager.hpp b/validator/manager.hpp index 37b447cc7..dc18e6278 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -337,8 +337,7 @@ class ValidatorManagerImpl : public ValidatorManager { void applied_hardfork(); void prestart_sync(); void download_next_archive(); - void downloaded_archive_slice(std::string name, bool is_tmp); - void checked_archive_slice(std::vector seqno); + void checked_archive_slice(BlockSeqno new_last_mc_seqno, BlockSeqno new_shard_client_seqno); void finish_prestart_sync(); void completed_prestart_sync(); @@ -516,6 +515,8 @@ class ValidatorManagerImpl : public ValidatorManager { void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Promise>> promise) override; + void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) override; void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; void get_shard_client_state(bool from_db, td::Promise promise) override; @@ -532,7 +533,7 @@ class ValidatorManagerImpl : public ValidatorManager { std::move(promise)); } - void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) override; + void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, td::Promise promise) override; void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) override; @@ -712,7 +713,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::actor::ActorOwn serializer_; - std::map> to_import_; + std::map> to_import_; private: std::unique_ptr callback_; diff --git a/validator/net/download-archive-slice.cpp b/validator/net/download-archive-slice.cpp index 6235b8b08..c2f8eceac 100644 --- a/validator/net/download-archive-slice.cpp +++ b/validator/net/download-archive-slice.cpp @@ -20,6 +20,8 @@ #include "td/utils/port/path.h" #include "td/utils/overloaded.h" +#include + namespace ton { namespace validator { @@ -27,12 +29,13 @@ namespace validator { namespace fullnode { DownloadArchiveSlice::DownloadArchiveSlice( - BlockSeqno masterchain_seqno, std::string tmp_dir, adnl::AdnlNodeIdShort local_id, + BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, adnl::AdnlNodeIdShort local_id, overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from, td::Timestamp timeout, td::actor::ActorId validator_manager, td::actor::ActorId rldp, td::actor::ActorId overlays, td::actor::ActorId adnl, td::actor::ActorId client, td::Promise promise) : masterchain_seqno_(masterchain_seqno) + , shard_prefix_(shard_prefix) , tmp_dir_(std::move(tmp_dir)) , local_id_(local_id) , overlay_id_(overlay_id) @@ -114,7 +117,13 @@ void DownloadArchiveSlice::got_node_to_download(adnl::AdnlNodeIdShort download_f } }); - auto q = create_serialize_tl_object(masterchain_seqno_); + td::BufferSlice q; + if (shard_prefix_.is_masterchain()) { + q = create_serialize_tl_object(masterchain_seqno_); + } else { + q = create_serialize_tl_object(masterchain_seqno_, + create_tl_shard_id(shard_prefix_)); + } if (client_.empty()) { td::actor::send_closure(overlays_, &overlay::Overlays::send_query, download_from_, local_id_, overlay_id_, "get_archive_info", std::move(P), td::Timestamp::in(3.0), std::move(q)); @@ -145,7 +154,8 @@ void DownloadArchiveSlice::got_archive_info(td::BufferSlice data) { } prev_logged_timer_ = td::Timer(); - LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << " from " << download_from_; + LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << " " << shard_prefix_.to_str() << " from " + << download_from_; get_archive_slice(); } @@ -186,13 +196,15 @@ void DownloadArchiveSlice::got_archive_slice(td::BufferSlice data) { double elapsed = prev_logged_timer_.elapsed(); if (elapsed > 10.0) { prev_logged_timer_ = td::Timer(); - LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << ": total=" << offset_ << " (" + LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << " " << shard_prefix_.to_str() + << ": total=" << offset_ << " (" << td::format::as_size((td::uint64)(double(offset_ - prev_logged_sum_) / elapsed)) << "/s)"; prev_logged_sum_ = offset_; } if (data.size() < slice_size()) { - LOG(INFO) << "finished downloading arcrive slice #" << masterchain_seqno_ << ": total=" << offset_; + LOG(INFO) << "finished downloading arcrive slice #" << masterchain_seqno_ << " " << shard_prefix_.to_str() + << ": total=" << offset_; finish_query(); } else { get_archive_slice(); diff --git a/validator/net/download-archive-slice.hpp b/validator/net/download-archive-slice.hpp index 0384ac8c9..42fd715f7 100644 --- a/validator/net/download-archive-slice.hpp +++ b/validator/net/download-archive-slice.hpp @@ -32,8 +32,9 @@ namespace fullnode { class DownloadArchiveSlice : public td::actor::Actor { public: - DownloadArchiveSlice(BlockSeqno masterchain_seqno, std::string tmp_dir, adnl::AdnlNodeIdShort local_id, - overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from, td::Timestamp timeout, + DownloadArchiveSlice(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + adnl::AdnlNodeIdShort local_id, overlay::OverlayIdShort overlay_id, + adnl::AdnlNodeIdShort download_from, td::Timestamp timeout, td::actor::ActorId validator_manager, td::actor::ActorId rldp, td::actor::ActorId overlays, td::actor::ActorId adnl, @@ -55,6 +56,7 @@ class DownloadArchiveSlice : public td::actor::Actor { private: BlockSeqno masterchain_seqno_; + ShardIdFull shard_prefix_; std::string tmp_dir_; std::string tmp_name_; td::FileFd fd_; diff --git a/validator/validator.h b/validator/validator.h index afd884acc..c7ef2ce87 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -160,8 +160,8 @@ class ValidatorManagerInterface : public td::actor::Actor { td::Promise promise) = 0; virtual void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) = 0; + virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, + td::Timestamp timeout, td::Promise promise) = 0; virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, td::Promise>> promise) = 0; @@ -253,7 +253,8 @@ class ValidatorManagerInterface : public td::actor::Actor { td::Timestamp timeout, td::Promise>> promise) = 0; - virtual void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) = 0; + virtual void get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, + td::Promise promise) = 0; virtual void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise) = 0;