diff --git a/silkworm/rpc/http/request_handler.cpp b/silkworm/rpc/http/request_handler.cpp index b590c9b26e..0fdde71e57 100644 --- a/silkworm/rpc/http/request_handler.cpp +++ b/silkworm/rpc/http/request_handler.cpp @@ -29,8 +29,6 @@ namespace silkworm::rpc::http { -constexpr std::size_t kStreamBufferSize{4096}; - Task RequestHandler::handle(const std::string& content) { auto start = clock_time::now(); std::string response; @@ -165,7 +163,7 @@ Task RequestHandler::handle_request(commands::RpcApiTable::HandleStream ha co_await channel_->open_stream(); ChunkWriter chunk_writer(*channel_); - json::Stream stream(io_executor, chunk_writer, kStreamBufferSize); + json::Stream stream(io_executor, chunk_writer); co_await (rpc_api_.*handler)(request_json, stream); diff --git a/silkworm/rpc/json/stream.cpp b/silkworm/rpc/json/stream.cpp index b952d94c2c..e0a11f4bb1 100644 --- a/silkworm/rpc/json/stream.cpp +++ b/silkworm/rpc/json/stream.cpp @@ -18,51 +18,57 @@ #include #include -#include #include +#include #include #include #include +#include #include -#include namespace silkworm::rpc::json { -static std::uint8_t kObjectOpen = 1; -static std::uint8_t kArrayOpen = 2; -static std::uint8_t kFieldWritten = 3; -static std::uint8_t kEntryWritten = 4; - -static std::string kOpenBrace{"{"}; // NOLINT(runtime/string) -static std::string kCloseBrace{"}"}; // NOLINT(runtime/string) -static std::string kOpenBracket{"["}; // NOLINT(runtime/string) -static std::string kCloseBracket{"]"}; // NOLINT(runtime/string) -static std::string kFieldSeparator{","}; // NOLINT(runtime/string) -static std::string kColon{":"}; // NOLINT(runtime/string) -static std::string kDoubleQuotes{"\""}; // NOLINT(runtime/string) - -Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold) - : io_executor_(executor), writer_(writer), threshold_(threshold), channel_{executor, threshold} { - buffer_.reserve(threshold); - runner_task_ = co_spawn( - executor, [](auto self) -> 
Task { - co_await self->run(); - }(this), - boost::asio::use_awaitable); +using namespace std::chrono_literals; + +static constexpr uint8_t kObjectOpen{1}; +static constexpr uint8_t kArrayOpen{2}; +static constexpr uint8_t kFieldWritten{3}; +static constexpr uint8_t kEntryWritten{4}; + +static constexpr std::string_view kOpenBrace{"{"}; +static constexpr std::string_view kCloseBrace{"}"}; +static constexpr std::string_view kOpenBracket{"["}; +static constexpr std::string_view kCloseBracket{"]"}; +static constexpr std::string_view kFieldSeparator{","}; +static constexpr std::string_view kColon{":"}; +static constexpr std::string_view kDoubleQuotes{"\""}; + +//! The maximum number of items enqueued in the chunk channel +static constexpr std::size_t kChannelCapacity{100}; + +Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t buffer_capacity) + : writer_(writer), + buffer_capacity_(buffer_capacity), + channel_{executor, kChannelCapacity}, + run_completion_promise_{co_spawn( + executor, [](auto self) -> Task { + co_await self->run(); + }(this), + boost::asio::experimental::use_promise)} { + buffer_.reserve(buffer_capacity_ + buffer_capacity_ / 4); // try to prevent reallocation when buffer overflows } Task Stream::close() { if (!buffer_.empty()) { - do_write(std::make_shared(std::move(buffer_))); + co_await do_async_write(std::make_shared(std::move(buffer_))); } + co_await do_async_write(nullptr); - do_write(nullptr); - co_await std::move(runner_task_); - co_await writer_.close(); + co_await run_completion_promise_(boost::asio::use_awaitable); - co_return; + co_await writer_.close(); } void Stream::open_object() { @@ -100,8 +106,8 @@ void Stream::close_array() { } void Stream::write_json(const nlohmann::json& json) { - bool isEntry = !stack_.empty() && (stack_.top() == kArrayOpen || stack_.top() == kEntryWritten); - if (isEntry) { + const bool is_entry = !stack_.empty() && (stack_.top() == kArrayOpen || stack_.top() == 
kEntryWritten); + if (is_entry) { if (stack_.top() != kEntryWritten) { stack_.push(kEntryWritten); } else { @@ -230,7 +236,7 @@ void Stream::write_string(std::string_view str) { void Stream::write(std::string_view str) { buffer_ += str; - if (buffer_.size() >= threshold_) { + if (buffer_.size() >= buffer_capacity_) { do_write(std::make_shared(std::move(buffer_))); } } @@ -245,51 +251,61 @@ void Stream::ensure_separator() { } } -void Stream::do_write(std::shared_ptr chunk) { - if (!closed_) { - co_spawn( - io_executor_, [](auto self, auto chunk_ptr) -> Task { - co_await self->channel_.async_send(boost::system::error_code(), chunk_ptr, boost::asio::use_awaitable); - }(this, std::move(chunk)), - boost::asio::detached); +void Stream::do_write(ChunkPtr chunk) { + // Stream write API will usually be called by worker threads rather than I/O contexts, but we handle both + const auto& channel_executor{channel_.get_executor()}; + if (channel_executor.target()->running_in_this_thread()) [[unlikely]] { + // Delegate any back pressure to do_async_write + boost::asio::co_spawn(channel_executor, do_async_write(chunk), boost::asio::detached); + } else { + // Handle back pressure simply by retrying after a while // TODO(canepat) clever wait strategy + while (channel_.is_open()) { + if (const bool ok{channel_.try_send(boost::system::error_code(), chunk)}; ok) { + break; + } + SILK_TRACE << "Chunk size=" << (chunk ? 
chunk->size() : 0) << " not enqueued, worker back pressured"; + std::this_thread::sleep_for(10ms); + } } } -Task Stream::run() { - std::unique_ptr stop_watch; - - uint32_t write_counter{0}; - std::size_t total_send{0}; - while (true) { - const auto chunk_ptr = co_await channel_.async_receive(boost::asio::use_awaitable); - if (!chunk_ptr) { - break; - } - if (!stop_watch) { - stop_watch = std::make_unique(true); +Task Stream::do_async_write(ChunkPtr chunk) { + try { + co_await channel_.async_send(boost::system::error_code(), chunk, boost::asio::use_awaitable); + } catch (const boost::system::system_error& se) { + if (se.code() != boost::asio::experimental::error::channel_cancelled) { + SILK_ERROR << "Stream::do_async_write unexpected system_error: " << se.what(); } + } catch (const std::exception& exception) { + SILK_ERROR << "Stream::do_async_write unexpected exception: " << exception.what(); + } +} +Task Stream::run() { + uint32_t total_writes{0}; + std::size_t total_bytes_sent{0}; + while (true) { try { - total_send += co_await writer_.write(*chunk_ptr); - write_counter++; + const auto chunk_ptr = co_await channel_.async_receive(boost::asio::use_awaitable); + if (!chunk_ptr) { + break; + } + total_bytes_sent += co_await writer_.write(*chunk_ptr); + ++total_writes; + } catch (const boost::system::system_error& se) { + if (se.code() != boost::asio::experimental::error::channel_cancelled) { + SILK_ERROR << "Stream::run unexpected system_error: " << se.what(); + } + break; } catch (const std::exception& exception) { - SILK_ERROR << "#" << std::dec << write_counter << " Exception: " << exception.what(); - closed_ = true; - channel_.close(); + SILK_ERROR << "Stream::run unexpected exception: " << exception.what(); break; } } - closed_ = true; channel_.close(); - SILK_DEBUG << "Stream::run -> total write " << std::dec << write_counter << ", total sent: " << total_send; - if (stop_watch) { - const auto [_, duration] = stop_watch->lap(); - SILK_DEBUG << "Stream::run -> 
actual duration " << StopWatch::format(duration); - } - - co_return; + SILK_TRACE << "Stream::run total_writes: " << total_writes << " total_bytes_sent: " << total_bytes_sent; } } // namespace silkworm::rpc::json diff --git a/silkworm/rpc/json/stream.hpp b/silkworm/rpc/json/stream.hpp index b33c1a3690..e4e991ec17 100644 --- a/silkworm/rpc/json/stream.hpp +++ b/silkworm/rpc/json/stream.hpp @@ -24,22 +24,24 @@ #include #include +#include #include #include #include namespace silkworm::rpc::json { -static const nlohmann::json JSON_NULL = nlohmann::json::value_t::null; -static const nlohmann::json EMPTY_OBJECT = nlohmann::json::value_t::object; -static const nlohmann::json EMPTY_ARRAY = nlohmann::json::value_t::array; +//! Stream can be used to send big JSON data split into multiple fragments. class Stream { public: - explicit Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold = kDefaultThreshold); + inline static constexpr std::size_t kDefaultCapacity{4096}; + + Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t buffer_capacity = kDefaultCapacity); Stream(const Stream& stream) = delete; Stream& operator=(const Stream&) = delete; + //! Flush any remaining data and close properly as per the underlying transport Task close(); void open_object(); @@ -63,27 +65,29 @@ class Stream { void write_field(std::string_view name, std::double_t value); private: - static const std::size_t kDefaultThreshold = 0x800; + using ChunkPtr = std::shared_ptr; void write_string(std::string_view str); void ensure_separator(); void write(std::string_view str); - void do_write(std::shared_ptr chunk); + void do_write(ChunkPtr chunk); + Task do_async_write(ChunkPtr chunk); + //! 
Run loop writing channeled chunks in order Task run(); - boost::asio::any_io_executor& io_executor_; - StreamWriter& writer_; std::stack stack_; - const std::size_t threshold_; + const std::size_t buffer_capacity_; std::string buffer_; - bool closed_{false}; - Task runner_task_; - boost::asio::experimental::concurrent_channel)> channel_; + using ChunkChannel = boost::asio::experimental::concurrent_channel; + ChunkChannel channel_;  // Chunks enqueued waiting to be written asynchronously + + using RunPromise = boost::asio::experimental::promise; + RunPromise run_completion_promise_;  // Rendezvous awaited in close() to synchronize with run-loop completion }; } // namespace silkworm::rpc::json diff --git a/silkworm/rpc/json/stream_test.cpp b/silkworm/rpc/json/stream_test.cpp index ae16ab627f..bd639ffd94 100644 --- a/silkworm/rpc/json/stream_test.cpp +++ b/silkworm/rpc/json/stream_test.cpp @@ -18,6 +18,7 @@ #include +#include #include #include @@ -25,13 +26,16 @@ namespace silkworm::rpc::json { -struct JsonStreamTest : test::ContextTestBase { +// The following constants *must* be initialized using assignment and *not* uniform initialization syntax: brace-init would select the nlohmann::json initializer_list constructor and produce a one-element array instead of the intended value +static const nlohmann::json kJsonNull = nlohmann::json::value_t::null; +static const nlohmann::json kJsonEmptyObject = nlohmann::json::value_t::object; +static const nlohmann::json kJsonEmptyArray = nlohmann::json::value_t::array; + +struct StreamTest : test::ContextTestBase { }; -TEST_CASE_METHOD(JsonStreamTest, "JsonStream[json]") { - ClientContextPool pool{1}; - pool.start(); - boost::asio::any_io_executor io_executor = pool.next_io_context().get_executor(); +TEST_CASE_METHOD(StreamTest, "json::Stream writing JSON", "[rpc][json]") { + boost::asio::any_io_executor io_executor = io_context_.get_executor(); StringWriter string_writer; ChunkWriter chunk_writer(string_writer); @@ -75,10 +79,8 @@ } } -TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") { - ClientContextPool pool{1}; - pool.start(); - 
boost::asio::any_io_executor io_executor = pool.next_io_context().get_executor(); +TEST_CASE_METHOD(StreamTest, "json::Stream API", "[rpc][json]") { + boost::asio::any_io_executor io_executor = io_context_.get_executor(); StringWriter string_writer; Stream stream(io_executor, string_writer); @@ -101,7 +103,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") { CHECK(string_writer.get_content() == "{}"); } SECTION("empty object 2") { - stream.write_json(EMPTY_OBJECT); + stream.write_json(kJsonEmptyObject); spawn_and_wait(stream.close()); CHECK(string_writer.get_content() == "{}"); @@ -114,14 +116,14 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") { CHECK(string_writer.get_content() == "[]"); } SECTION("empty array 2") { - stream.write_json(EMPTY_ARRAY); + stream.write_json(kJsonEmptyArray); spawn_and_wait(stream.close()); CHECK(string_writer.get_content() == "[]"); } SECTION("simple object 1") { stream.open_object(); - stream.write_json_field("null", JSON_NULL); + stream.write_json_field("null", kJsonNull); stream.close_object(); spawn_and_wait(stream.close()); @@ -129,7 +131,7 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") { } SECTION("simple object 2") { stream.open_object(); - stream.write_json_field("array", EMPTY_ARRAY); + stream.write_json_field("array", kJsonEmptyArray); stream.close_object(); spawn_and_wait(stream.close()); @@ -326,4 +328,32 @@ TEST_CASE_METHOD(JsonStreamTest, "JsonStream calls") { } } +TEST_CASE_METHOD(StreamTest, "json::Stream threading", "[rpc][json]") { + boost::asio::any_io_executor io_executor = io_context_.get_executor(); + constexpr std::string_view kData{R"({"test":"test"})"}; + + StringWriter string_writer; + Stream stream(io_executor, string_writer, 1); // tiny buffer capacity + + const nlohmann::json json = R"({"test":"test"})"_json; + + SECTION("using I/O context thread") { + stream.write_json(json); + CHECK_NOTHROW(spawn_and_wait(stream.close())); + CHECK(string_writer.get_content() == kData); + } + + 
SECTION("using worker thread") { + boost::asio::thread_pool workers; + boost::asio::post(workers, [&]() { + for (int i{0}; i < 1'000; ++i) { + stream.write_json(json); + } + }); + workers.join(); + CHECK_NOTHROW(spawn_and_wait(stream.close())); + CHECK(string_writer.get_content().size() == kData.size() * 1'000); + } +} + } // namespace silkworm::rpc::json