Skip to content

Commit

Permalink
rpcdaemon: fix wrong chunk sequence in streaming (#1764)
Browse files Browse the repository at this point in the history
Co-authored-by: canepat <[email protected]>
  • Loading branch information
Sixtysixter and canepat authored Jan 22, 2024
1 parent ff7de5e commit df6332e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 27 deletions.
88 changes: 64 additions & 24 deletions silkworm/rpc/json/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "stream.hpp"

#include <algorithm>
#include <array>
#include <charconv>
#include <iostream>
Expand All @@ -25,7 +24,9 @@
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_future.hpp>

#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/common/stopwatch.hpp>

namespace silkworm::rpc::json {

Expand All @@ -42,12 +43,23 @@ static std::string kFieldSeparator{","}; // NOLINT(runtime/string)
static std::string kColon{":"}; // NOLINT(runtime/string)
static std::string kDoubleQuotes{"\""}; // NOLINT(runtime/string)

Stream::Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold)
: io_executor_(executor), writer_(writer), threshold_(threshold), channel_{executor, threshold} {
buffer_.reserve(threshold);
runner_task_ = co_spawn(
executor, [](auto self) -> Task<void> {
co_await self->run();
}(this),
boost::asio::use_awaitable);
}

Task<void> Stream::close() {
if (!buffer_.empty()) {
co_await writer_.write(buffer_);
buffer_.clear();
do_write(std::make_shared<std::string>(std::move(buffer_)));
}

do_write(nullptr);
co_await std::move(runner_task_);
co_await writer_.close();

co_return;
Expand Down Expand Up @@ -197,19 +209,6 @@ void Stream::write_field(std::string_view name, std::uint64_t value) {
}
}

void Stream::write_field(std::string_view name, std::float_t value) {
ensure_separator();
write_string(name);
write(kColon);

std::array<char, 30> str{};
if (auto [ptr, ec] = std::to_chars(str.data(), str.data() + str.size(), value); ec == std::errc()) {
write(std::string_view(str.data(), ptr));
} else {
write("Invalid value");
}
}

void Stream::write_field(std::string_view name, std::double_t value) {
ensure_separator();
write_string(name);
Expand All @@ -232,13 +231,7 @@ void Stream::write_string(std::string_view str) {
void Stream::write(std::string_view str) {
buffer_ += str;
if (buffer_.size() >= threshold_) {
std::string to_write(buffer_);
buffer_.clear();
co_spawn(
io_executor_, [&, value = std::move(to_write)]() -> Task<void> {
co_await writer_.write(value);
},
boost::asio::detached);
do_write(std::make_shared<std::string>(std::move(buffer_)));
}
}

Expand All @@ -252,4 +245,51 @@ void Stream::ensure_separator() {
}
}

void Stream::do_write(std::shared_ptr<std::string> chunk) {
if (!closed_) {
co_spawn(
io_executor_, [](auto self, auto chunk_ptr) -> Task<void> {
co_await self->channel_.async_send(boost::system::error_code(), chunk_ptr, boost::asio::use_awaitable);
}(this, std::move(chunk)),
boost::asio::detached);
}
}

Task<void> Stream::run() {
std::unique_ptr<silkworm::StopWatch> stop_watch;

uint32_t write_counter{0};
std::size_t total_send{0};
while (true) {
const auto chunk_ptr = co_await channel_.async_receive(boost::asio::use_awaitable);
if (!chunk_ptr) {
break;
}
if (!stop_watch) {
stop_watch = std::make_unique<StopWatch>(true);
}

try {
total_send += co_await writer_.write(*chunk_ptr);
write_counter++;
} catch (const std::exception& exception) {
SILK_ERROR << "#" << std::dec << write_counter << " Exception: " << exception.what();
closed_ = true;
channel_.close();
break;
}
}

closed_ = true;
channel_.close();

SILK_DEBUG << "Stream::run -> total write " << std::dec << write_counter << ", total sent: " << total_send;
if (stop_watch) {
const auto [_, duration] = stop_watch->lap();
SILK_DEBUG << "Stream::run -> actual duration " << StopWatch::format(duration);
}

co_return;
}

} // namespace silkworm::rpc::json
13 changes: 10 additions & 3 deletions silkworm/rpc/json/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

#pragma once

#include <memory>
#include <stack>
#include <string>
#include <string_view>

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/io_context.hpp>
#include <nlohmann/json.hpp>

Expand All @@ -34,8 +36,7 @@ static const nlohmann::json EMPTY_ARRAY = nlohmann::json::value_t::array;

class Stream {
public:
explicit Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold = kDefaultThreshold)
: io_executor_(executor), writer_(writer), threshold_(threshold) {}
explicit Stream(boost::asio::any_io_executor& executor, StreamWriter& writer, std::size_t threshold = kDefaultThreshold);
Stream(const Stream& stream) = delete;
Stream& operator=(const Stream&) = delete;

Expand All @@ -59,7 +60,6 @@ class Stream {
void write_field(std::string_view name, std::uint32_t value);
void write_field(std::string_view name, std::int64_t value);
void write_field(std::string_view name, std::uint64_t value);
void write_field(std::string_view name, std::float_t value);
void write_field(std::string_view name, std::double_t value);

private:
Expand All @@ -69,6 +69,9 @@ class Stream {
void ensure_separator();

void write(std::string_view str);
void do_write(std::shared_ptr<std::string> chunk);

Task<void> run();

boost::asio::any_io_executor& io_executor_;

Expand All @@ -77,6 +80,10 @@ class Stream {

const std::size_t threshold_;
std::string buffer_;

bool closed_{false};
Task<void> runner_task_;
boost::asio::experimental::concurrent_channel<void(boost::system::error_code, std::shared_ptr<std::string>)> channel_;
};

} // namespace silkworm::rpc::json

0 comments on commit df6332e

Please sign in to comment.