Skip to content

Commit

Permalink
rpcdaemon: fix async loop in JSON streaming (#1777)
Browse files Browse the repository at this point in the history
Co-authored-by: Sixtysixter <[email protected]>
  • Loading branch information
canepat and Sixtysixter authored Jan 26, 2024
1 parent 4182d93 commit 25754a5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 91 deletions.
4 changes: 1 addition & 3 deletions silkworm/rpc/http/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

namespace silkworm::rpc::http {

constexpr std::size_t kStreamBufferSize{4096};

Task<void> RequestHandler::handle(const std::string& content) {
auto start = clock_time::now();
std::string response;
Expand Down Expand Up @@ -165,7 +163,7 @@ Task<void> RequestHandler::handle_request(commands::RpcApiTable::HandleStream ha

co_await channel_->open_stream();
ChunkWriter chunk_writer(*channel_);
json::Stream stream(io_executor, chunk_writer, kStreamBufferSize);
json::Stream stream(io_executor, chunk_writer);

co_await (rpc_api_.*handler)(request_json, stream);

Expand Down
142 changes: 79 additions & 63 deletions silkworm/rpc/json/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,57 @@

#include <array>
#include <charconv>
#include <iostream>
#include <string>
#include <thread>

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/use_promise.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/common/stopwatch.hpp>

namespace silkworm::rpc::json {

static std::uint8_t kObjectOpen = 1;
static std::uint8_t kArrayOpen = 2;
static std::uint8_t kFieldWritten = 3;
static std::uint8_t kEntryWritten = 4;

static std::string kOpenBrace{"{"}; // NOLINT(runtime/string)
static std::string kCloseBrace{"}"}; // NOLINT(runtime/string)
static std::string kOpenBracket{"["}; // NOLINT(runtime/string)
static std::string kCloseBracket{"]"}; // NOLINT(runtime/string)
static std::string kFieldSeparator{","}; // NOLINT(runtime/string)
static std::string kColon{":"}; // NOLINT(runtime/string)
static std::string kDoubleQuotes{"\""}; // NOLINT(runtime/string)

Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold)
: io_executor_(executor), writer_(writer), threshold_(threshold), channel_{executor, threshold} {
buffer_.reserve(threshold);
runner_task_ = co_spawn(
executor, [](auto self) -> Task<void> {
co_await self->run();
}(this),
boost::asio::use_awaitable);
using namespace std::chrono_literals;

// Markers pushed onto the internal stack to track the current JSON nesting state while streaming
static constexpr uint8_t kObjectOpen{1};
static constexpr uint8_t kArrayOpen{2};
static constexpr uint8_t kFieldWritten{3};
static constexpr uint8_t kEntryWritten{4};

// JSON punctuation tokens emitted verbatim while streaming
static constexpr std::string_view kOpenBrace{"{"};
static constexpr std::string_view kCloseBrace{"}"};
static constexpr std::string_view kOpenBracket{"["};
static constexpr std::string_view kCloseBracket{"]"};
static constexpr std::string_view kFieldSeparator{","};
static constexpr std::string_view kColon{":"};
static constexpr std::string_view kDoubleQuotes{"\""};

//! The maximum number of items enqueued in the chunk channel
static constexpr std::size_t kChannelCapacity{100};

//! Build a JSON stream and immediately spawn its asynchronous run loop on \p executor.
//! \param executor the I/O executor on which the chunk-draining run loop is scheduled
//! \param writer the underlying transport used to emit each buffered chunk
//! \param buffer_capacity size threshold at which the accumulation buffer is flushed as one chunk
//! NOTE: channel_ is declared before run_completion_promise_, so the channel is fully
//! constructed before the run() coroutine (which receives from it) is spawned.
Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t buffer_capacity)
    : writer_(writer),
      buffer_capacity_(buffer_capacity),
      channel_{executor, kChannelCapacity},
      // use_promise gives a handle close() can await to rendez-vous with run loop completion
      run_completion_promise_{co_spawn(
          executor, [](auto self) -> Task<void> {
              co_await self->run();
          }(this),
          boost::asio::experimental::use_promise)} {
    buffer_.reserve(buffer_capacity_ + buffer_capacity_ / 4);  // try to prevent reallocation when buffer overflows
}

Task<void> Stream::close() {
if (!buffer_.empty()) {
do_write(std::make_shared<std::string>(std::move(buffer_)));
co_await do_async_write(std::make_shared<std::string>(std::move(buffer_)));
}
co_await do_async_write(nullptr);

do_write(nullptr);
co_await std::move(runner_task_);
co_await writer_.close();
co_await run_completion_promise_(boost::asio::use_awaitable);

co_return;
co_await writer_.close();
}

void Stream::open_object() {
Expand Down Expand Up @@ -100,8 +106,8 @@ void Stream::close_array() {
}

void Stream::write_json(const nlohmann::json& json) {
bool isEntry = !stack_.empty() && (stack_.top() == kArrayOpen || stack_.top() == kEntryWritten);
if (isEntry) {
const bool is_entry = !stack_.empty() && (stack_.top() == kArrayOpen || stack_.top() == kEntryWritten);
if (is_entry) {
if (stack_.top() != kEntryWritten) {
stack_.push(kEntryWritten);
} else {
Expand Down Expand Up @@ -230,7 +236,7 @@ void Stream::write_string(std::string_view str) {

void Stream::write(std::string_view str) {
buffer_ += str;
if (buffer_.size() >= threshold_) {
if (buffer_.size() >= buffer_capacity_) {
do_write(std::make_shared<std::string>(std::move(buffer_)));
}
}
Expand All @@ -245,51 +251,61 @@ void Stream::ensure_separator() {
}
}

void Stream::do_write(std::shared_ptr<std::string> chunk) {
if (!closed_) {
co_spawn(
io_executor_, [](auto self, auto chunk_ptr) -> Task<void> {
co_await self->channel_.async_send(boost::system::error_code(), chunk_ptr, boost::asio::use_awaitable);
}(this, std::move(chunk)),
boost::asio::detached);
//! Enqueue one chunk for asynchronous writing, handling both I/O-context and worker-thread callers.
//! \param chunk the buffered JSON fragment to send (nullptr acts as the end-of-stream sentinel)
void Stream::do_write(ChunkPtr chunk) {
    // Stream write API will usually be called by worker threads rather than I/O contexts, but we handle both
    const auto& channel_executor{channel_.get_executor()};
    // any_io_executor::target<T>() returns nullptr when the wrapped executor is not exactly T
    // (e.g. a strand or a custom executor): treat that case as "not on the I/O thread" instead of dereferencing
    const auto* io_context_executor{channel_executor.target<boost::asio::io_context::executor_type>()};
    if (io_context_executor != nullptr && io_context_executor->running_in_this_thread()) [[unlikely]] {
        // We must not block the I/O thread: delegate any back pressure to do_async_write
        boost::asio::co_spawn(channel_executor, do_async_write(std::move(chunk)), boost::asio::detached);
    } else {
        // Handle back pressure simply by retrying after a while // TODO(canepat) clever wait strategy
        while (channel_.is_open()) {
            if (const bool ok{channel_.try_send(boost::system::error_code(), chunk)}; ok) {
                break;
            }
            SILK_TRACE << "Chunk size=" << (chunk ? chunk->size() : 0) << " not enqueued, worker back pressured";
            std::this_thread::sleep_for(10ms);
        }
    }
}

Task<void> Stream::run() {
std::unique_ptr<silkworm::StopWatch> stop_watch;

uint32_t write_counter{0};
std::size_t total_send{0};
while (true) {
const auto chunk_ptr = co_await channel_.async_receive(boost::asio::use_awaitable);
if (!chunk_ptr) {
break;
}
if (!stop_watch) {
stop_watch = std::make_unique<StopWatch>(true);
//! Asynchronously enqueue a chunk on the channel, absorbing errors expected at shutdown.
//! \param chunk the buffered JSON fragment to send (nullptr acts as the end-of-stream sentinel)
Task<void> Stream::do_async_write(ChunkPtr chunk) {
    try {
        co_await channel_.async_send(boost::system::error_code(), chunk, boost::asio::use_awaitable);
    } catch (const boost::system::system_error& se) {
        // channel_cancelled is the normal outcome once the run loop has closed the channel: stay silent
        if (se.code() != boost::asio::experimental::error::channel_cancelled) {
            SILK_ERROR << "Stream::do_async_write unexpected system_error: " << se.what();
        }
    } catch (const std::exception& exception) {
        SILK_ERROR << "Stream::do_async_write unexpected exception: " << exception.what();
    }
}

Task<void> Stream::run() {
uint32_t total_writes{0};
std::size_t total_bytes_sent{0};
while (true) {
try {
total_send += co_await writer_.write(*chunk_ptr);
write_counter++;
const auto chunk_ptr = co_await channel_.async_receive(boost::asio::use_awaitable);
if (!chunk_ptr) {
break;
}
total_bytes_sent += co_await writer_.write(*chunk_ptr);
++total_writes;
} catch (const boost::system::system_error& se) {
if (se.code() != boost::asio::experimental::error::channel_cancelled) {
SILK_ERROR << "Stream::run unexpected system_error: " << se.what();
}
break;
} catch (const std::exception& exception) {
SILK_ERROR << "#" << std::dec << write_counter << " Exception: " << exception.what();
closed_ = true;
channel_.close();
SILK_ERROR << "Stream::run unexpected exception: " << exception.what();
break;
}
}

closed_ = true;
channel_.close();

SILK_DEBUG << "Stream::run -> total write " << std::dec << write_counter << ", total sent: " << total_send;
if (stop_watch) {
const auto [_, duration] = stop_watch->lap();
SILK_DEBUG << "Stream::run -> actual duration " << StopWatch::format(duration);
}

co_return;
SILK_TRACE << "Stream::run total_writes: " << total_writes << " total_bytes_sent: " << total_bytes_sent;
}

} // namespace silkworm::rpc::json
28 changes: 16 additions & 12 deletions silkworm/rpc/json/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,24 @@
#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/io_context.hpp>
#include <nlohmann/json.hpp>

#include <silkworm/rpc/types/writer.hpp>

namespace silkworm::rpc::json {
static const nlohmann::json JSON_NULL = nlohmann::json::value_t::null;
static const nlohmann::json EMPTY_OBJECT = nlohmann::json::value_t::object;
static const nlohmann::json EMPTY_ARRAY = nlohmann::json::value_t::array;

//! Stream can be used to send big JSON data split into multiple fragments.
class Stream {
public:
explicit Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold = kDefaultThreshold);
inline static constexpr std::size_t kDefaultCapacity{4096};

Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t buffer_capacity = kDefaultCapacity);
Stream(const Stream& stream) = delete;
Stream& operator=(const Stream&) = delete;

//! Flush any remaining data and close properly as per the underlying transport
Task<void> close();

void open_object();
Expand All @@ -63,27 +65,29 @@ class Stream {
void write_field(std::string_view name, std::double_t value);

private:
static const std::size_t kDefaultThreshold = 0x800;
using ChunkPtr = std::shared_ptr<std::string>;

void write_string(std::string_view str);
void ensure_separator();

void write(std::string_view str);
void do_write(std::shared_ptr<std::string> chunk);
void do_write(ChunkPtr chunk);
Task<void> do_async_write(ChunkPtr chunk);

//! Run loop writing channeled chunks in order
Task<void> run();

boost::asio::any_io_executor& io_executor_;

StreamWriter& writer_;
std::stack<std::uint8_t> stack_;

const std::size_t threshold_;
const std::size_t buffer_capacity_;
std::string buffer_;

bool closed_{false};
Task<void> runner_task_;
boost::asio::experimental::concurrent_channel<void(boost::system::error_code, std::shared_ptr<std::string>)> channel_;
using ChunkChannel = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, ChunkPtr)>;
ChunkChannel channel_; // Chunks enqueued waiting to be written asynchronously

using RunPromise = boost::asio::experimental::promise<void(std::exception_ptr)>;
RunPromise run_completion_promise_; // Rendez-vous for run loop completion
};

} // namespace silkworm::rpc::json
56 changes: 43 additions & 13 deletions silkworm/rpc/json/stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/thread_pool.hpp>
#include <catch2/catch.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/rpc/test/context_test_base.hpp>

namespace silkworm::rpc::json {

struct JsonStreamTest : test::ContextTestBase {
// The following constants *must* be initialized using assignment and *not* uniform initialization syntax:
// brace-initialization would select nlohmann::json's initializer_list constructor, turning each constant
// into a one-element JSON array instead of the intended null/empty-object/empty-array value.
static const nlohmann::json kJsonNull = nlohmann::json::value_t::null;
static const nlohmann::json kJsonEmptyObject = nlohmann::json::value_t::object;
static const nlohmann::json kJsonEmptyArray = nlohmann::json::value_t::array;

struct StreamTest : test::ContextTestBase {
};

TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") {
ClientContextPool pool{1};
pool.start();
boost::asio::any_io_executor io_executor = pool.next_io_context().get_executor();
TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") {
boost::asio::any_io_executor io_executor = io_context_.get_executor();

StringWriter string_writer;
ChunkWriter chunk_writer(string_writer);
Expand Down Expand Up @@ -75,10 +79,8 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") {
}
}

TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") {
ClientContextPool pool{1};
pool.start();
boost::asio::any_io_executor io_executor = pool.next_io_context().get_executor();
TEST_CASE_METHOD(StreamTest, "json::Stream API", "[rpc][json]") {
boost::asio::any_io_executor io_executor = io_context_.get_executor();

StringWriter string_writer;
Stream stream(io_executor, string_writer);
Expand All @@ -101,7 +103,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") {
CHECK(string_writer.get_content() == "{}");
}
SECTION("empty object 2") {
stream.write_json(EMPTY_OBJECT);
stream.write_json(kJsonEmptyObject);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "{}");
Expand All @@ -114,22 +116,22 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") {
CHECK(string_writer.get_content() == "[]");
}
SECTION("empty array 2") {
stream.write_json(EMPTY_ARRAY);
stream.write_json(kJsonEmptyArray);
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "[]");
}
SECTION("simple object 1") {
stream.open_object();
stream.write_json_field("null", JSON_NULL);
stream.write_json_field("null", kJsonNull);
stream.close_object();
spawn_and_wait(stream.close());

CHECK(string_writer.get_content() == "{\"null\":null}");
}
SECTION("simple object 2") {
stream.open_object();
stream.write_json_field("array", EMPTY_ARRAY);
stream.write_json_field("array", kJsonEmptyArray);
stream.close_object();
spawn_and_wait(stream.close());

Expand Down Expand Up @@ -326,4 +328,32 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") {
}
}

//! Verify the stream can be fed both from the I/O context thread and from a worker thread
TEST_CASE_METHOD(StreamTest, "json::Stream threading", "[rpc][json]") {
    boost::asio::any_io_executor io_executor = io_context_.get_executor();
    constexpr std::string_view kData{R"({"test":"test"})"};

    StringWriter string_writer;
    Stream stream(io_executor, string_writer, 1);  // tiny buffer capacity: forces a chunk flush on every write

    const nlohmann::json json = R"({"test":"test"})"_json;

    SECTION("using I/O context thread") {
        // single write issued from the test thread, then close flushes and joins the run loop
        stream.write_json(json);
        CHECK_NOTHROW(spawn_and_wait(stream.close()));
        CHECK(string_writer.get_content() == kData);
    }

    SECTION("using worker thread") {
        // many writes from a pool thread exercise the non-I/O-thread (try_send back-pressure) path
        boost::asio::thread_pool workers;
        boost::asio::post(workers, [&]() {
            for (int i{0}; i < 1'000; ++i) {
                stream.write_json(json);
            }
        });
        workers.join();  // all writes enqueued before the stream is closed
        CHECK_NOTHROW(spawn_and_wait(stream.close()));
        CHECK(string_writer.get_content().size() == kData.size() * 1'000);
    }
}

} // namespace silkworm::rpc::json

0 comments on commit 25754a5

Please sign in to comment.