Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcdaemon: move bloom from logs to worker pool #1958

Closed
wants to merge 12 commits into from
25 changes: 16 additions & 9 deletions .github/workflows/rpc-performance-tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
name: QA - RPC Performance Tests

on:
pull_request:
branches:
- master
types:
- opened
- ready_for_review
- synchronize
workflow_dispatch:
schedule:
- cron: '0 0 * * *' # Run every day at 00:00 AM UTC
Expand Down Expand Up @@ -50,7 +57,7 @@ jobs:
- name: Run Silkworm RpcDaemon
working-directory: ${{runner.workspace}}/silkworm/build/cmd
run: |
./rpcdaemon --datadir $ERIGON_DATA_DIR --api admin,debug,eth,parity,erigon,trace,web3,txpool,ots,net --log.verbosity 1 --erigon_compatibility --jwt ./jwt.hex --skip_protocol_check --http-compression --eth.addr 127.0.0.1:51515 &
./rpcdaemon --datadir $ERIGON_DATA_DIR --api admin,debug,eth,parity,erigon,trace,web3,txpool,ots,net --log.verbosity 4 --erigon_compatibility --jwt ./jwt.hex --skip_protocol_check --http-compression --eth.addr 127.0.0.1:51515 --contexts 16 &
SILKWORM_RPC_DAEMON_PID=$!
echo "SILKWORM_RPC_DAEMON_PID=$SILKWORM_RPC_DAEMON_PID" >> $GITHUB_ENV

Expand Down Expand Up @@ -122,14 +129,14 @@ jobs:

# Launch the RPC performance test runner
failed_test= 0
run_perf mainnet eth_call stress_test_eth_call_001_14M 1:1,100:30,1000:20,10000:20,20000:20
run_perf mainnet eth_getLogs stress_test_eth_getLogs_15M 1:1,100:30,1000:20,10000:20,20000:20
run_perf mainnet eth_getBalance stress_test_eth_getBalance_15M 1:1,100:30,1000:20,10000:20,20000:20
run_perf mainnet eth_getBlockByHash stress_test_eth_getBlockByHash_14M 1:1,100:30,1000:20,10000:20
run_perf mainnet eth_getBlockByNumber stress_test_eth_getBlockByNumber_13M 1:1,100:30,1000:20,5000:20
run_perf mainnet eth_getTransactionByHash stress_test_eth_getTransactionByHash_13M 1:1,100:30,1000:20,10000:20
run_perf mainnet eth_getTransactionReceipt stress_test_eth_getTransactionReceipt_14M 1:1,100:30,1000:20,5000:20
run_perf mainnet eth_createAccessList stress_test_eth_createAccessList_16M 1:1,100:30,1000:20,10000:20,20000:20
#run_perf mainnet eth_call stress_test_eth_call_001_14M 1:1,100:30,1000:20,10000:20,20000:20
#run_perf mainnet eth_getLogs stress_test_eth_getLogs_15M 1:1,100:30,1000:20,10000:20,20000:20
#run_perf mainnet eth_getBalance stress_test_eth_getBalance_15M 1:1,100:30,1000:20,10000:20,20000:20
#run_perf mainnet eth_getBlockByHash stress_test_eth_getBlockByHash_14M 1:1,100:30,1000:20,10000:20
#run_perf mainnet eth_getBlockByNumber stress_test_eth_getBlockByNumber_13M 1:1,100:30,1000:20,5000:20
#run_perf mainnet eth_getTransactionByHash stress_test_eth_getTransactionByHash_13M 1:1,100:30,1000:20,10000:20
run_perf mainnet eth_getTransactionReceipt stress_test_eth_getTransactionReceipt_14M 1:1,100:30,1000:20,5000:20,10000:20,20000:20
#run_perf mainnet eth_createAccessList stress_test_eth_createAccessList_16M 1:1,100:30,1000:20,10000:20,20000:20

if [ $failed_test -eq 0 ]; then
echo "TEST_RESULT=success" >> "$GITHUB_OUTPUT"
Expand Down
3 changes: 2 additions & 1 deletion silkworm/rpc/commands/eth_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,8 @@ Task<void> EthereumRpcApi::handle_eth_get_transaction_receipt(const nlohmann::js
co_await tx->close(); // RAII not (yet) available with coroutines
co_return;
}
auto receipts = co_await core::get_receipts(tx_database, *block_with_hash);
// auto receipts = co_await core::get_receipts(tx_database, *block_with_hash);
auto receipts = co_await core::get_receipts2(tx_database, *block_with_hash, workers_);
const auto& transactions = block_with_hash->block.transactions;
if (receipts.size() != transactions.size()) {
throw std::invalid_argument{"Unexpected size for receipts in handle_eth_get_transaction_receipt"};
Expand Down
4 changes: 4 additions & 0 deletions silkworm/rpc/core/evm_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ TEST_CASE("EVMExecutor") {
[[nodiscard]] Task<void> walk(const std::string& /*table*/, silkworm::ByteView /*start_key*/, uint32_t /*fixed_bits*/, core::rawdb::Walker /*w*/) const override {
co_return;
}
[[nodiscard]] Task<void> walk_worker(const std::string& /*table*/, silkworm::ByteView /*start_key*/, uint32_t /*fixed_bits*/, core::rawdb::Worker /*w*/,
uint32_t /* max_size */) const override {
co_return;
}
[[nodiscard]] Task<void> for_prefix(const std::string& /*table*/, silkworm::ByteView /*prefix*/, core::rawdb::Walker /*w*/) const override {
co_return;
}
Expand Down
7 changes: 7 additions & 0 deletions silkworm/rpc/core/rawdb/accessors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/thread_pool.hpp>

#include <silkworm/core/common/util.hpp>
#include <silkworm/infra/concurrency/channel.hpp>
#include <silkworm/rpc/common/util.hpp>

namespace silkworm::rpc::core::rawdb {

using Walker = std::function<bool(silkworm::Bytes&, silkworm::Bytes&)>;
using WorkerChannel = silkworm::concurrency::Channel<bool>;
using WorkUnit = std::function<void(silkworm::Bytes, silkworm::Bytes, WorkerChannel&)>;
using Worker = std::pair<WorkUnit, boost::asio::thread_pool&>;

class DatabaseReader {
public:
Expand All @@ -40,6 +46,7 @@ class DatabaseReader {
[[nodiscard]] virtual Task<std::optional<silkworm::Bytes>> get_both_range(const std::string& table, silkworm::ByteView key, silkworm::ByteView subkey) const = 0;

[[nodiscard]] virtual Task<void> walk(const std::string& table, silkworm::ByteView start_key, uint32_t fixed_bits, Walker w) const = 0;
[[nodiscard]] virtual Task<void> walk_worker(const std::string& table, silkworm::ByteView start_key, uint32_t fixed_bits, Worker w, uint32_t max_size) const = 0;

[[nodiscard]] virtual Task<void> for_prefix(const std::string& table, silkworm::ByteView prefix, Walker w) const = 0;
};
Expand Down
96 changes: 96 additions & 0 deletions silkworm/rpc/core/rawdb/chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,47 @@ Task<std::optional<Receipts>> read_raw_receipts(const DatabaseReader& reader, Bl
co_return receipts;
}

Task<std::optional<Receipts>> read_raw_receipts2(const DatabaseReader& reader, BlockNum block_number, boost::asio::thread_pool& worker_pool) {
const auto block_key = silkworm::db::block_key(block_number);
const auto data = co_await reader.get_one(db::table::kBlockReceiptsName, block_key);
SILK_TRACE << "read_raw_receipts2 data: " << silkworm::to_hex(data);
if (data.empty()) {
co_return std::nullopt;
}

Receipts receipts{};
const bool decoding_ok{cbor_decode(data, receipts)};
if (!decoding_ok) {
throw std::runtime_error("cannot decode raw receipts in block: " + std::to_string(block_number));
}
SILK_TRACE << "#receipts: " << receipts.size();
if (receipts.empty()) {
co_return receipts;
}

auto log_key = silkworm::db::log_key(block_number, 0);
SILK_DEBUG << "log_key: " << silkworm::to_hex(log_key);
WorkUnit work = [&](silkworm::Bytes k, silkworm::Bytes v, WorkerChannel& result_channel) {
if (k.size() != sizeof(uint64_t) + sizeof(uint32_t)) {
result_channel.try_send(false);
return;
}
auto tx_id = endian::load_big_u32(&k[sizeof(uint64_t)]);
const bool decode_ok{cbor_decode(v, receipts[tx_id].logs)};
if (!decode_ok) {
SILK_WARN << "cannot decode logs for receipt: " << tx_id << " in block: " << block_number;
result_channel.try_send(false);
return;
}
receipts[tx_id].bloom = bloom_from_logs(receipts[tx_id].logs);
SILK_DEBUG << "#receipts[" << tx_id << "].logs: " << receipts[tx_id].logs.size();
result_channel.try_send(true);
};
co_await reader.walk_worker(db::table::kLogsName, log_key, 8 * CHAR_BIT, Worker{work, worker_pool}, receipts.size());

co_return receipts;
}

Task<std::optional<Receipts>> read_receipts(const DatabaseReader& reader, const silkworm::BlockWithHash& block_with_hash) {
const evmc::bytes32 block_hash = block_with_hash.hash;
uint64_t block_number = block_with_hash.block.header.number;
Expand Down Expand Up @@ -263,6 +304,61 @@ Task<std::optional<Receipts>> read_receipts(const DatabaseReader& reader, const
co_return receipts;
}

Task<std::optional<Receipts>> read_receipts2(const DatabaseReader& reader, const silkworm::BlockWithHash& block_with_hash, boost::asio::thread_pool& worker_pool) {
const evmc::bytes32 block_hash = block_with_hash.hash;
uint64_t block_number = block_with_hash.block.header.number;
const auto raw_receipts = co_await read_raw_receipts2(reader, block_number, worker_pool);
if (!raw_receipts || raw_receipts->empty()) {
co_return raw_receipts;
}
auto receipts = *raw_receipts;

// Add derived fields to the receipts
auto transactions = block_with_hash.block.transactions;
SILK_DEBUG << "#transactions=" << block_with_hash.block.transactions.size() << " #receipts=" << receipts.size();
if (transactions.size() != receipts.size()) {
throw std::runtime_error{"#transactions and #receipts do not match in read_receipts"};
}
uint32_t log_index{0};
for (size_t i{0}; i < receipts.size(); i++) {
// The tx hash can be calculated by the tx content itself
auto tx_hash{transactions[i].hash()};
receipts[i].tx_hash = silkworm::to_bytes32(full_view(tx_hash.bytes));
receipts[i].tx_index = uint32_t(i);

receipts[i].block_hash = block_hash;
receipts[i].block_number = block_number;

// When tx receiver is not set, create a contract with address depending on tx sender and its nonce
if (!transactions[i].to.has_value()) {
receipts[i].contract_address = create_address(*transactions[i].sender(), transactions[i].nonce);
}

// The gas used can be calculated by the previous receipt
if (i == 0) {
receipts[i].gas_used = receipts[i].cumulative_gas_used;
} else {
receipts[i].gas_used = receipts[i].cumulative_gas_used - receipts[i - 1].cumulative_gas_used;
}

receipts[i].from = transactions[i].sender();
receipts[i].to = transactions[i].to;
receipts[i].type = static_cast<uint8_t>(transactions[i].type);

// The derived fields of receipt are taken from block and transaction
for (size_t j{0}; j < receipts[i].logs.size(); j++) {
receipts[i].logs[j].block_number = block_number;
receipts[i].logs[j].block_hash = block_hash;
receipts[i].logs[j].tx_hash = receipts[i].tx_hash;
receipts[i].logs[j].tx_index = uint32_t(i);
receipts[i].logs[j].index = log_index++;
receipts[i].logs[j].removed = false;
}
}

co_return receipts;
}

Task<intx::uint256> read_total_issued(const core::rawdb::DatabaseReader& reader, BlockNum block_number) {
const auto block_key = silkworm::db::block_key(block_number);
const auto value = co_await reader.get_one(db::table::kIssuanceName, block_key);
Expand Down
2 changes: 2 additions & 0 deletions silkworm/rpc/core/rawdb/chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ Task<evmc::bytes32> read_head_header_hash(const DatabaseReader& reader);
Task<uint64_t> read_cumulative_transaction_count(const DatabaseReader& reader, BlockNum block_number);

Task<std::optional<Receipts>> read_raw_receipts(const DatabaseReader& reader, BlockNum block_number);
Task<std::optional<Receipts>> read_raw_receipts2(const DatabaseReader& reader, BlockNum block_number, boost::asio::thread_pool& worker_pool);

Task<std::optional<Receipts>> read_receipts(const DatabaseReader& reader, const silkworm::BlockWithHash& block_with_hash);
Task<std::optional<Receipts>> read_receipts2(const DatabaseReader& reader, const silkworm::BlockWithHash& block_with_hash, boost::asio::thread_pool& worker_pool);

Task<intx::uint256> read_total_issued(const core::rawdb::DatabaseReader& reader, BlockNum block_number);

Expand Down
13 changes: 13 additions & 0 deletions silkworm/rpc/core/receipts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,17 @@ Task<Receipts> get_receipts(const core::rawdb::DatabaseReader& db_reader, const
co_return Receipts{};
}

Task<Receipts> get_receipts2(const core::rawdb::DatabaseReader& db_reader, const silkworm::BlockWithHash& block_with_hash, boost::asio::thread_pool& worker_pool) {
const auto cached_receipts = co_await core::rawdb::read_receipts2(db_reader, block_with_hash, worker_pool);
if (cached_receipts) {
co_return *cached_receipts;
}

// If not already present, retrieve receipts by executing transactions
// auto block = co_await core::rawdb::read_block(db_reader, hash, number);
// TODO(canepat): implement
SILK_WARN << "retrieve receipts by executing transactions NOT YET IMPLEMENTED";
co_return Receipts{};
}

} // namespace silkworm::rpc::core
1 change: 1 addition & 0 deletions silkworm/rpc/core/receipts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
namespace silkworm::rpc::core {

Task<Receipts> get_receipts(const rawdb::DatabaseReader& db_reader, const silkworm::BlockWithHash& block_with_hash);
Task<Receipts> get_receipts2(const rawdb::DatabaseReader& db_reader, const silkworm::BlockWithHash& block_with_hash, boost::asio::thread_pool& worker_pool);

} // namespace silkworm::rpc::core
4 changes: 4 additions & 0 deletions silkworm/rpc/core/remote_state_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ TEST_CASE("async remote buffer", "[rpc][core][remote_buffer]") {
[[nodiscard]] Task<void> walk(const std::string& /*table*/, silkworm::ByteView /*start_key*/, uint32_t /*fixed_bits*/, core::rawdb::Walker /*w*/) const override {
co_return;
}
[[nodiscard]] Task<void> walk_worker(const std::string& /*table*/, silkworm::ByteView /*start_key*/, uint32_t /*fixed_bits*/, core::rawdb::Worker /*w*/,
uint32_t /* max_size */) const override {
co_return;
}
[[nodiscard]] Task<void> for_prefix(const std::string& /*table*/, silkworm::ByteView /*prefix*/, core::rawdb::Walker /*w*/) const override {
co_return;
}
Expand Down
9 changes: 9 additions & 0 deletions silkworm/rpc/ethdb/kv/cached_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ Task<void> CachedDatabase::walk(
co_await txn_database_.walk(table, start_key, fixed_bits, w);
}

Task<void> CachedDatabase::walk_worker(
const std::string& table,
silkworm::ByteView start_key,
uint32_t fixed_bits,
core::rawdb::Worker w,
uint32_t max_size) const {
co_await txn_database_.walk_worker(table, start_key, fixed_bits, w, max_size);
}

Task<void> CachedDatabase::for_prefix(
const std::string& table,
silkworm::ByteView prefix,
Expand Down
6 changes: 6 additions & 0 deletions silkworm/rpc/ethdb/kv/cached_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class CachedDatabase : public core::rawdb::DatabaseReader {
silkworm::ByteView start_key,
uint32_t fixed_bits,
core::rawdb::Walker w) const override;
Task<void> walk_worker(
const std::string& table,
silkworm::ByteView start_key,
uint32_t fixed_bits,
core::rawdb::Worker w,
uint32_t max_size) const override;

Task<void> for_prefix(
const std::string& table,
Expand Down
44 changes: 44 additions & 0 deletions silkworm/rpc/ethdb/transaction_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <climits>
#include <exception>

#include <boost/asio/post.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/rpc/common/util.hpp>

Expand Down Expand Up @@ -82,6 +84,48 @@ Task<void> TransactionDatabase::walk(const std::string& table, ByteView start_ke
co_return;
}

Task<void> TransactionDatabase::walk_worker(const std::string& table, ByteView start_key, uint32_t fixed_bits, core::rawdb::Worker w, uint32_t max_records) const {
const auto fixed_bytes = (fixed_bits + 7) / CHAR_BIT;
SILK_TRACE << "TransactionDatabase::walk fixed_bits: " << fixed_bits << " fixed_bytes: " << fixed_bytes;
const auto shift_bits = fixed_bits & 7;
uint8_t mask{0xff};
if (shift_bits != 0) {
mask = static_cast<uint8_t>(0xff << (CHAR_BIT - shift_bits));
}
SILK_TRACE << "mask: " << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(mask) << std::dec;

const auto cursor = co_await tx_.cursor(table);
SILK_TRACE << "TransactionDatabase::walk cursor_id: " << cursor->cursor_id();
auto kv_pair = co_await cursor->seek(start_key);
auto k = kv_pair.key;
auto v = kv_pair.value;
SILK_TRACE << "k: " << k << " v: " << v;
core::rawdb::WorkerChannel result_channel{co_await ThisTask::executor, max_records};
std::size_t worker_count{0};
while (
!k.empty() &&
k.size() >= fixed_bytes &&
(fixed_bits == 0 || (k.compare(0, fixed_bytes - 1, start_key, 0, fixed_bytes - 1) == 0 && (k[fixed_bytes - 1] & mask) == (start_key[fixed_bytes - 1] & mask)))) {
boost::asio::post(w.second, [&, k = std::move(k), v = std::move(v)]() { w.first(k, v, result_channel); });
kv_pair = co_await cursor->next();
k = kv_pair.key;
v = kv_pair.value;
++worker_count;

if (worker_count >= max_records) {
break;
}
}

while (worker_count) {
const bool ok{co_await result_channel.receive()};
SILK_TRACE << "TransactionDatabase::walk worker result: " << ok;
--worker_count;
}

co_return;
}

Task<void> TransactionDatabase::for_prefix(const std::string& table, ByteView prefix, core::rawdb::Walker w) const {
const auto cursor = co_await tx_.cursor(table);
SILK_TRACE << "TransactionDatabase::for_prefix cursor_id: " << cursor->cursor_id() << " prefix: " << silkworm::to_hex(prefix);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/ethdb/transaction_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TransactionDatabase : public core::rawdb::DatabaseReader {
[[nodiscard]] Task<std::optional<Bytes>> get_both_range(const std::string& table, ByteView key, ByteView subkey) const override;

[[nodiscard]] Task<void> walk(const std::string& table, ByteView start_key, uint32_t fixed_bits, core::rawdb::Walker w) const override;
[[nodiscard]] Task<void> walk_worker(const std::string& table, ByteView start_key, uint32_t fixed_bits, core::rawdb::Worker w, uint32_t max_records) const override;

[[nodiscard]] Task<void> for_prefix(const std::string& table, ByteView prefix, core::rawdb::Walker w) const override;

Expand Down
2 changes: 2 additions & 0 deletions silkworm/rpc/test/mock_database_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class MockDatabaseReader : public core::rawdb::DatabaseReader {
(const std::string&, silkworm::ByteView, silkworm::ByteView), (const));
MOCK_METHOD((Task<void>), walk, (const std::string&, silkworm::ByteView, uint32_t, core::rawdb::Walker),
(const));
MOCK_METHOD((Task<void>), walk_worker, (const std::string&, silkworm::ByteView, uint32_t, core::rawdb::Worker, uint32_t max_records),
(const));
MOCK_METHOD((Task<void>), for_prefix, (const std::string&, silkworm::ByteView, core::rawdb::Walker),
(const));
};
Expand Down
Loading