Skip to content

Commit

Permalink
Fix error processing in ArchiveImporter
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jun 6, 2024
1 parent 24d4915 commit 8690f14
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
17 changes: 16 additions & 1 deletion validator/import-db-slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ ArchiveImporter::ArchiveImporter(std::string db_root, td::Ref<MasterchainState>

void ArchiveImporter::start_up() {
if (use_imported_files_) {
LOG(INFO) << "Importing archive for masterchain seqno #" << start_import_seqno_ << " from disk";
for (const std::string& path : to_import_files_) {
LOG(INFO) << "Importing file from disk " << path;
td::Status S = process_package(path, true);
if (S.is_error()) {
LOG(INFO) << "Error processing package " << path << ": " << S;
Expand All @@ -60,6 +62,7 @@ void ArchiveImporter::start_up() {
processed_mc_archive();
return;
}
LOG(INFO) << "Importing archive for masterchain seqno #" << start_import_seqno_ << " from net";
td::actor::send_closure(manager_, &ValidatorManager::send_download_archive_request, start_import_seqno_,
ShardIdFull{masterchainId}, db_root_ + "/tmp/", td::Timestamp::in(3600.0),
[SelfId = actor_id(this)](td::Result<std::string> R) {
Expand Down Expand Up @@ -142,6 +145,8 @@ td::Status ArchiveImporter::process_package(std::string path, bool with_masterch
if (b.is_masterchain()) {
masterchain_blocks_[b.seqno()] = b;
last_masterchain_seqno_ = std::max(last_masterchain_seqno_, b.seqno());
} else {
have_shard_blocks_;
}
}
return true;
Expand Down Expand Up @@ -264,6 +269,7 @@ void ArchiveImporter::applied_masterchain_block(BlockHandle handle) {

void ArchiveImporter::got_new_materchain_state(td::Ref<MasterchainState> state) {
last_masterchain_state_ = std::move(state);
imported_any_ = true;
check_masterchain_block(last_masterchain_state_->get_block_id().seqno() + 1);
}

Expand All @@ -287,7 +293,8 @@ void ArchiveImporter::download_shard_archives(td::Ref<MasterchainState> start_st
start_state_ = start_state;
td::uint32 monitor_min_split = start_state->monitor_min_split_depth(basechainId);
// If monitor_min_split == 0, we use the old archive format (packages are not separated by shard)
if (monitor_min_split > 0 && !use_imported_files_) {
// If masterchain package has shard blocks then it's old archive format, don't need to download shards
if (monitor_min_split > 0 && !have_shard_blocks_ && !use_imported_files_) {
for (td::uint64 i = 0; i < (1ULL << monitor_min_split); ++i) {
ShardIdFull shard_prefix{basechainId, (i * 2 + 1) << (64 - monitor_min_split - 1)};
if (opts_->need_monitor(shard_prefix, start_state)) {
Expand Down Expand Up @@ -371,6 +378,7 @@ void ArchiveImporter::got_masterchain_state(td::Ref<MasterchainState> state) {
void ArchiveImporter::checked_shard_client_seqno(BlockSeqno seqno) {
CHECK(shard_client_seqno_ + 1 == seqno);
shard_client_seqno_++;
imported_any_ = true;
check_next_shard_client_seqno(seqno + 1);
}

Expand Down Expand Up @@ -484,6 +492,13 @@ void ArchiveImporter::check_shard_block_applied(BlockIdExt block_id, td::Promise
}

void ArchiveImporter::abort_query(td::Status error) {
if (!imported_any_) {
for (const std::string &f : files_to_cleanup_) {
td::unlink(f).ignore();
}
promise_.set_error(std::move(error));
return;
}
LOG(INFO) << "Archive import: " << error;
finish_query();
}
Expand Down
2 changes: 2 additions & 0 deletions validator/import-db-slice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class ArchiveImporter : public td::actor::Actor {
td::Ref<MasterchainState> start_state_;
size_t pending_shard_archives_ = 0;

bool imported_any_ = false;
bool have_shard_blocks_ = false;
std::vector<std::string> files_to_cleanup_;
};

Expand Down
1 change: 0 additions & 1 deletion validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,6 @@ void ValidatorManagerImpl::download_next_archive() {
}

auto seqno = std::min(last_masterchain_seqno_, shard_client_handle_->id().seqno());
// TODO: importing archives from disk
std::vector<std::string> to_import_files;
auto it = to_import_.upper_bound(seqno + 1);
if (it != to_import_.begin()) {
Expand Down

0 comments on commit 8690f14

Please sign in to comment.