capi: Fix batch size control (#2308)
On db::Buffer::MemoryLimitError the failing block had already been popped from the block_buffer, so the next iteration would fetch the following block and the assert SILKWORM_ASSERT(block->header.number == block_number) would fail. The failed block must therefore remain available for the next batch: the executor now peeks the block before execution and pops it only once it has executed successfully, and it returns SILKWORM_INTERNAL_ERROR when the memory limit is hit without any progress (i.e. the batch cannot fit even a single block).
JacekGlen authored Sep 23, 2024
1 parent ffdc93b commit d65aaa4
Showing 4 changed files with 195 additions and 6 deletions.
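
To make the change easier to follow, here is a hedged, self-contained sketch of the control flow this commit establishes (simplified C++ with hypothetical Block/BlockBuffer/execute_one stand-ins, not the actual Silkworm classes): the executor peeks the next block, pops it only after successful execution, and treats a memory-limit hit on the first block of a batch as a hard error instead of spinning forever.

// Illustrative sketch only, not code from this commit.
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <utility>

struct Block {
    uint64_t number{0};
    size_t state_growth{0};  // buffer memory its execution would consume
};

struct MemoryLimitError : std::runtime_error {
    MemoryLimitError() : std::runtime_error{"buffer memory limit reached"} {}
};

class BlockBuffer {  // toy stand-in for BoundedBuffer<std::optional<Block>>
  public:
    void push(Block b) { blocks_.push_back(std::move(b)); }
    std::optional<Block> peek_back() const {
        if (blocks_.empty()) return std::nullopt;
        return blocks_.front();
    }
    void pop_back() { blocks_.pop_front(); }

  private:
    std::deque<Block> blocks_;
};

// Throws MemoryLimitError when executing the block would exceed the batch budget.
void execute_one(const Block& block, size_t& used, size_t max_batch_size) {
    if (used + block.state_growth > max_batch_size) throw MemoryLimitError{};
    used += block.state_growth;
}

// Returns 0 on success, -1 on failure (mirrors SILKWORM_INTERNAL_ERROR).
int execute_range(BlockBuffer& buffer, uint64_t start_block, uint64_t max_block, size_t max_batch_size) {
    uint64_t block_number{start_block};
    uint64_t batch_start_block_number{start_block};
    while (block_number <= max_block) {
        size_t used{0};  // each batch starts with a fresh budget
        while (block_number <= max_block) {
            const auto block{buffer.peek_back()};  // do NOT remove the block yet
            if (!block) return -1;
            // in Silkworm this is SILKWORM_ASSERT(block->header.number == block_number)
            if (block->number != block_number) return -1;
            try {
                execute_one(*block, used, max_batch_size);
            } catch (const MemoryLimitError&) {
                if (batch_start_block_number == block_number) {
                    std::cerr << "budget cannot fit block " << block_number << "\n";
                    return -1;  // no progress at all: report instead of looping forever
                }
                batch_start_block_number = block_number;  // batch done, failed block stays buffered
                break;
            }
            buffer.pop_back();  // consume the block only after it executed
            ++block_number;
        }
        // ... the real code flushes the state buffer to the database here ...
    }
    return 0;
}

int main() {
    BlockBuffer buffer;
    for (uint64_t n = 10; n < 14; ++n) buffer.push(Block{n, /*state_growth=*/100});
    return execute_range(buffer, 10, 13, /*max_batch_size=*/250) == 0 ? 0 : 1;
}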
20 changes: 19 additions & 1 deletion silkworm/capi/silkworm.cpp
@@ -491,6 +491,7 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
auto signal_check_time{std::chrono::steady_clock::now()};

BlockNum block_number{start_block};
BlockNum batch_start_block_number{start_block};
BlockNum last_block_number = 0;
db::DataModel da_layer{txn};

@@ -522,7 +523,14 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
last_exec_result = block_executor.execute_single(block, state_buffer, analysis_cache, state_pool);
update_execution_progress(execution_progress, block, state_buffer, max_batch_size);
} catch (const db::Buffer::MemoryLimitError&) {
// infinite loop detection, buffer memory limit reached but no progress
if (batch_start_block_number == block_number) {
SILK_ERROR << "Buffer memory limit too small to execute a single block (block_number=" << block_number << ")";
return SILKWORM_INTERNAL_ERROR;
}

// batch done
batch_start_block_number = block_number;
break;
}
if (last_exec_result != ValidationResult::kOk) {
@@ -609,6 +617,7 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,

std::optional<Block> block;
BlockNum block_number{start_block};
BlockNum batch_start_block_number{start_block};
BlockNum last_block_number = 0;
AnalysisCache analysis_cache{execution::block::BlockExecutor::kDefaultAnalysisCacheSize};
ObjectPool<evmone::ExecutionState> state_pool;
@@ -619,7 +628,7 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,

while (block_number <= max_block) {
while (block_number <= max_block) {
block_buffer.pop_back(&block);
block_buffer.peek_back(&block);
if (!block) {
return SILKWORM_BLOCK_NOT_FOUND;
}
@@ -629,9 +638,17 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
last_exec_result = block_executor.execute_single(*block, state_buffer, analysis_cache, state_pool);
update_execution_progress(execution_progress, *block, state_buffer, max_batch_size);
} catch (const db::Buffer::MemoryLimitError&) {
// infinite loop detection, buffer memory limit reached but no progress
if (batch_start_block_number == block_number) {
SILK_ERROR << "Buffer memory limit too small to execute a single block (block_number=" << block_number << ")";
return SILKWORM_INTERNAL_ERROR;
}

// batch done
batch_start_block_number = block_number;
break;
}

if (last_exec_result != ValidationResult::kOk) {
// firstly, persist the work done so far, then return SILKWORM_INVALID_BLOCK
break;
@@ -643,6 +660,7 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,

last_block_number = block_number;
++block_number;
block_buffer.pop_back(&block);
}

StopWatch sw{/*auto_start=*/true};
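Both execution paths above gain the same no-progress guard; pulled out on its own (as a hypothetical free-standing helper, not code from this commit) it reduces to the following check.

// Hypothetical helper equivalent to the guard added in both paths above: hitting the
// memory limit on the very first block of a batch means the batch can never make
// progress, so the caller must fail instead of retrying the same block forever.
#include <cstdint>

enum class BatchOutcome {
    kBatchFull,      // persist what we have and start a new batch at this block
    kCannotProgress  // even an empty batch cannot hold this block -> SILKWORM_INTERNAL_ERROR
};

inline BatchOutcome on_memory_limit(uint64_t batch_start_block_number, uint64_t block_number) {
    return (batch_start_block_number == block_number) ? BatchOutcome::kCannotProgress
                                                      : BatchOutcome::kBatchFull;
}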
146 changes: 141 additions & 5 deletions silkworm/capi/silkworm_test.cpp
@@ -486,7 +486,7 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_ephemeral multiple bloc
SilkwormLibrary silkworm_lib{env_path()};

const int chain_id{1};
const uint64_t batch_size{256 * kMebi};
const uint64_t batch_size{3000}; // Small batch size to force multiple iterations
const bool write_change_sets{false}; // We CANNOT write changesets here, TestDatabaseContext db already has them
const bool write_receipts{false}; // We CANNOT write receipts here, TestDatabaseContext db already has them
const bool write_call_traces{false}; // For coherence but don't care
@@ -583,7 +583,7 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple bloc
SilkwormLibrary silkworm_lib{env_path()};

const int chain_id{1};
const uint64_t batch_size{256 * kMebi};
const uint64_t batch_size{3000}; // Small batch size to force multiple iterations
const bool write_change_sets{false}; // We CANNOT write changesets here, TestDatabaseContext db already has them
const bool write_receipts{false}; // We CANNOT write receipts here, TestDatabaseContext db already has them
const bool write_call_traces{false}; // For coherence but don't care
@@ -603,7 +603,7 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple bloc

// Prepare block template (just 1 tx w/ value transfer)
evmc::address from{0x658bdf435d810c91414ec09147daa6db62406379_address}; // funded in genesis
evmc::address to{0x8b299e2b7d7f43c0ce3068263545309ff4ffb521_address}; // untouched address
evmc::address to{0x8b299e2b7d7f43c0ce3068263545309ff4ffb500_address}; // untouched address(es)
intx::uint256 value{1};

Block block{};
@@ -635,6 +635,7 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple bloc
block.transactions.erase(block.transactions.cbegin());
block.transactions.pop_back();
block.transactions[0].nonce++;
block.transactions[0].to->bytes[19]++; // change recipient address to force batch size growth
}

// Execute N blocks using an *internal* txn
@@ -646,16 +647,18 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple bloc

db::ROTxnManaged ro_txn{env};
REQUIRE(db::read_account(ro_txn, to));
CHECK(db::read_account(ro_txn, to)->balance == kBlocks * value);
CHECK(db::read_account(ro_txn, to)->balance == value);
ro_txn.abort();

// Insert N blocks again
block.transactions[0].to = to;
for (size_t i{10 + kBlocks}; i < (10 + 2 * kBlocks); ++i) {
block.header.number = i;
insert_block(env, block);
block.transactions.erase(block.transactions.cbegin());
block.transactions.pop_back();
block.transactions[0].nonce++;
block.transactions[0].to->bytes[19]++; // change recipient address to force batch size growth
}

// Execute N blocks using an *internal* txn, then commit
@@ -667,7 +670,140 @@ TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple bloc

ro_txn = db::ROTxnManaged{env};
REQUIRE(db::read_account(ro_txn, to));
CHECK(db::read_account(ro_txn, to)->balance == 2 * kBlocks * value);
CHECK(db::read_account(ro_txn, to)->balance == 2 * value);
}

TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_ephemeral multiple blocks: insufficient buffer memory", "[silkworm][capi]") {
// Use Silkworm as a library with silkworm_init/silkworm_fini automated by RAII
SilkwormLibrary silkworm_lib{env_path()};

const int chain_id{1};
const uint64_t batch_size{170};              // Batch size not enough to process a single block
const bool write_change_sets{false}; // We CANNOT write changesets here, TestDatabaseContext db already has them
const bool write_receipts{false}; // We CANNOT write receipts here, TestDatabaseContext db already has them
const bool write_call_traces{false}; // For coherence but don't care

auto execute_blocks = [&](auto tx, auto start_block, auto end_block) {
return silkworm_lib.execute_blocks(tx,
chain_id,
start_block,
end_block,
batch_size,
write_change_sets,
write_receipts,
write_call_traces);
};

/* TestDatabaseContext db contains a test chain made up of 9 blocks */

// Prepare block template (just 1 tx w/ value transfer)
evmc::address from{0x658bdf435d810c91414ec09147daa6db62406379_address}; // funded in genesis
evmc::address to{0x8b299e2b7d7f43c0ce3068263545309ff4ffb521_address}; // untouched address
intx::uint256 value{1};

Block block{};
block.header.gas_limit = 5'000'000;
block.header.gas_used = 21'000;

static constexpr auto kEncoder = [](Bytes& dest, const Receipt& r) { rlp::encode(dest, r); };
std::vector<Receipt> receipts{
{TransactionType::kLegacy, true, block.header.gas_used, {}, {}},
};
block.header.receipts_root = trie::root_hash(receipts, kEncoder);
block.transactions.resize(1);
block.transactions[0].to = to;
block.transactions[0].gas_limit = block.header.gas_limit;
block.transactions[0].type = TransactionType::kLegacy;
block.transactions[0].max_priority_fee_per_gas = 0;
block.transactions[0].max_fee_per_gas = 20 * kGiga;
block.transactions[0].value = value;
block.transactions[0].r = 1; // dummy
block.transactions[0].s = 1; // dummy
block.transactions[0].set_sender(from);

constexpr size_t kBlocks{130};

// Insert N blocks
for (size_t i{10}; i < 10 + kBlocks; ++i) {
block.header.number = i;
insert_block(env, block);
block.transactions.erase(block.transactions.cbegin());
block.transactions.pop_back();
block.transactions[0].nonce++;
}

// Execute N blocks using an *external* txn, then commit
db::RWTxnManaged external_txn0{env};
BlockNum start_block{10}, end_block{10 + kBlocks - 1};
const auto result0{execute_blocks(*external_txn0, start_block, end_block)};
CHECK_NOTHROW(external_txn0.commit_and_stop());
CHECK(result0.execute_block_result == SILKWORM_INTERNAL_ERROR);
}

TEST_CASE_METHOD(CApiTest, "CAPI silkworm_execute_blocks_perpetual multiple blocks: insufficient buffer memory", "[silkworm][capi]") {
// Use Silkworm as a library with silkworm_init/silkworm_fini automated by RAII
SilkwormLibrary silkworm_lib{env_path()};

const int chain_id{1};
const uint64_t batch_size{170}; // Batch size not enough to process a single block
const bool write_change_sets{false}; // We CANNOT write changesets here, TestDatabaseContext db already has them
const bool write_receipts{false}; // We CANNOT write receipts here, TestDatabaseContext db already has them
const bool write_call_traces{false}; // For coherence but don't care

auto execute_blocks = [&](auto start_block, auto end_block) {
return silkworm_lib.execute_blocks_perpetual(env,
chain_id,
start_block,
end_block,
batch_size,
write_change_sets,
write_receipts,
write_call_traces);
};

/* TestDatabaseContext db contains a test chain made up of 9 blocks */

// Prepare block template (just 1 tx w/ value transfer)
evmc::address from{0x658bdf435d810c91414ec09147daa6db62406379_address}; // funded in genesis
evmc::address to{0x8b299e2b7d7f43c0ce3068263545309ff4ffb500_address}; // untouched address(es)
intx::uint256 value{1};

Block block{};
block.header.gas_limit = 5'000'000;
block.header.gas_used = 21'000;

static constexpr auto kEncoder = [](Bytes& dest, const Receipt& r) { rlp::encode(dest, r); };
std::vector<Receipt> receipts{
{TransactionType::kLegacy, true, block.header.gas_used, {}, {}},
};
block.header.receipts_root = trie::root_hash(receipts, kEncoder);
block.transactions.resize(1);
block.transactions[0].to = to;
block.transactions[0].gas_limit = block.header.gas_limit;
block.transactions[0].type = TransactionType::kLegacy;
block.transactions[0].max_priority_fee_per_gas = 0;
block.transactions[0].max_fee_per_gas = 20 * kGiga;
block.transactions[0].value = value;
block.transactions[0].r = 1; // dummy
block.transactions[0].s = 1; // dummy
block.transactions[0].set_sender(from);

constexpr size_t kBlocks{130};

// Insert N blocks
for (size_t i{10}; i < 10 + kBlocks; ++i) {
block.header.number = i;
insert_block(env, block);
block.transactions.erase(block.transactions.cbegin());
block.transactions.pop_back();
block.transactions[0].nonce++;
block.transactions[0].to->bytes[19]++; // change recipient address to force batch size growth
}

// Execute N blocks using an *internal* txn
BlockNum start_block{10}, end_block{10 + kBlocks - 1};
const auto result0{execute_blocks(start_block, end_block)};
CHECK(result0.execute_block_result == SILKWORM_INTERNAL_ERROR);
}

TEST_CASE_METHOD(CApiTest, "CAPI silkworm_add_snapshot", "[silkworm][capi]") {
14 changes: 14 additions & 0 deletions silkworm/infra/common/bounded_buffer.hpp
@@ -61,6 +61,20 @@ class BoundedBuffer {
not_empty_.notify_one();
}

void peek_back(value_type* item) {
boost::unique_lock<boost::mutex> lock(mutex_);

not_empty_.wait(lock, [&] { return is_stopped() || is_not_empty(); });

if (is_stopped()) { // If the buffer is stopped, do not peek the item
item = nullptr;
return;
}

*item = container_[unread_ - 1];
lock.unlock();
}

void pop_back(value_type* item) {
boost::unique_lock<boost::mutex> lock(mutex_);

21 changes: 21 additions & 0 deletions silkworm/infra/common/bounded_buffer_test.cpp
@@ -185,4 +185,25 @@ TEST_CASE("BoundedBuffer can terminate consumer") {
consume.join();
}

TEST_CASE("BoundedBuffer peek does not remove item") {
BoundedBuffer<std::string> buffer(10);
buffer.push_front("Hello1");
buffer.push_front("Hello2");
buffer.push_front("Hello3");
buffer.push_front("Hello4");

std::string item;
buffer.peek_back(&item);
CHECK(item == "Hello1");
CHECK(buffer.size() == 4);

buffer.pop_back(&item);
CHECK(item == "Hello1");
CHECK(buffer.size() == 3);

buffer.peek_back(&item);
CHECK(item == "Hello2");
CHECK(buffer.size() == 3);
}

} // namespace silkworm
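
For completeness, a hedged usage sketch of the peek-then-pop pattern against the BoundedBuffer API shown in this diff (push_front, peek_back, pop_back, size); the retry helper and the simulated transient failure are illustrative only, not part of the commit.

// Usage sketch (not part of the commit): a consumer peeks an item, may fail
// transiently (as the executor does on MemoryLimitError), and pops only once
// processing succeeded, so the same item is still available for the retry.
#include <string>

#include <silkworm/infra/common/bounded_buffer.hpp>

namespace {

bool process(const std::string& item, bool& fail_once) {
    if (fail_once) {  // simulate a one-off transient failure
        fail_once = false;
        return false;
    }
    return !item.empty();
}

}  // namespace

int main() {
    silkworm::BoundedBuffer<std::string> buffer(4);
    buffer.push_front("block-10");
    buffer.push_front("block-11");

    bool fail_once{true};
    while (buffer.size() > 0) {
        std::string item;
        buffer.peek_back(&item);  // the item stays in the buffer
        if (!process(item, fail_once)) {
            continue;  // retry the same item on the next iteration
        }
        buffer.pop_back(&item);  // consume it only after success
    }
    return 0;
}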
