Skip to content

Commit

Permalink
rpcdaemon: refactor stream data types (#1752)
Browse files Browse the repository at this point in the history
Co-authored-by: canepat <[email protected]>
  • Loading branch information
lupin012 and canepat committed Jan 12, 2024
1 parent 8f68433 commit 84983ea
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 225 deletions.
16 changes: 8 additions & 8 deletions silkworm/rpc/commands/eth_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace silkworm::rpc::commands {
#ifndef SILKWORM_SANITIZE
TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber succeeds if request well-formed", "[rpc][api]") {
const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_blockNumber","params":[]})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -37,7 +37,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber succeeds if request

TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_blockNumber fails if request empty", "[rpc][api]") {
const auto request = R"({})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -53,7 +53,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails rlp pa
"method": "eth_sendRawTransaction",
"params": ["0xd46ed67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f0724456"]
})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -69,7 +69,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails wrong
"method": "eth_sendRawTransaction",
"params": ["0xd46ed67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f072445"]
})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -80,7 +80,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_sendRawTransaction fails wrong

TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_feeHistory succeeds if request well-formed", "[rpc][api]") {
const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["0x1","0x867A80",[25,75]]})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -91,7 +91,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "unit: eth_feeHistory succeeds if request

TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_call invalid params", "[rpc][api]") {
const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_call","params":[{}, "latest"]})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -102,7 +102,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_call invalid params", "[rpc][a

TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv invalid input", "[rpc][api]") {
const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["5x1","0x2",[95,99]]})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand All @@ -113,7 +113,7 @@ TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv invalid inp

TEST_CASE_METHOD(test::RpcApiE2ETest, "fuzzy: eth_feeHistory sigsegv valid input", "[rpc][api]") {
const auto request = R"({"jsonrpc":"2.0","id":1,"method":"eth_feeHistory","params":["0x5","0x2",[95,99]]})"_json;
ChannelWriter::Response reply;
Channel::Response reply;
run<&test::RequestHandler_ForTest::request_and_create_reply>(request, reply);
CHECK(nlohmann::json::parse(reply.content) == R"({
"jsonrpc":"2.0",
Expand Down
4 changes: 2 additions & 2 deletions silkworm/rpc/commands/rpc_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ TEST_CASE("rpc_api io (all files)", "[rpc][rpc_api]") {
auto request = nlohmann::json::parse(line_out.substr(3));
auto expected = nlohmann::json::parse(line_in.substr(3));

ChannelWriter::Response response;
Channel::Response response;
test_base.run<&test::RequestHandler_ForTest::request_and_create_reply>(request, response);
INFO("Request: " << request.dump())
INFO("Actual response: " << response.content)
Expand All @@ -131,7 +131,7 @@ TEST_CASE("rpc_api io (individual)", "[rpc][rpc_api][ignore]") {

SECTION("sample test") {
auto request = R"({"jsonrpc":"2.0","id":1,"method":"debug_getRawTransaction","params":["0x74e41d593675913d6d5521f46523f1bd396dff1891bdb35f59be47c7e5e0b34b"]})"_json;
ChannelWriter::Response response;
Channel::Response response;

test_base.run<&test::RequestHandler_ForTest::request_and_create_reply>(request, response);
CHECK(nlohmann::json::parse(response.content) == R"({"jsonrpc":"2.0","id":1,"result":"0xf8678084342770c182520894658bdf435d810c91414ec09147daa6db624063798203e880820a95a0af5fc351b9e457a31f37c84e5cd99dd3c5de60af3de33c6f4160177a2c786a60a0201da7a21046af55837330a2c52fc1543cd4d9ead00ddf178dd96935b607ff9b"})"_json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

namespace silkworm::rpc {

class ChannelWriter : public Writer {
class Channel : public StreamWriter {
public:
enum class ResponseStatus {
processing_continue,
Expand All @@ -47,10 +47,13 @@ class ChannelWriter : public Writer {
std::string content;
};

ChannelWriter() = default;
ChannelWriter(const ChannelWriter&) = delete;
ChannelWriter& operator=(const ChannelWriter&) = delete;
Channel() = default;
~Channel() override = default;

Channel(const Channel&) = delete;
Channel& operator=(const Channel&) = delete;

virtual Task<void> open_stream() = 0;
virtual Task<void> write_rsp(Response& response) = 0;
};

Expand Down
46 changes: 23 additions & 23 deletions silkworm/rpc/http/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,41 +123,41 @@ Connection::handle_request(Request& request) {
}
}

StatusType Connection::get_http_status(ChannelWriter::ResponseStatus status) {
StatusType Connection::get_http_status(Channel::ResponseStatus status) {
switch (status) {
case ResponseStatus::processing_continue:
return StatusType::processing_continue;
case ResponseStatus::ok:
return StatusType::ok;
case ResponseStatus::created:
return StatusType::created;
case ChannelWriter::ResponseStatus::accepted:
case ResponseStatus::accepted:
return StatusType::accepted;
case ChannelWriter::ResponseStatus::no_content:
case ResponseStatus::no_content:
return StatusType::no_content;
case ChannelWriter::ChannelWriter::ResponseStatus::multiple_choices:
case ResponseStatus::multiple_choices:
return StatusType::multiple_choices;
case ChannelWriter::ChannelWriter::ResponseStatus::moved_permanently:
case ResponseStatus::moved_permanently:
return StatusType::moved_permanently;
case ChannelWriter::ChannelWriter::ResponseStatus::moved_temporarily:
case ResponseStatus::moved_temporarily:
return StatusType::moved_temporarily;
case ChannelWriter::ChannelWriter::ResponseStatus::not_modified:
case ResponseStatus::not_modified:
return StatusType::not_modified;
case ChannelWriter::ChannelWriter::ResponseStatus::bad_request:
case ResponseStatus::bad_request:
return StatusType::bad_request;
case ChannelWriter::ChannelWriter::ResponseStatus::unauthorized:
case ResponseStatus::unauthorized:
return StatusType::unauthorized;
case ChannelWriter::ResponseStatus::forbidden:
case ResponseStatus::forbidden:
return StatusType::forbidden;
case ChannelWriter::ResponseStatus::not_found:
case ResponseStatus::not_found:
return StatusType::not_found;
case ChannelWriter::ResponseStatus::internal_server_error:
case ResponseStatus::internal_server_error:
return StatusType::internal_server_error;
case ChannelWriter::ResponseStatus::not_implemented:
case ResponseStatus::not_implemented:
return StatusType::not_implemented;
case ChannelWriter::ResponseStatus::bad_gateway:
case ResponseStatus::bad_gateway:
return StatusType::bad_gateway;
case ChannelWriter::ResponseStatus::service_unavailable:
case ResponseStatus::service_unavailable:
return StatusType::service_unavailable;
default:
return StatusType::internal_server_error;
Expand All @@ -173,13 +173,13 @@ Connection::write_rsp(Response& msg_response) {
co_await do_write(reply);
}

Task<void> Connection::open_stream() {
co_await write_headers();
}

Task<std::size_t> Connection::write(std::string_view content) {
if (first_chunk_) {
co_await write_headers();
first_chunk_ = false;
}
const auto bytes_transferred = co_await boost::asio::async_write(socket_, boost::asio::buffer(content), boost::asio::use_awaitable);
SILK_TRACE << "SocketWriter::write bytes_transferred: " << bytes_transferred;
SILK_TRACE << "Connection::write bytes_transferred: " << bytes_transferred;
co_return bytes_transferred;
}

Expand All @@ -200,7 +200,7 @@ static constexpr size_t kCorsNumHeaders{4};

Task<void> Connection::do_write(Reply& reply) {
try {
SILK_DEBUG << "RequestHandler::do_write reply: " << reply.content;
SILK_DEBUG << "Connection::do_write reply: " << reply.content;

reply.headers.reserve(allowed_origins_.empty() ? 2 : 2 + kCorsNumHeaders);
reply.headers.emplace_back(http::Header{"Content-Length", std::to_string(reply.content.size())});
Expand All @@ -209,7 +209,7 @@ Task<void> Connection::do_write(Reply& reply) {
set_cors(reply.headers);

const auto bytes_transferred = co_await boost::asio::async_write(socket_, reply.to_buffers(), boost::asio::use_awaitable);
SILK_TRACE << "RequestHandler::do_write bytes_transferred: " << bytes_transferred;
SILK_TRACE << "Connection::do_write bytes_transferred: " << bytes_transferred;
} catch (const boost::system::system_error& se) {
std::rethrow_exception(std::make_exception_ptr(se));
} catch (const std::exception& e) {
Expand All @@ -229,7 +229,7 @@ Task<void> Connection::write_headers() {
auto buffers = http::to_buffers(StatusType::ok, headers);

const auto bytes_transferred = co_await boost::asio::async_write(socket_, buffers, boost::asio::use_awaitable);
SILK_TRACE << "RequestHandler::write_headers bytes_transferred: " << bytes_transferred;
SILK_TRACE << "Connection::write_headers bytes_transferred: " << bytes_transferred;
} catch (const std::system_error& se) {
std::rethrow_exception(std::make_exception_ptr(se));
} catch (const std::exception& e) {
Expand Down
11 changes: 5 additions & 6 deletions silkworm/rpc/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

#include <silkworm/rpc/commands/rpc_api_table.hpp>
#include <silkworm/rpc/common/constants.hpp>
#include <silkworm/rpc/http/channel_writer.hpp>
#include <silkworm/rpc/http/channel.hpp>
#include <silkworm/rpc/http/reply.hpp>
#include <silkworm/rpc/http/request.hpp>
#include <silkworm/rpc/http/request_handler.hpp>
Expand All @@ -42,7 +42,7 @@
namespace silkworm::rpc::http {

//! Represents a single connection from a client.
class Connection : public ChannelWriter {
class Connection : public Channel {
public:
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
Expand All @@ -53,7 +53,6 @@ class Connection : public ChannelWriter {
commands::RpcApiTable& handler_table,
const std::vector<std::string>& allowed_origins,
std::optional<std::string> jwt_secret);

~Connection() override;

boost::asio::ip::tcp::socket& socket() { return socket_; }
Expand All @@ -62,7 +61,9 @@ class Connection : public ChannelWriter {
Task<void> read_loop();

Task<void> write_rsp(Response& response) override;
Task<void> open_stream() override;
Task<std::size_t> write(std::string_view content) override;
Task<void> close() override { co_return; }

private:
using AuthorizationError = std::string;
Expand All @@ -78,7 +79,7 @@ class Connection : public ChannelWriter {

Task<void> write_headers();

static StatusType get_http_status(ChannelWriter::ResponseStatus status);
static StatusType get_http_status(Channel::ResponseStatus status);

//! Perform an asynchronous read operation.
Task<void> do_read();
Expand Down Expand Up @@ -108,8 +109,6 @@ class Connection : public ChannelWriter {
const std::vector<std::string>& allowed_origins_;

const std::optional<std::string> jwt_secret_;

bool first_chunk_{true};
};

} // namespace silkworm::rpc::http
Loading

0 comments on commit 84983ea

Please sign in to comment.