From 84983ea4ed153f965010d3a2f54cb3f235302e8c Mon Sep 17 00:00:00 2001 From: lupin012 <58134934+lupin012@users.noreply.github.com> Date: Sat, 13 Jan 2024 00:53:17 +0100 Subject: [PATCH] rpcdaemon: refactor stream data types (#1752) Co-authored-by: canepat <16927169+canepat@users.noreply.github.com> --- silkworm/rpc/commands/eth_api_test.cpp | 16 +-- silkworm/rpc/commands/rpc_api_test.cpp | 4 +- .../http/{channel_writer.hpp => channel.hpp} | 11 +- silkworm/rpc/http/connection.cpp | 46 ++++---- silkworm/rpc/http/connection.hpp | 11 +- silkworm/rpc/http/request_handler.cpp | 50 ++++----- silkworm/rpc/http/request_handler.hpp | 18 ++- silkworm/rpc/json/stream.hpp | 5 +- silkworm/rpc/json/stream_test.cpp | 6 +- silkworm/rpc/test/api_test_database.hpp | 33 +++--- silkworm/rpc/types/writer.cpp | 76 +------------ silkworm/rpc/types/writer.hpp | 46 ++------ silkworm/rpc/types/writer_test.cpp | 105 +++++++++++++++--- 13 files changed, 202 insertions(+), 225 deletions(-) rename silkworm/rpc/http/{channel_writer.hpp => channel.hpp} (85%) diff --git a/silkworm/rpc/commands/eth_api_test.cpp b/silkworm/rpc/commands/eth_api_test.cpp index d2c924c4d2..eee04cd1e9 100644 --- a/silkworm/rpc/commands/eth_api_test.cpp +++ b/silkworm/rpc/commands/eth_api_test.cpp @@ -26,7 +26,7 @@ namespace silkworm::rpc::commands { #ifndef SILKWORM_SANITIZE TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber succeeds if request well-formed", "[rpc][api]") { const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_blockNumber","params":[]})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -37,7 +37,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber succeeds if request TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber fails if request empty", "[rpc][api]") { const auto request = R"({})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -53,7 +53,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails rlp pa "method": "eth_sendRawTransaction", "params": ["0xd46ed67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f0724456"] })"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -69,7 +69,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails wrong "method": "eth_sendRawTransaction", "params": ["0xd46ed67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f072445"] })"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -80,7 +80,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails wrong TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_feeHistory succeeds if request well-formed", "[rpc][api]") { const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["0x1","0x867A80",[25,75]]})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -91,7 +91,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_feeHistory succeeds if request TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_call invalid params", "[rpc][api]") { const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_call","params":[{}, "latest"]})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -102,7 +102,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_call invalid params", "[rpc][a TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv invalid input", "[rpc][api]") { const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["5x1","0x2",[95,99]]})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", @@ -113,7 +113,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv invalid inp TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv valid input", "[rpc][api]") { const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["0x5","0x2",[95,99]]})"_json; - ChannelWriter::Response reply; + Channel::Response reply; run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply); CHECK(nlohmann::json::parse(reply.content) == R"({ "jsonrpc":"2.0", diff --git a/silkworm/rpc/commands/rpc_api_test.cpp b/silkworm/rpc/commands/rpc_api_test.cpp index f2bab773e1..e9aac779e5 100644 --- a/silkworm/rpc/commands/rpc_api_test.cpp +++ b/silkworm/rpc/commands/rpc_api_test.cpp @@ -107,7 +107,7 @@ TEST_CASE("rpc_api io (all files)", "[rpc][rpc_api]") { auto request = nlohmann::json::parse(line_out.substr(3)); auto expected = nlohmann::json::parse(line_in.substr(3)); - ChannelWriter::Response response; + Channel::Response response; test_base.run<&test::RequestHandler_ForTest::request_and_create_reply>(request, response); INFO("Request: " << request.dump()) INFO("Actual response: " << response.content) @@ -131,7 +131,7 @@ TEST_CASE("rpc_api io (individual)", "[rpc][rpc_api][ignore]") { SECTION("sample test") { auto request = R"({"jsonrpc":"2.0","id":1,"method":"debug_getRawTransaction","params":["0x74e41d593675913d6d5521f46523f1bd396dff1891bdb35f59be47c7e5e0b34b"]})"_json; - ChannelWriter::Response response; + Channel::Response response; test_base.run<&test::RequestHandler_ForTest::request_and_create_reply>(request, response); CHECK(nlohmann::json::parse(response.content) == R"({"jsonrpc":"2.0","id":1,"result":"0xf8678084342770c182520894658bdf435d810c91414ec09147daa6db624063798203e880820a95a0af5fc351b9e457a31f37c84e5cd99dd3c5de60af3de33c6f4160177a2c786a60a0201da7a21046af55837330a2c52fc1543cd4d9ead00ddf178dd96935b607ff9b"})"_json); diff --git a/silkworm/rpc/http/channel_writer.hpp b/silkworm/rpc/http/channel.hpp similarity index 85% rename from silkworm/rpc/http/channel_writer.hpp rename to silkworm/rpc/http/channel.hpp index 2a85a8f3c9..4fbe540d3e 100644 --- a/silkworm/rpc/http/channel_writer.hpp +++ b/silkworm/rpc/http/channel.hpp @@ -20,7 +20,7 @@ namespace silkworm::rpc { -class ChannelWriter : public Writer { +class Channel : public StreamWriter { public: enum class ResponseStatus { processing_continue, @@ -47,10 +47,13 @@ class ChannelWriter : public Writer { std::string content; }; - ChannelWriter() = default; - ChannelWriter(const ChannelWriter&) = delete; - ChannelWriter& operator=(const ChannelWriter&) = delete; + Channel() = default; + ~Channel() override = default; + Channel(const Channel&) = delete; + Channel& operator=(const Channel&) = delete; + + virtual Task open_stream() = 0; virtual Task write_rsp(Response& response) = 0; }; diff --git a/silkworm/rpc/http/connection.cpp b/silkworm/rpc/http/connection.cpp index d04582daaf..2cf7fcd871 100644 --- a/silkworm/rpc/http/connection.cpp +++ b/silkworm/rpc/http/connection.cpp @@ -123,7 +123,7 @@ Connection::handle_request(Request& request) { } } -StatusType Connection::get_http_status(ChannelWriter::ResponseStatus status) { +StatusType Connection::get_http_status(Channel::ResponseStatus status) { switch (status) { case ResponseStatus::processing_continue: return StatusType::processing_continue; @@ -131,33 +131,33 @@ StatusType Connection::get_http_status(ChannelWriter::ResponseStatus status) { return StatusType::ok; case ResponseStatus::created: return StatusType::created; - case ChannelWriter::ResponseStatus::accepted: + case ResponseStatus::accepted: return StatusType::accepted; - case ChannelWriter::ResponseStatus::no_content: + case ResponseStatus::no_content: return StatusType::no_content; - case ChannelWriter::ChannelWriter::ResponseStatus::multiple_choices: + case ResponseStatus::multiple_choices: return StatusType::multiple_choices; - case ChannelWriter::ChannelWriter::ResponseStatus::moved_permanently: + case ResponseStatus::moved_permanently: return StatusType::moved_permanently; - case ChannelWriter::ChannelWriter::ResponseStatus::moved_temporarily: + case ResponseStatus::moved_temporarily: return StatusType::moved_temporarily; - case ChannelWriter::ChannelWriter::ResponseStatus::not_modified: + case ResponseStatus::not_modified: return StatusType::not_modified; - case ChannelWriter::ChannelWriter::ResponseStatus::bad_request: + case ResponseStatus::bad_request: return StatusType::bad_request; - case ChannelWriter::ChannelWriter::ResponseStatus::unauthorized: + case ResponseStatus::unauthorized: return StatusType::unauthorized; - case ChannelWriter::ResponseStatus::forbidden: + case ResponseStatus::forbidden: return StatusType::forbidden; - case ChannelWriter::ResponseStatus::not_found: + case ResponseStatus::not_found: return StatusType::not_found; - case ChannelWriter::ResponseStatus::internal_server_error: + case ResponseStatus::internal_server_error: return StatusType::internal_server_error; - case ChannelWriter::ResponseStatus::not_implemented: + case ResponseStatus::not_implemented: return StatusType::not_implemented; - case ChannelWriter::ResponseStatus::bad_gateway: + case ResponseStatus::bad_gateway: return StatusType::bad_gateway; - case ChannelWriter::ResponseStatus::service_unavailable: + case ResponseStatus::service_unavailable: return StatusType::service_unavailable; default: return StatusType::internal_server_error; @@ -173,13 +173,13 @@ Connection::write_rsp(Response& msg_response) { co_await do_write(reply); } +Task Connection::open_stream() { + co_await write_headers(); +} + Task Connection::write(std::string_view content) { - if (first_chunk_) { - co_await write_headers(); - first_chunk_ = false; - } const auto bytes_transferred = co_await boost::asio::async_write(socket_, boost::asio::buffer(content), boost::asio::use_awaitable); - SILK_TRACE << "SocketWriter::write bytes_transferred: " << bytes_transferred; + SILK_TRACE << "Connection::write bytes_transferred: " << bytes_transferred; co_return bytes_transferred; } @@ -200,7 +200,7 @@ static constexpr size_t kCorsNumHeaders{4}; Task Connection::do_write(Reply& reply) { try { - SILK_DEBUG << "RequestHandler::do_write reply: " << reply.content; + SILK_DEBUG << "Connection::do_write reply: " << reply.content; reply.headers.reserve(allowed_origins_.empty() ? 2 : 2 + kCorsNumHeaders); reply.headers.emplace_back(http::Header{"Content-Length", std::to_string(reply.content.size())}); @@ -209,7 +209,7 @@ Task Connection::do_write(Reply& reply) { set_cors(reply.headers); const auto bytes_transferred = co_await boost::asio::async_write(socket_, reply.to_buffers(), boost::asio::use_awaitable); - SILK_TRACE << "RequestHandler::do_write bytes_transferred: " << bytes_transferred; + SILK_TRACE << "Connection::do_write bytes_transferred: " << bytes_transferred; } catch (const boost::system::system_error& se) { std::rethrow_exception(std::make_exception_ptr(se)); } catch (const std::exception& e) { @@ -229,7 +229,7 @@ Task Connection::write_headers() { auto buffers = http::to_buffers(StatusType::ok, headers); const auto bytes_transferred = co_await boost::asio::async_write(socket_, buffers, boost::asio::use_awaitable); - SILK_TRACE << "RequestHandler::write_headers bytes_transferred: " << bytes_transferred; + SILK_TRACE << "Connection::write_headers bytes_transferred: " << bytes_transferred; } catch (const std::system_error& se) { std::rethrow_exception(std::make_exception_ptr(se)); } catch (const std::exception& e) { diff --git a/silkworm/rpc/http/connection.hpp b/silkworm/rpc/http/connection.hpp index 2839b1bd90..778b4e80e8 100644 --- a/silkworm/rpc/http/connection.hpp +++ b/silkworm/rpc/http/connection.hpp @@ -33,7 +33,7 @@ #include #include -#include +#include #include #include #include @@ -42,7 +42,7 @@ namespace silkworm::rpc::http { //! Represents a single connection from a client. -class Connection : public ChannelWriter { +class Connection : public Channel { public: Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; @@ -53,7 +53,6 @@ class Connection : public ChannelWriter { commands::RpcApiTable& handler_table, const std::vector& allowed_origins, std::optional jwt_secret); - ~Connection() override; boost::asio::ip::tcp::socket& socket() { return socket_; } @@ -62,7 +61,9 @@ class Connection : public ChannelWriter { Task read_loop(); Task write_rsp(Response& response) override; + Task open_stream() override; Task write(std::string_view content) override; + Task close() override { co_return; } private: using AuthorizationError = std::string; @@ -78,7 +79,7 @@ class Connection : public ChannelWriter { Task write_headers(); - static StatusType get_http_status(ChannelWriter::ResponseStatus status); + static StatusType get_http_status(Channel::ResponseStatus status); //! Perform an asynchronous read operation. Task do_read(); @@ -108,8 +109,6 @@ class Connection : public ChannelWriter { const std::vector& allowed_origins_; const std::optional jwt_secret_; - - bool first_chunk_{true}; }; } // namespace silkworm::rpc::http diff --git a/silkworm/rpc/http/request_handler.cpp b/silkworm/rpc/http/request_handler.cpp index 6dd45d373b..73d56160a8 100644 --- a/silkworm/rpc/http/request_handler.cpp +++ b/silkworm/rpc/http/request_handler.cpp @@ -13,19 +13,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// -// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// #include "request_handler.hpp" #include -#include #include -#include #include @@ -37,15 +29,17 @@ namespace silkworm::rpc::http { +constexpr std::size_t kStreamBufferSize{4096}; + Task RequestHandler::handle(const std::string& content) { auto start = clock_time::now(); - ChannelWriter::Response msg_response; + Channel::Response msg_response; bool send_reply{true}; const auto request_json = nlohmann::json::parse(content); if (request_json.is_object()) { if (!is_valid_jsonrpc(request_json)) { - msg_response.status = ChannelWriter::ResponseStatus::bad_request; + msg_response.status = Channel::ResponseStatus::bad_request; msg_response.content = make_json_error(0, -32600, "invalid request").dump() + "\n"; } else { send_reply = co_await handle_request_and_create_reply(request_json, msg_response); @@ -63,19 +57,19 @@ Task RequestHandler::handle(const std::string& content) { if (!is_valid_jsonrpc(item.value())) { batch_reply_content << make_json_error(0, -32600, "invalid request").dump(); } else { - ChannelWriter::Response single_reply; + Channel::Response single_reply; send_reply = co_await handle_request_and_create_reply(item.value(), single_reply); batch_reply_content << single_reply.content; } } batch_reply_content << "]\n"; - msg_response.status = ChannelWriter::ResponseStatus::ok; + msg_response.status = Channel::ResponseStatus::ok; msg_response.content = batch_reply_content.str(); } if (send_reply) { - co_await channel_writer_->write_rsp(msg_response); + co_await channel_->write_rsp(msg_response); } SILK_TRACE << "handle HTTP request t=" << clock_time::since(start) << "ns"; } @@ -86,16 +80,16 @@ bool RequestHandler::is_valid_jsonrpc(const nlohmann::json& /* request_json */) return true; } -Task RequestHandler::handle_request_and_create_reply(const nlohmann::json& request_json, ChannelWriter::Response& response) { +Task RequestHandler::handle_request_and_create_reply(const nlohmann::json& request_json, Channel::Response& response) { if (!request_json.contains("method")) { - response.status = ChannelWriter::ResponseStatus::bad_request; + response.status = Channel::ResponseStatus::bad_request; response.content = make_json_error(request_json, -32600, "invalid request").dump(); co_return true; } const auto method = request_json["method"].get(); if (method.empty()) { - response.status = ChannelWriter::ResponseStatus::bad_request; + response.status = Channel::ResponseStatus::bad_request; response.content = make_json_error(request_json, -32600, "invalid request").dump(); co_return true; } @@ -124,47 +118,47 @@ Task RequestHandler::handle_request_and_create_reply(const nlohmann::json& } response.content = make_json_error(request_json, -32601, "the method " + method + " does not exist/is not available").dump(); - response.status = ChannelWriter::ResponseStatus::not_implemented; + response.status = Channel::ResponseStatus::not_implemented; co_return true; } -Task RequestHandler::handle_request(commands::RpcApiTable::HandleMethodGlaze handler, const nlohmann::json& request_json, ChannelWriter::Response& response) { +Task RequestHandler::handle_request(commands::RpcApiTable::HandleMethodGlaze handler, const nlohmann::json& request_json, Channel::Response& response) { try { std::string reply_json; reply_json.reserve(2048); co_await (rpc_api_.*handler)(request_json, reply_json); - response.status = ChannelWriter::ResponseStatus::ok; + response.status = Channel::ResponseStatus::ok; response.content = std::move(reply_json); } catch (const std::exception& e) { SILK_ERROR << "exception: " << e.what(); response.content = make_json_error(request_json, 100, e.what()).dump(); - response.status = ChannelWriter::ResponseStatus::internal_server_error; + response.status = Channel::ResponseStatus::internal_server_error; } catch (...) { SILK_ERROR << "unexpected exception"; response.content = make_json_error(request_json, 100, "unexpected exception").dump(); - response.status = ChannelWriter::ResponseStatus::internal_server_error; + response.status = Channel::ResponseStatus::internal_server_error; } co_return; } -Task RequestHandler::handle_request(commands::RpcApiTable::HandleMethod handler, const nlohmann::json& request_json, ChannelWriter::Response& response) { +Task RequestHandler::handle_request(commands::RpcApiTable::HandleMethod handler, const nlohmann::json& request_json, Channel::Response& response) { try { nlohmann::json reply_json; co_await (rpc_api_.*handler)(request_json, reply_json); response.content = reply_json.dump( /*indent=*/-1, /*indent_char=*/' ', /*ensure_ascii=*/false, nlohmann::json::error_handler_t::replace); - response.status = ChannelWriter::ResponseStatus::ok; + response.status = Channel::ResponseStatus::ok; } catch (const std::exception& e) { SILK_ERROR << "exception: " << e.what(); response.content = make_json_error(request_json, 100, e.what()).dump(); - response.status = ChannelWriter::ResponseStatus::internal_server_error; + response.status = Channel::ResponseStatus::internal_server_error; } catch (...) { SILK_ERROR << "unexpected exception"; response.content = make_json_error(request_json, 100, "unexpected exception").dump(); - response.status = ChannelWriter::ResponseStatus::internal_server_error; + response.status = Channel::ResponseStatus::internal_server_error; } co_return; @@ -173,10 +167,10 @@ Task RequestHandler::handle_request(commands::RpcApiTable::HandleMethod ha Task RequestHandler::handle_request(commands::RpcApiTable::HandleStream handler, const nlohmann::json& request_json) { try { auto io_executor = co_await boost::asio::this_coro::executor; - const std::size_t kStreamBufferSize = 4096; - ChunksWriter chunks_writer(*channel_writer_); - json::Stream stream(io_executor, chunks_writer, kStreamBufferSize); + co_await channel_->open_stream(); + ChunkWriter chunk_writer(*channel_); + json::Stream stream(io_executor, chunk_writer, kStreamBufferSize); co_await (rpc_api_.*handler)(request_json, stream); diff --git a/silkworm/rpc/http/request_handler.hpp b/silkworm/rpc/http/request_handler.hpp index a7a2df2e1f..3f17eb2271 100644 --- a/silkworm/rpc/http/request_handler.hpp +++ b/silkworm/rpc/http/request_handler.hpp @@ -30,19 +30,15 @@ #include #include -#include +#include #include namespace silkworm::rpc::http { class RequestHandler { public: - RequestHandler(ChannelWriter* channel_writer, - commands::RpcApi& rpc_api, - const commands::RpcApiTable& rpc_api_table) - : channel_writer_{channel_writer}, - rpc_api_{rpc_api}, - rpc_api_table_(rpc_api_table) {} + RequestHandler(Channel* channel, commands::RpcApi& rpc_api, const commands::RpcApiTable& rpc_api_table) + : channel_{channel}, rpc_api_{rpc_api}, rpc_api_table_(rpc_api_table) {} RequestHandler(const RequestHandler&) = delete; virtual ~RequestHandler() = default; @@ -51,7 +47,7 @@ class RequestHandler { Task handle(const std::string& content); protected: - Task handle_request_and_create_reply(const nlohmann::json& request_json, ChannelWriter::Response& response); + Task handle_request_and_create_reply(const nlohmann::json& request_json, Channel::Response& response); private: bool is_valid_jsonrpc(const nlohmann::json& request_json); @@ -59,14 +55,14 @@ class RequestHandler { Task handle_request( commands::RpcApiTable::HandleMethod handler, const nlohmann::json& request_json, - ChannelWriter::Response& response); + Channel::Response& response); Task handle_request( commands::RpcApiTable::HandleMethodGlaze handler, const nlohmann::json& request_json, - ChannelWriter::Response& response); + Channel::Response& response); Task handle_request(commands::RpcApiTable::HandleStream handler, const nlohmann::json& request_json); - ChannelWriter* channel_writer_; + Channel* channel_; commands::RpcApi& rpc_api_; diff --git a/silkworm/rpc/json/stream.hpp b/silkworm/rpc/json/stream.hpp index f6533cd2a1..7fa5a95f75 100644 --- a/silkworm/rpc/json/stream.hpp +++ b/silkworm/rpc/json/stream.hpp @@ -34,7 +34,8 @@ static const nlohmann::json EMPTY_ARRAY = nlohmann::json::value_t::array; class Stream { public: - explicit Stream(boost::asio::any_io_executor& executor, Writer& writer, std::size_t threshold = kDefaultThreshold) : io_executor_(executor), writer_(writer), threshold_(threshold) {} + explicit Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold = kDefaultThreshold) + : io_executor_(executor), writer_(writer), threshold_(threshold) {} Stream(const Stream& stream) = delete; Stream& operator=(const Stream&) = delete; @@ -71,7 +72,7 @@ class Stream { boost::asio::any_io_executor& io_executor_; - Writer& writer_; + StreamWriter& writer_; std::stack stack_; const std::size_t threshold_; diff --git a/silkworm/rpc/json/stream_test.cpp b/silkworm/rpc/json/stream_test.cpp index 83efdef881..ae16ab627f 100644 --- a/silkworm/rpc/json/stream_test.cpp +++ b/silkworm/rpc/json/stream_test.cpp @@ -34,7 +34,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") { boost::asio::any_io_executor io_executor = pool.next_io_context().get_executor(); StringWriter string_writer; - ChunksWriter chunks_writer(string_writer); + ChunkWriter chunk_writer(string_writer); SECTION("write_json in string") { Stream stream(io_executor, string_writer); @@ -49,7 +49,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") { CHECK(string_writer.get_content() == "{\"test\":\"test\"}"); } SECTION("write_json in 1 chunk") { - Stream stream(io_executor, chunks_writer); + Stream stream(io_executor, chunk_writer); nlohmann::json json = R"({ "test": "test" @@ -61,7 +61,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") { CHECK(string_writer.get_content() == "f\r\n{\"test\":\"test\"}\r\n0\r\n\r\n"); } SECTION("write_json in 2 chunks") { - Stream stream(io_executor, chunks_writer); + Stream stream(io_executor, chunk_writer); nlohmann::json json = R"({ "check": "check", diff --git a/silkworm/rpc/test/api_test_database.hpp b/silkworm/rpc/test/api_test_database.hpp index 8529724501..7d8e5e48a2 100644 --- a/silkworm/rpc/test/api_test_database.hpp +++ b/silkworm/rpc/test/api_test_database.hpp @@ -40,7 +40,7 @@ #include #include #include -#include +#include #include #include @@ -52,31 +52,32 @@ InMemoryState populate_genesis(db::RWTxn& txn, const std::filesystem::path& test void populate_blocks(db::RWTxn& txn, const std::filesystem::path& tests_dir, InMemoryState& state_buffer); -class ChannelWriterForTest : public ChannelWriter { +class ChannelForTest : public Channel { + Task open_stream() override { co_return; } Task write_rsp(Response& /* response */) override { co_return; } Task write(std::string_view /* content */) override { co_return 0; } + Task close() override { co_return; } }; class RequestHandler_ForTest : public http::RequestHandler { public: - RequestHandler_ForTest(ChannelWriterForTest* channel_writer, + RequestHandler_ForTest(ChannelForTest* channel, commands::RpcApi& rpc_api, const commands::RpcApiTable& rpc_api_table) - : http::RequestHandler(channel_writer, rpc_api, rpc_api_table) { - } + : http::RequestHandler(channel, rpc_api, rpc_api_table) {} - Task request_and_create_reply(const nlohmann::json& request_json, ChannelWriter::Response& response) { + Task request_and_create_reply(const nlohmann::json& request_json, Channel::Response& response) { co_await RequestHandler::handle_request_and_create_reply(request_json, response); } - Task handle_request(const std::string& request_str, ChannelWriter::Response& response) { + Task handle_request(const std::string& request_str, Channel::Response& response) { co_await RequestHandler::handle(request_str); response = std::move(response_); } private: inline static const std::vector allowed_origins; - ChannelWriter::Response response_; + Channel::Response response_; }; class LocalContextTestBase : public silkworm::rpc::test::ContextTestBase { @@ -92,22 +93,22 @@ class RpcApiTestBase : public LocalContextTestBase { explicit RpcApiTestBase(mdbx::env& chaindata_env) : LocalContextTestBase(chaindata_env), workers_{1}, - socket{io_context_}, - rpc_api{io_context_, workers_}, - rpc_api_table{kDefaultEth1ApiSpec} { + socket_{io_context_}, + rpc_api_{io_context_, workers_}, + rpc_api_table_{kDefaultEth1ApiSpec} { } template auto run(Args&&... args) { - ChannelWriterForTest channel_writer; - TestRequestHandler handler{&channel_writer, rpc_api, rpc_api_table}; + ChannelForTest channel; + TestRequestHandler handler{&channel, rpc_api_, rpc_api_table_}; return spawn_and_wait((handler.*method)(std::forward(args)...)); } boost::asio::thread_pool workers_; - boost::asio::ip::tcp::socket socket; - commands::RpcApi rpc_api; - commands::RpcApiTable rpc_api_table; + boost::asio::ip::tcp::socket socket_; + commands::RpcApi rpc_api_; + commands::RpcApiTable rpc_api_table_; }; class TestDatabaseContext { diff --git a/silkworm/rpc/types/writer.cpp b/silkworm/rpc/types/writer.cpp index 67dbcec63e..07ab88dab5 100644 --- a/silkworm/rpc/types/writer.cpp +++ b/silkworm/rpc/types/writer.cpp @@ -16,26 +16,16 @@ #include "writer.hpp" -#include #include #include -#include - -#include -#include #include namespace silkworm::rpc { -const std::string kChunkSep{'\r', '\n'}; // NOLINT(runtime/string) -const std::string kFinalChunk{'0', '\r', '\n', '\r', '\n'}; // NOLINT(runtime/string) - -ChunksWriter::ChunksWriter(Writer& writer) - : writer_(writer) { -} +ChunkWriter::ChunkWriter(StreamWriter& writer) : writer_(writer) {} -Task ChunksWriter::write(std::string_view content) { +Task ChunkWriter::write(std::string_view content) { auto size = content.size(); std::array str{}; @@ -56,67 +46,7 @@ Task ChunksWriter::write(std::string_view content) { co_return written; } -Task ChunksWriter::close() { - co_await writer_.write(kFinalChunk); - co_await writer_.close(); - - co_return; -} - -JsonChunksWriter::JsonChunksWriter(Writer& writer, std::size_t chunk_size) - : writer_(writer), chunk_size_(chunk_size), room_left_in_chunck_(chunk_size_), written_(0) { - str_chunk_size_ << std::hex << chunk_size_ << kChunkSep; -} - -Task JsonChunksWriter::write(std::string_view content) { - auto size = content.size(); - - SILK_DEBUG << "JsonChunksWriter::write written_: " << written_ << " size: " << size; - - if (!chunk_open_) { - co_await writer_.write(str_chunk_size_.str()); - chunk_open_ = true; - } - - size_t remaining_in_view = size; - size_t start = 0; - while (start < size) { - const auto length = std::min(room_left_in_chunck_, remaining_in_view); - std::string_view sub_view(content.data() + start, length); - co_await writer_.write(sub_view); - - written_ += length; - start += length; - remaining_in_view -= length; - room_left_in_chunck_ -= length; - - if ((room_left_in_chunck_ % chunk_size_) == 0) { - if (chunk_open_) { - co_await writer_.write(kChunkSep); - room_left_in_chunck_ = chunk_size_; - chunk_open_ = false; - } - if (remaining_in_view > 0) { - co_await writer_.write(str_chunk_size_.str()); - chunk_open_ = true; - } - } - } - co_return content.size(); -} - -Task JsonChunksWriter::close() { - if (chunk_open_) { - if (room_left_in_chunck_ > 0) { - std::unique_ptr buffer{new char[room_left_in_chunck_]}; - std::memset(buffer.get(), ' ', room_left_in_chunck_); - co_await writer_.write(std::string_view(buffer.get(), room_left_in_chunck_)); - } - co_await writer_.write(kChunkSep); - chunk_open_ = false; - room_left_in_chunck_ = chunk_size_; - } - +Task ChunkWriter::close() { co_await writer_.write(kFinalChunk); co_await writer_.close(); diff --git a/silkworm/rpc/types/writer.hpp b/silkworm/rpc/types/writer.hpp index 82dc8640ff..69741460da 100644 --- a/silkworm/rpc/types/writer.hpp +++ b/silkworm/rpc/types/writer.hpp @@ -27,26 +27,15 @@ namespace silkworm::rpc { -class Writer { +class StreamWriter { public: - virtual ~Writer() = default; + virtual ~StreamWriter() = default; virtual Task write(std::string_view content) = 0; - virtual Task close() { - co_return; - } -}; - -class NullWriter : public Writer { - public: - explicit NullWriter() = default; - - Task write(std::string_view content) override { - co_return content.size(); - } + virtual Task close() = 0; }; -class StringWriter : public Writer { +class StringWriter : public StreamWriter { public: StringWriter() = default; @@ -59,6 +48,8 @@ class StringWriter : public Writer { co_return content.size(); } + Task close() override { co_return; } + const std::string& get_content() { return content_; } @@ -67,33 +58,18 @@ class StringWriter : public Writer { std::string content_; }; -class ChunksWriter : public Writer { - public: - explicit ChunksWriter(Writer& writer); - - Task write(std::string_view content) override; - Task close() override; - - private: - Writer& writer_; -}; +const std::string kChunkSep{'\r', '\n'}; // NOLINT(runtime/string) +const std::string kFinalChunk{'0', '\r', '\n', '\r', '\n'}; // NOLINT(runtime/string) -class JsonChunksWriter : public Writer { +class ChunkWriter : public StreamWriter { public: - explicit JsonChunksWriter(Writer& writer, std::size_t chunk_size = kDefaultChunkSize); + explicit ChunkWriter(StreamWriter& writer); Task write(std::string_view content) override; Task close() override; private: - static const std::size_t kDefaultChunkSize = 0x800; - - Writer& writer_; - bool chunk_open_ = false; - const std::size_t chunk_size_; - size_t room_left_in_chunck_; - std::size_t written_; - std::stringstream str_chunk_size_; + StreamWriter& writer_; }; } // namespace silkworm::rpc diff --git a/silkworm/rpc/types/writer_test.cpp b/silkworm/rpc/types/writer_test.cpp index 16ca511133..c49f76cde6 100644 --- a/silkworm/rpc/types/writer_test.cpp +++ b/silkworm/rpc/types/writer_test.cpp @@ -28,6 +28,83 @@ namespace silkworm::rpc { struct WriterTest : test::ContextTestBase { }; +class JsonChunkWriter : public StreamWriter { + public: + explicit JsonChunkWriter(StreamWriter& writer, std::size_t chunk_size = kDefaultChunkSize); + + Task write(std::string_view content) override; + Task close() override; + + private: + static const std::size_t kDefaultChunkSize = 0x800; + + StreamWriter& writer_; + bool chunk_open_ = false; + const std::size_t chunk_size_; + size_t room_left_in_chunk_; + std::size_t written_{0}; + std::stringstream str_chunk_size_; +}; + +JsonChunkWriter::JsonChunkWriter(StreamWriter& writer, std::size_t chunk_size) + : writer_(writer), chunk_size_(chunk_size), room_left_in_chunk_(chunk_size_) { + str_chunk_size_ << std::hex << chunk_size_ << kChunkSep; +} + +Task JsonChunkWriter::write(std::string_view content) { + auto size = content.size(); + + SILK_DEBUG << "JsonChunkWriter::write written_: " << written_ << " size: " << size; + + if (!chunk_open_) { + co_await writer_.write(str_chunk_size_.str()); + chunk_open_ = true; + } + + size_t remaining_in_view = size; + size_t start = 0; + while (start < size) { + const auto length = std::min(room_left_in_chunk_, remaining_in_view); + std::string_view sub_view(content.data() + start, length); + co_await writer_.write(sub_view); + + written_ += length; + start += length; + remaining_in_view -= length; + room_left_in_chunk_ -= length; + + if ((room_left_in_chunk_ % chunk_size_) == 0) { + if (chunk_open_) { + co_await writer_.write(kChunkSep); + room_left_in_chunk_ = chunk_size_; + chunk_open_ = false; + } + if (remaining_in_view > 0) { + co_await writer_.write(str_chunk_size_.str()); + chunk_open_ = true; + } + } + } + co_return content.size(); +} + +Task JsonChunkWriter::close() { + if (chunk_open_) { + if (room_left_in_chunk_ > 0) { + std::unique_ptr buffer{new char[room_left_in_chunk_]}; + std::memset(buffer.get(), ' ', room_left_in_chunk_); + co_await writer_.write(std::string_view(buffer.get(), room_left_in_chunk_)); + } + co_await writer_.write(kChunkSep); + chunk_open_ = false; + room_left_in_chunk_ = chunk_size_; + } + + co_await writer_.write(kFinalChunk); + + co_return; +} + TEST_CASE_METHOD(WriterTest, "StringWriter") { SECTION("write") { StringWriter writer; @@ -48,10 +125,10 @@ TEST_CASE_METHOD(WriterTest, "StringWriter") { } } -TEST_CASE_METHOD(WriterTest, "ChunksWriter") { +TEST_CASE_METHOD(WriterTest, "ChunkWriter") { SECTION("write&close under chunk size") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.write("1234")); spawn_and_wait(writer.close()); @@ -60,7 +137,7 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } SECTION("write over chunk size 4") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.write("1234")); spawn_and_wait(writer.write("5678")); @@ -69,7 +146,7 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } SECTION("write&close over chunk size 4") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.write("1234")); spawn_and_wait(writer.write("5678")); @@ -80,7 +157,7 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } SECTION("write over chunk size 5") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.write("12345")); spawn_and_wait(writer.write("67890")); @@ -89,7 +166,7 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } SECTION("write&close over chunk size 5") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.write("12345")); spawn_and_wait(writer.write("67890")); @@ -100,7 +177,7 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } SECTION("close") { StringWriter s_writer; - ChunksWriter writer(s_writer); + ChunkWriter writer(s_writer); spawn_and_wait(writer.close()); @@ -108,10 +185,10 @@ TEST_CASE_METHOD(WriterTest, "ChunksWriter") { } } -TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { +TEST_CASE_METHOD(WriterTest, "JsonChunkWriter") { SECTION("write&close under chunk size") { StringWriter s_writer; - JsonChunksWriter writer(s_writer, 16); + JsonChunkWriter writer(s_writer, 16); spawn_and_wait(writer.write("1234")); spawn_and_wait(writer.close()); @@ -120,7 +197,7 @@ TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { } SECTION("write&close over chunk size 4") { StringWriter s_writer; - JsonChunksWriter writer(s_writer, 4); + JsonChunkWriter writer(s_writer, 4); spawn_and_wait(writer.write("1234567890")); spawn_and_wait(writer.close()); @@ -129,7 +206,7 @@ TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { } SECTION("write&close over chunk size 5") { StringWriter s_writer; - JsonChunksWriter writer(s_writer, 5); + JsonChunkWriter writer(s_writer, 5); spawn_and_wait(writer.write("1234567890")); spawn_and_wait(writer.close()); @@ -138,7 +215,7 @@ TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { } SECTION("write&close over chunk size 5") { StringWriter s_writer; - JsonChunksWriter writer(s_writer, 5); + JsonChunkWriter writer(s_writer, 5); spawn_and_wait(writer.write("123456789012")); spawn_and_wait(writer.close()); @@ -147,7 +224,7 @@ TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { } SECTION("close") { StringWriter s_writer; - JsonChunksWriter writer(s_writer); + JsonChunkWriter writer(s_writer); spawn_and_wait(writer.close()); @@ -155,7 +232,7 @@ TEST_CASE_METHOD(WriterTest, "JsonChunksWriter") { } SECTION("write json") { StringWriter s_writer; - JsonChunksWriter writer(s_writer, 48); + JsonChunkWriter writer(s_writer, 48); nlohmann::json json = R"({ "accounts": {},