Skip to content

Commit

Permalink
rpcdaemon: compression on separate thread (#1936)
Browse files Browse the repository at this point in the history
  • Loading branch information
lupin012 authored Mar 28, 2024
1 parent 90049ac commit efa0e73
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 17 deletions.
47 changes: 33 additions & 14 deletions silkworm/rpc/http/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <string_view>

#include <boost/asio/buffer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/http/chunk_encode.hpp>
Expand Down Expand Up @@ -50,6 +52,7 @@ Connection::Connection(boost::asio::io_context& io_context,
bool ws_upgrade_enabled,
bool ws_compression,
bool http_compression,
boost::asio::thread_pool& workers,
InterfaceLogSettings ifc_log_settings)
: socket_{io_context},
api_{api},
Expand All @@ -59,7 +62,8 @@ Connection::Connection(boost::asio::io_context& io_context,
jwt_secret_{std ::move(jwt_secret)},
ws_upgrade_enabled_{ws_upgrade_enabled},
ws_compression_{ws_compression},
http_compression_{http_compression} {
http_compression_{http_compression},
workers_{workers} {
SILK_TRACE << "Connection::Connection socket " << &socket_ << " created";
if (http_compression_) { // temporary to avoid warning with clang
SILK_TRACE << "Connection::Connection compression enabled";
Expand Down Expand Up @@ -288,12 +292,9 @@ Task<void> Connection::do_write(const std::string& content, boost::beast::http::
// Positive response w/ compression required
res.set(boost::beast::http::field::content_encoding, content_encoding);
std::string compressed_content;
try {
compress(content, compressed_content);
} catch (const std::exception& e) {
SILK_ERROR << "Connection::do_write cannot compress exception: " << e.what();
throw;
}

co_await compress(content, compressed_content);

res.content_length(compressed_content.length());
res.body() = std::move(compressed_content);
} else {
Expand Down Expand Up @@ -321,13 +322,6 @@ Task<void> Connection::do_write(const std::string& content, boost::beast::http::
co_return;
}

void Connection::compress(const std::string& clear_data, std::string& compressed_data) {
boost::iostreams::filtering_ostream out;
out.push(boost::iostreams::gzip_compressor());
out.push(boost::iostreams::back_inserter(compressed_data));
boost::iostreams::copy(boost::make_iterator_range(clear_data), out);
}

Connection::AuthorizationResult Connection::is_request_authorized(const boost::beast::http::request<boost::beast::http::string_body>& req) {
if (!jwt_secret_.has_value() || (*jwt_secret_).empty()) {
return {};
Expand Down Expand Up @@ -426,4 +420,29 @@ std::string Connection::get_date_time() {
return ss.str();
}

Task<void> Connection::compress(
const std::string& clear_data,
std::string& compressed_data) {
auto this_executor = co_await boost::asio::this_coro::executor;
boost::iostreams::filtering_ostream out;
co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), void(std::exception_ptr)>(
[&](auto& self) {
boost::asio::post(workers_, [&, self = std::move(self)]() mutable {
std::exception_ptr eptr;
try {
out.push(boost::iostreams::gzip_compressor());
out.push(boost::iostreams::back_inserter(compressed_data));
boost::iostreams::copy(boost::make_iterator_range(clear_data), out);
} catch (const std::exception& e) {
SILK_ERROR << "Connection::compress cannot compress exception: " << e.what();
eptr = std::current_exception();
}
boost::asio::post(this_executor, [eptr, self = std::move(self)]() mutable {
self.complete(eptr);
});
});
},
boost::asio::use_awaitable);
}

} // namespace silkworm::rpc::http
6 changes: 5 additions & 1 deletion silkworm/rpc/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/system/error_code.hpp>

#include <silkworm/rpc/commands/rpc_api_table.hpp>
#include <silkworm/rpc/common/constants.hpp>
Expand All @@ -52,6 +53,7 @@ class Connection : public StreamWriter {
bool ws_upgrade_enabled,
bool ws_compression,
bool http_compression,
boost::asio::thread_pool& workers,
InterfaceLogSettings ifc_log_settings);
~Connection() override;

Expand Down Expand Up @@ -91,7 +93,7 @@ class Connection : public StreamWriter {

static std::string get_date_time();

void compress(const std::string& clear_data, std::string& compressed_data);
Task<void> compress(const std::string& clear_data, std::string& compressed_data);

//! Socket for the connection.
boost::asio::ip::tcp::socket socket_;
Expand All @@ -116,6 +118,8 @@ class Connection : public StreamWriter {

bool http_compression_;

boost::asio::thread_pool& workers_;

std::string vary_;
std::string origin_;
boost::beast::http::verb method_{boost::beast::http::verb::unknown};
Expand Down
5 changes: 3 additions & 2 deletions silkworm/rpc/http/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ Server::Server(const std::string& end_point,
use_websocket_{use_websocket},
ws_compression_{ws_compression},
http_compression_{http_compression},
ifc_log_settings_{std::move(ifc_log_settings)} {
ifc_log_settings_{std::move(ifc_log_settings)},
workers_{workers} {
const auto [host, port] = parse_endpoint(end_point);

// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
Expand All @@ -86,7 +87,7 @@ Task<void> Server::run() {
SILK_DEBUG << "Server::run accepting using io_context " << &io_context_ << "...";

auto new_connection = std::make_shared<Connection>(
io_context_, rpc_api_, handler_table_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, ifc_log_settings_);
io_context_, rpc_api_, handler_table_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, workers_, ifc_log_settings_);
co_await acceptor_.async_accept(new_connection->socket(), boost::asio::use_awaitable);
if (!acceptor_.is_open()) {
SILK_TRACE << "Server::run returning...";
Expand Down
3 changes: 3 additions & 0 deletions silkworm/rpc/http/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class Server {

//! The interface logging configuration
InterfaceLogSettings ifc_log_settings_;

//! The configured workers
boost::asio::thread_pool& workers_;
};

} // namespace silkworm::rpc::http

0 comments on commit efa0e73

Please sign in to comment.