Skip to content

Commit

Permalink
db: state change stream and cache on generic KV API (#2168)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Jul 12, 2024
1 parent 88b09ff commit b0ecf82
Show file tree
Hide file tree
Showing 29 changed files with 1,329 additions and 602 deletions.
30 changes: 27 additions & 3 deletions silkworm/db/kv/api/direct_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,45 @@

#include "direct_service.hpp"

namespace silkworm::db::kv::api {
#include <gsl/util>

#include "endpoint/state_changes_call.hpp"

DirectService::DirectService() = default;
namespace silkworm::db::kv::api {

// rpc Version(google.protobuf.Empty) returns (types.VersionReply);
Task<Version> DirectService::version() {
co_return kCurrentVersion;
}

// rpc Tx(stream Cursor) returns (stream Pair);
Task<std::unique_ptr<db::kv::api::Transaction>> DirectService::begin_transaction() {
Task<std::unique_ptr<Transaction>> DirectService::begin_transaction() {
// TODO(canepat) implement
co_return nullptr;
}

// rpc StateChanges(StateChangeRequest) returns (stream StateChangeBatch);
Task<void> DirectService::state_changes(const api::StateChangeOptions& options, api::StateChangeConsumer consumer) {
auto executor = co_await ThisTask::executor;
api::StateChangesCall call{options, executor};

auto unsubscribe_signal = call.unsubscribe_signal();
[[maybe_unused]] auto _ = gsl::finally([=]() { unsubscribe_signal->notify(); });

co_await router_.state_changes_calls_channel.send(call);
auto channel = co_await call.result();

// Loop until stream completed (i.e. no message received) or cancelled exception
bool stream_completed{false};
while (!stream_completed) {
auto message = co_await channel->receive();
if (!message) {
stream_completed = true;
}
co_await consumer(std::move(message));
}
}

/** Temporal Point Queries **/

// rpc HistoryGet(HistoryGetReq) returns (HistoryGetReply);
Expand Down
11 changes: 9 additions & 2 deletions silkworm/db/kv/api/direct_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
#pragma once

#include "service.hpp"
#include "service_router.hpp"

namespace silkworm::db::kv::api {

//! Straightforward asynchronous implementation of KV API service relying on \code Domains.
//! This is used both client-side by 'direct' (i.e. no-gRPC) implementation and server-side by gRPC server.
class DirectService : public Service {
public:
explicit DirectService();
explicit DirectService(ServiceRouter router) : router_(router) {}
~DirectService() override = default;

DirectService(const DirectService&) = delete;
Expand All @@ -37,7 +38,10 @@ class DirectService : public Service {
Task<Version> version() override;

// rpc Tx(stream Cursor) returns (stream Pair);
Task<std::unique_ptr<db::kv::api::Transaction>> begin_transaction() override;
Task<std::unique_ptr<Transaction>> begin_transaction() override;

// rpc StateChanges(StateChangeRequest) returns (stream StateChangeBatch);
Task<void> state_changes(const StateChangeOptions&, StateChangeConsumer) override;

/** Temporal Point Queries **/

Expand All @@ -57,6 +61,9 @@ class DirectService : public Service {

// rpc DomainRange(DomainRangeReq) returns (Pairs);
Task<DomainRangeResult> get_domain_range(const DomainRangeQuery&) override;

private:
ServiceRouter router_;
};

} // namespace silkworm::db::kv::api
66 changes: 66 additions & 0 deletions silkworm/db/kv/api/direct_service_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2024 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "direct_service.hpp"

#include <catch2/catch_test_macros.hpp>

#include <silkworm/db/test_util/kv_test_base.hpp>
#include <silkworm/infra/test_util/fixture.hpp>

namespace silkworm::db::kv::api {

using namespace silkworm::test_util;

struct DirectServiceTest : public test_util::KVTestBase {
Task<void> consumer(std::optional<StateChangeSet> change_set) {
if (!change_set) co_return;
change_set_vector.push_back(*change_set);
}

StateChangeChannelPtr channel{std::make_shared<StateChangeChannel>(io_context_.get_executor())};
concurrency::Channel<StateChangesCall> state_changes_calls_channel{io_context_.get_executor()};
DirectService service{ServiceRouter{state_changes_calls_channel}};
std::vector<StateChangeSet> change_set_vector;
};

TEST_CASE_METHOD(DirectServiceTest, "state_changes: state change sets", "[db][kv][api][direct_service]") {
const std::vector<std::vector<StateChangeSet>> fixtures{
{},
{StateChangeSet{}},
{StateChangeSet{}, StateChangeSet{}},
{StateChangeSet{}, StateChangeSet{}, StateChangeSet{}},
};
auto state_changes_future = spawn(service.state_changes(StateChangeOptions{}, [this](auto cs) -> Task<void> {
co_await consumer(cs);
}));
for (const auto& expected_change_sets : fixtures) {
SECTION("expected_change_sets size=" + std::to_string(expected_change_sets.size())) {
spawn_and_wait([&]() -> Task<void> {
auto state_changes_call = co_await state_changes_calls_channel.receive();
state_changes_call.set_result(channel);
for (const auto& change_set : expected_change_sets) {
co_await channel->send(change_set);
}
});
spawn(channel->send({}));
CHECK_NOTHROW(state_changes_future.get());
CHECK(change_set_vector == expected_change_sets);
}
}
}

} // namespace silkworm::db::kv::api
124 changes: 124 additions & 0 deletions silkworm/db/kv/api/endpoint/state_change.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2024 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <functional>
#include <optional>
#include <vector>

#include <silkworm/infra/concurrency/task.hpp>

#include <silkworm/core/common/base.hpp>
#include <silkworm/core/common/bytes.hpp>
#include <silkworm/core/types/hash.hpp>
#include <silkworm/infra/concurrency/cancellation_token.hpp>

#include "common.hpp"

namespace silkworm::db::kv::api {

struct StateChangeOptions {
bool with_storage{false};
bool with_transactions{false};
CancellationToken* cancellation_token{nullptr};
};

enum Action : uint8_t {
kStorage,
kUpsert,
kCode,
kUpsertCode,
kRemove,
};

struct StorageChange {
Hash location;
Bytes data;
};
using StorageChangeSequence = std::vector<StorageChange>;

struct AccountChange {
evmc::address address;
uint64_t incarnation{0};
Action change_type{kStorage};
Bytes data;
Bytes code;
StorageChangeSequence storage_changes;
};
using AccountChangeSequence = std::vector<AccountChange>;

enum Direction : uint8_t {
kForward,
kUnwind,
};

struct StateChange {
Direction direction{kForward};
BlockNum block_height{0};
Hash block_hash;
AccountChangeSequence account_changes;
ListOfBytes rlp_txs; // Enabled using StateChangeOptions::with_transactions=true
};
using StateChangeSequence = std::vector<StateChange>;

struct StateChangeSet {
uint64_t state_version_id{0}; // Unique id of MDBX write transaction where this changes happened
uint64_t pending_block_base_fee{0}; // Base fee of the next block to be produced
uint64_t block_gas_limit{0}; // Gas limit of the latest block (proxy for the gas limit of the next block to be produced)
BlockNum finalized_block{0};
uint64_t pending_blob_fee_per_gas{0}; // Base blob fee for the next block to be produced
StateChangeSequence state_changes;
};

using StateChangeConsumer = std::function<Task<void>(std::optional<StateChangeSet>)>;

inline bool operator==(const StorageChange& lhs, const StorageChange& rhs) {
if (lhs.location != rhs.location) return false;
if (lhs.data != rhs.data) return false;
return true;
}

inline bool operator==(const AccountChange& lhs, const AccountChange& rhs) {
if (lhs.address != rhs.address) return false;
if (lhs.incarnation != rhs.incarnation) return false;
if (lhs.change_type != rhs.change_type) return false;
if (lhs.data != rhs.data) return false;
if (lhs.code != rhs.code) return false;
if (lhs.storage_changes != rhs.storage_changes) return false;
return true;
}

inline bool operator==(const StateChange& lhs, const StateChange& rhs) {
if (lhs.direction != rhs.direction) return false;
if (lhs.block_height != rhs.block_height) return false;
if (lhs.block_hash != rhs.block_hash) return false;
if (lhs.account_changes != rhs.account_changes) return false;
if (lhs.rlp_txs != rhs.rlp_txs) return false;
return true;
}

inline bool operator==(const StateChangeSet& lhs, const StateChangeSet& rhs) {
if (lhs.state_version_id != rhs.state_version_id) return false;
if (lhs.pending_block_base_fee != rhs.pending_block_base_fee) return false;
if (lhs.block_gas_limit != rhs.block_gas_limit) return false;
if (lhs.finalized_block != rhs.finalized_block) return false;
if (lhs.pending_blob_fee_per_gas != rhs.pending_blob_fee_per_gas) return false;
if (lhs.state_changes != rhs.state_changes) return false;
return true;
}

} // namespace silkworm::db::kv::api
68 changes: 68 additions & 0 deletions silkworm/db/kv/api/endpoint/state_changes_call.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2024 The Silkworm Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/any_io_executor.hpp>

#include <silkworm/infra/concurrency/awaitable_future.hpp>
#include <silkworm/infra/concurrency/channel.hpp>
#include <silkworm/infra/concurrency/event_notifier.hpp>
#include <silkworm/sentry/api/common/message_from_peer.hpp>
#include <silkworm/sentry/api/common/message_id_set.hpp>

#include "state_change.hpp"

namespace silkworm::db::kv::api {

using StateChangeChannel = concurrency::Channel<std::optional<StateChangeSet>>;
using StateChangeChannelPtr = std::shared_ptr<StateChangeChannel>;

class StateChangesCall final {
public:
using StateChangeChannelPromise = concurrency::AwaitablePromise<StateChangeChannelPtr>;

StateChangesCall(StateChangeOptions options, const boost::asio::any_io_executor& executor)
: options_(options),
channel_promise_(std::make_shared<StateChangeChannelPromise>(executor)),
unsubscribe_signal_(std::make_shared<concurrency::EventNotifier>(executor)) {}

StateChangesCall() = default;

[[nodiscard]] const StateChangeOptions& options() const { return options_; }

Task<StateChangeChannelPtr> result() {
auto future = channel_promise_->get_future();
co_return co_await future.get_async();
}

void set_result(StateChangeChannelPtr channel) {
channel_promise_->set_value(std::move(channel));
}

[[nodiscard]] std::shared_ptr<concurrency::EventNotifier> unsubscribe_signal() const {
return unsubscribe_signal_;
}

private:
StateChangeOptions options_;
std::shared_ptr<StateChangeChannelPromise> channel_promise_;
std::shared_ptr<concurrency::EventNotifier> unsubscribe_signal_;
};

} // namespace silkworm::db::kv::api
Loading

0 comments on commit b0ecf82

Please sign in to comment.