Skip to content

Commit

Permalink
capi: block execution commits after state batch has been flushed (#1621)
Browse files Browse the repository at this point in the history
* api: use bytes instead of gas as batch size unit
node: fix batch size computation in db buffer
node: avoid caching read values in db buffer

* api: restore usage of gas as batch size unit

* api: improve logging in block execution

* make fmt

* make format uniform in logs

* api: commit after any state batch has been flushed
node: change RWTxnUnmanaged to allow commit and abort

* fix comment

* api: flush state history after each block execution
api: change batch flush policy from gas to size
node: fix empty element in call traces

* node: log MDBX commit latency

* node: update state batch size for initial=current=empty account
node: fix state batch size computation for account code and storage
node: sort storage locations to insert ordered data into db
node: fix unmanaged r/w txn destruction
add unit tests

* add unit tests for batch size

* delete check already moved within db::Buffer::update_account

* add check also to InMemoryState::update_account

* fix some warnings and naming

---------

Co-authored-by: GitHub <[email protected]>
  • Loading branch information
canepat and web-flow committed Jan 23, 2024
1 parent 346f29d commit 9f2436f
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 95 deletions.
74 changes: 52 additions & 22 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
#include <silkworm/core/execution/call_tracer.hpp>
#include <silkworm/core/execution/execution.hpp>
#include <silkworm/core/types/call_traces.hpp>
#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/common/stopwatch.hpp>
#include <silkworm/infra/concurrency/signal_handler.hpp>
#include <silkworm/infra/concurrency/thread_pool.hpp>
#include <silkworm/node/db/access_layer.hpp>
Expand All @@ -49,6 +51,7 @@ static MemoryMappedRegion make_region(const SilkwormMemoryMappedFile& mmf) {
static log::Settings kLogSettingsLikeErigon{
.log_utc = false, // display local time
.log_timezone = false, // no timezone ID
.log_nocolor = true, // do not use colors
.log_trim = true, // compact rendering (i.e. no whitespaces)
};

Expand All @@ -62,7 +65,6 @@ struct ExecutionProgress {
size_t processed_transactions{0};
size_t processed_gas{0};
float gas_state_perc{0.0};
float gas_history_perc{0.0};
};

//! Generate log arguments for Silkworm library version
Expand All @@ -74,8 +76,27 @@ static log::Args log_args_for_version() {
"git_tag",
std::string(build_info->project_version),
"git_commit",
std::string(build_info->git_commit_hash),
};
std::string(build_info->git_commit_hash)};
}

//! Generate log arguments for execution flush at specified block
static log::Args log_args_for_exec_flush(const db::Buffer& state_buffer, uint64_t max_batch_size, uint64_t current_block) {
return {
"batch",
std::to_string(state_buffer.current_batch_state_size()),
"max_batch",
std::to_string(max_batch_size),
"block",
std::to_string(current_block)};
}

//! Generate log arguments for execution commit at specified block
static log::Args log_args_for_exec_commit(StopWatch::Duration elapsed, const std::filesystem::path& db_path) {
return {
"in",
StopWatch::format(elapsed),
"chaindata",
std::to_string(Directory{db_path}.size())};
}

static std::filesystem::path make_path(const char data_dir_path[SILKWORM_PATH_SIZE]) {
Expand Down Expand Up @@ -117,10 +138,7 @@ static log::Args log_args_for_exec_progress(ExecutionProgress& progress, uint64_
"Mgas/s",
float_to_string(speed_mgas),
"gasState",
float_to_string(progress.gas_state_perc),
"gasHistory",
float_to_string(progress.gas_history_perc),
};
float_to_string(progress.gas_state_perc)};
}

//! A signal handler guard using RAII pattern to acquire/release signal handling
Expand Down Expand Up @@ -373,6 +391,7 @@ int silkworm_execute_blocks(SilkwormHandle handle, MDBX_txn* mdbx_txn, uint64_t
try {
// Wrap MDBX txn into an internal *unmanaged* txn, i.e. MDBX txn is only used but neither aborted nor committed
db::RWTxnUnmanaged txn{mdbx_txn};
const auto db_path{txn.db().get_path()};

db::Buffer state_buffer{txn, /*prune_history_threshold=*/0};
db::DataModel access_layer{txn};
Expand All @@ -381,9 +400,10 @@ int silkworm_execute_blocks(SilkwormHandle handle, MDBX_txn* mdbx_txn, uint64_t
AnalysisCache analysis_cache{kCacheSize};
ObjectPool<evmone::ExecutionState> state_pool;

// Transform batch size limit into gas units (Ggas = Giga gas, Tgas = Tera gas)
const size_t gas_max_history_size{batch_size * 1_Kibi / 2}; // 512MB -> 256Ggas roughly
const size_t gas_max_batch_size{gas_max_history_size * 20}; // 256Ggas -> 5Tgas roughly
const size_t max_batch_size{batch_size};

// Transform batch size limit into gas units (Ggas = Giga gas)
const size_t gas_max_batch_size{batch_size * 2_Kibi}; // 256MB -> 512Ggas roughly

// Preload requested blocks in batches from storage, i.e. from MDBX database or snapshots
static constexpr size_t kMaxPrefetchedBlocks{10240};
Expand All @@ -393,7 +413,7 @@ int silkworm_execute_blocks(SilkwormHandle handle, MDBX_txn* mdbx_txn, uint64_t
auto signal_check_time{progress.start_time};
auto log_time{progress.start_time};

size_t gas_batch_size{0}, gas_history_size{0};
size_t gas_batch_size{0};
for (BlockNum block_number{start_block}; block_number <= max_block; ++block_number) {
if (prefetched_blocks.empty()) {
const auto num_blocks{std::min(size_t(max_block - block_number + 1), kMaxPrefetchedBlocks)};
Expand Down Expand Up @@ -443,19 +463,23 @@ int silkworm_execute_blocks(SilkwormHandle handle, MDBX_txn* mdbx_txn, uint64_t
progress.processed_transactions += block.transactions.size();
progress.processed_gas += block.header.gas_used;
gas_batch_size += block.header.gas_used;
gas_history_size += block.header.gas_used;

prefetched_blocks.pop_front();

// Flush whole state buffer or just history if we've reached the target batch sizes in gas units
if (gas_batch_size >= gas_max_batch_size) {
SILK_TRACE << log::Args{"buffer", "state", "size", human_size(state_buffer.current_batch_state_size())};
state_buffer.write_to_db(write_change_sets);
// Always flush history for single processed block (no batching)
state_buffer.write_history_to_db(write_change_sets);

// Flush state buffer if we've reached the target batch size
if (state_buffer.current_batch_state_size() >= max_batch_size) {
log::Info{"[4/12 Execution] Flushing state", // NOLINT(*-unused-raii)
log_args_for_exec_flush(state_buffer, max_batch_size, block.header.number)};
state_buffer.write_state_to_db();
gas_batch_size = 0;
} else if (gas_history_size >= gas_max_history_size) {
SILK_TRACE << log::Args{"buffer", "history", "size", human_size(state_buffer.current_batch_history_size())};
state_buffer.write_history_to_db(write_change_sets);
gas_history_size = 0;
StopWatch sw{/*auto_start=*/true};
txn.commit_and_renew();
const auto [elapsed, _]{sw.stop()};
log::Info("[4/12 Execution] Commit state+history", // NOLINT(*-unused-raii)
log_args_for_exec_commit(sw.since_start(elapsed), db_path));
}

const auto now{std::chrono::steady_clock::now()};
Expand All @@ -467,15 +491,21 @@ int silkworm_execute_blocks(SilkwormHandle handle, MDBX_txn* mdbx_txn, uint64_t
}
if (log_time <= now) {
progress.gas_state_perc = float(gas_batch_size) / float(gas_max_batch_size);
progress.gas_history_perc = float(gas_history_size) / float(gas_max_history_size);
progress.end_time = now;
log::Info{"[4/12 Execution] Executed blocks", // NOLINT(*-unused-raii)
log_args_for_exec_progress(progress, block.header.number)};
log_time = now + 20s;
}
}

state_buffer.write_to_db(write_change_sets);
log::Info{"[4/12 Execution] Flushing state", // NOLINT(*-unused-raii)
log_args_for_exec_flush(state_buffer, max_batch_size, max_block)};
state_buffer.write_state_to_db();
StopWatch sw{/*auto_start=*/true};
txn.commit_and_renew();
const auto [elapsed, _]{sw.stop()};
log::Info("[4/12 Execution] Commit state+history", // NOLINT(*-unused-raii)
log_args_for_exec_commit(sw.since_start(elapsed), db_path));
return SILKWORM_OK;
} catch (const mdbx::exception& e) {
if (mdbx_error_code) {
Expand Down
6 changes: 3 additions & 3 deletions silkworm/core/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ std::optional<uint64_t> parse_size(const std::string& sizestr) {
return number;
}

std::string human_size(uint64_t bytes) {
static const char* suffix[]{"B", "KB", "MB", "GB", "TB"};
std::string human_size(uint64_t bytes, const char* unit) {
static const char* suffix[]{"", "K", "M", "G", "T"};
static const uint32_t items{sizeof(suffix) / sizeof(suffix[0])};
uint32_t index{0};
double value{static_cast<double>(bytes)};
Expand All @@ -213,7 +213,7 @@ std::string human_size(uint64_t bytes) {
}
static constexpr size_t kBufferSize{64};
SILKWORM_THREAD_LOCAL char output[kBufferSize];
SILKWORM_ASSERT(std::snprintf(output, kBufferSize, "%.02lf %s", value, suffix[index]) > 0);
SILKWORM_ASSERT(std::snprintf(output, kBufferSize, "%.02lf %s%s", value, suffix[index], unit) > 0);
return output;
}

Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/common/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ std::optional<Bytes> from_hex(std::string_view hex) noexcept;
std::optional<uint64_t> parse_size(const std::string& sizestr);

// Converts a number of bytes in a human-readable format
std::string human_size(uint64_t bytes);
std::string human_size(uint64_t bytes, const char* unit = "B");

// Compares two strings for equality with case insensitivity
bool iequals(std::string_view a, std::string_view b);
Expand Down
4 changes: 4 additions & 0 deletions silkworm/core/state/in_memory_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ void InMemoryState::begin_block(BlockNum block_number) {

void InMemoryState::update_account(const evmc::address& address, std::optional<Account> initial,
std::optional<Account> current) {
// Skip update if both initial and final state are non-existent (i.e. contract creation+destruction within the same block)
if (!initial && !current) {
return;
}
account_changes_[block_number_][address] = initial;

if (current.has_value()) {
Expand Down
4 changes: 0 additions & 4 deletions silkworm/core/state/intra_block_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,6 @@ void IntraBlockState::write_to_db(uint64_t block_number) {
}

for (const auto& [address, obj] : objects_) {
// Skip update if both initial and final state are empty (i.e. contract creation+destruction within the same block)
if (!obj.initial && !obj.current) {
continue;
}
db_.update_account(address, obj.initial, obj.current);
if (!obj.current) {
continue;
Expand Down
85 changes: 51 additions & 34 deletions silkworm/node/db/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ void Buffer::begin_block(uint64_t block_number) {

void Buffer::update_account(const evmc::address& address, std::optional<Account> initial,
std::optional<Account> current) {
// Skip update if both initial and final state are non-existent (i.e. contract creation+destruction within the same block)
if (!initial && !current) {
// Only to perfectly match Erigon state batch size (Erigon does count any account w/ old=new=empty value).
batch_state_size_ += kAddressLength;
return;
}

const bool equal{current == initial};
const bool account_deleted{!current.has_value()};

Expand Down Expand Up @@ -66,11 +73,11 @@ void Buffer::update_account(const evmc::address& address, std::optional<Account>
}
auto it{accounts_.find(address)};
if (it != accounts_.end()) {
batch_state_size_ -= it->second.has_value() ? sizeof(Account) : 0;
batch_state_size_ += (current ? sizeof(Account) : 0);
batch_state_size_ -= it->second.has_value() ? it->second->encoding_length_for_storage() : 0;
batch_state_size_ += (current ? current->encoding_length_for_storage() : 0);
it->second = current;
} else {
batch_state_size_ += kAddressLength + (current ? sizeof(Account) : 0);
batch_state_size_ += kAddressLength + (current ? current->encoding_length_for_storage() : 0);
accounts_[address] = current;
}

Expand All @@ -85,10 +92,12 @@ void Buffer::update_account(const evmc::address& address, std::optional<Account>

void Buffer::update_account_code(const evmc::address& address, uint64_t incarnation, const evmc::bytes32& code_hash,
ByteView code) {
// Don't overwrite already existing code so that views of it
// that were previously returned by read_code() are still valid.
if (hash_to_code_.try_emplace(code_hash, code).second) {
// Don't overwrite existing code so that views of it that were previously returned by read_code are still valid
const auto [inserted_or_existing_it, inserted] = hash_to_code_.try_emplace(code_hash, code);
if (inserted) {
batch_state_size_ += kHashLength + code.length();
} else {
batch_state_size_ += code.length() - inserted_or_existing_it->second.length();
}

if (storage_prefix_to_code_hash_.insert_or_assign(storage_prefix(address, incarnation), code_hash).second) {
Expand All @@ -111,8 +120,14 @@ void Buffer::update_storage(const evmc::address& address, uint64_t incarnation,
}
}

if (storage_[address][incarnation].insert_or_assign(location, current).second) {
batch_state_size_ += kPlainStoragePrefixLength + kHashLength + kHashLength;
// Iterator in insert_or_assign return value "is pointing at the element that was inserted or updated"
// so we cannot use it to determine the old value size: we need to use initial instead
const auto [_, inserted] = storage_[address][incarnation].insert_or_assign(location, current);
ByteView current_val{zeroless_view(current.bytes)};
if (inserted) {
batch_state_size_ += kPlainStoragePrefixLength + kHashLength + current_val.length();
} else {
batch_state_size_ += current_val.length() - zeroless_view(initial.bytes).length();
}
}

Expand Down Expand Up @@ -242,8 +257,8 @@ void Buffer::write_history_to_db(bool write_change_sets) {

batch_history_size_ = 0;
auto [finish_time, _]{sw.stop()};
log::Info("Flushed history",
{"size", human_size(total_written_size), "in", StopWatch::format(sw.since_start(finish_time))});
log::Trace("Flushed history",
{"size", human_size(total_written_size), "in", StopWatch::format(sw.since_start(finish_time))});
}

void Buffer::write_state_to_db() {
Expand Down Expand Up @@ -337,9 +352,17 @@ void Buffer::write_state_to_db() {
if (auto it{storage_.find(address)}; it != storage_.end()) {
for (const auto& [incarnation, contract_storage] : it->second) {
Bytes prefix{storage_prefix(address, incarnation)};
for (const auto& [location, value] : contract_storage) {
upsert_storage_value(*state_table, prefix, location.bytes, value.bytes);
written_size += prefix.length() + kLocationLength + kHashLength;
// Extract sorted set of storage locations to insert ordered data into the DB
absl::btree_set<evmc::bytes32> storage_locations;
for (auto& storage_entry : contract_storage) {
storage_locations.insert(storage_entry.first);
}
for (const auto& location : storage_locations) {
if (auto storage_it{contract_storage.find(location)}; storage_it != contract_storage.end()) {
const auto& value{storage_it->second};
upsert_storage_value(*state_table, prefix, location.bytes, value.bytes);
written_size += prefix.length() + kLocationLength + zeroless_view(value.bytes).size();
}
}
}
storage_.erase(it);
Expand Down Expand Up @@ -397,23 +420,24 @@ void Buffer::insert_call_traces(BlockNum block_number, const CallTraces& traces)
touched_accounts.insert(recipient);
}

if (not touched_accounts.empty()) {
if (!touched_accounts.empty()) {
batch_history_size_ += sizeof(BlockNum);
}
absl::btree_set<Bytes> values;
for (const auto& account : touched_accounts) {
Bytes value(kAddressLength + 1, '\0');
std::memcpy(value.data(), account.bytes, kAddressLength);
if (traces.senders.contains(account)) {
value[kAddressLength] |= 1;
}
if (traces.recipients.contains(account)) {
value[kAddressLength] |= 2;

absl::btree_set<Bytes> values;
for (const auto& account : touched_accounts) {
Bytes value(kAddressLength + 1, '\0');
std::memcpy(value.data(), account.bytes, kAddressLength);
if (traces.senders.contains(account)) {
value[kAddressLength] |= 1;
}
if (traces.recipients.contains(account)) {
value[kAddressLength] |= 2;
}
batch_history_size_ += value.size();
values.insert(std::move(value));
}
batch_history_size_ += value.size();
values.insert(std::move(value));
call_traces_.emplace(block_number, values);
}
call_traces_.emplace(block_number, values);
}

evmc::bytes32 Buffer::state_root_hash() const {
Expand Down Expand Up @@ -482,8 +506,6 @@ std::optional<Account> Buffer::read_account(const evmc::address& address) const
return it->second;
}
auto db_account{db::read_account(txn_, address, historical_block_)};
accounts_[address] = db_account;
batch_state_size_ += kAddressLength + db_account.value_or(Account()).encoding_length_for_storage();
return db_account;
}

Expand All @@ -501,19 +523,14 @@ ByteView Buffer::read_code(const evmc::bytes32& code_hash) const noexcept {

evmc::bytes32 Buffer::read_storage(const evmc::address& address, uint64_t incarnation,
const evmc::bytes32& location) const noexcept {
size_t payload_length{kAddressLength + kIncarnationLength + kLocationLength + kHashLength};
if (auto it1{storage_.find(address)}; it1 != storage_.end()) {
payload_length -= kAddressLength;
if (auto it2{it1->second.find(incarnation)}; it2 != it1->second.end()) {
payload_length -= kIncarnationLength;
if (auto it3{it2->second.find(location)}; it3 != it2->second.end()) {
return it3->second;
}
}
}
auto db_storage{db::read_storage(txn_, address, incarnation, location, historical_block_)};
storage_[address][incarnation][location] = db_storage;
batch_state_size_ += payload_length;
return db_storage;
}

Expand Down
8 changes: 4 additions & 4 deletions silkworm/node/db/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ class Buffer : public State {
//! @param write_change_sets flag indicating if state changes should be written or not (default: true)
void write_history_to_db(bool write_change_sets = true);

private:
//! \brief Persists *state* accrued contents into db
void write_state_to_db();

private:
RWTxn& txn_;
db::DataModel access_layer_;
uint64_t prune_history_threshold_;
Expand All @@ -148,9 +148,9 @@ class Buffer : public State {
mutable absl::flat_hash_map<evmc::address, std::optional<Account>> accounts_;

// address -> incarnation -> location -> value
mutable absl::flat_hash_map<evmc::address,
absl::btree_map<uint64_t, absl::flat_hash_map<evmc::bytes32, evmc::bytes32>>>
storage_;
using Storage = absl::flat_hash_map<evmc::bytes32, evmc::bytes32>;
using StorageByIncarnation = absl::btree_map<uint64_t, Storage>;
mutable absl::flat_hash_map<evmc::address, StorageByIncarnation> storage_;

absl::btree_map<evmc::address, uint64_t> incarnations_;
absl::btree_map<evmc::bytes32, Bytes> hash_to_code_;
Expand Down
Loading

0 comments on commit 9f2436f

Please sign in to comment.