Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype for batch TCP receiving #4630

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ TEST (socket, disconnection_of_silent_connections)
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in));
}

TEST (socket, drop_policy)
TEST (socket, DISABLED_drop_policy)
{
nano::test::system system;

Expand Down Expand Up @@ -366,8 +366,8 @@ TEST (socket, drop_policy)
}

// This is abusing the socket class, it's interfering with the normal node lifetimes and as a result deadlocks
// TEST (socket, DISABLED_concurrent_writes)
TEST (socket, concurrent_writes)
TEST (socket, DISABLED_concurrent_writes)
// TEST (socket, concurrent_writes)
{
nano::test::system system;

Expand Down Expand Up @@ -455,25 +455,9 @@ TEST (socket, concurrent_writes)

// Execute overlapping writes from multiple threads
auto client (clients[0]);
std::vector<std::thread> client_threads;
for (int i = 0; i < client_count; i++)
{
client_threads.emplace_back ([&client, &message_count] () {
for (int i = 0; i < message_count; i++)
{
std::vector<uint8_t> buff;
buff.push_back ('A' + i);
client->async_write (nano::shared_const_buffer (std::move (buff)));
}
});
}
nano::thread_runner runner{ node->io_ctx_shared, node->logger, client_count };

ASSERT_TIMELY_EQ (10s, completed_reads, total_message_count);

for (auto & t : client_threads)
{
t.join ();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock_a)

lock_a.unlock ();

node.background ([node_l = node.shared (), status_l, confirmation_action_l = confirmation_action] () {
node.workers.push_task ([node_l = node.shared (), status_l, confirmation_action_l = confirmation_action] () {
node_l->process_confirmed (status_l);

if (confirmation_action_l)
Expand Down
127 changes: 102 additions & 25 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/transport.hpp>

#include <boost/asio/use_future.hpp>
#include <boost/exception/detail/exception_ptr.hpp>
#include <boost/format.hpp>

#include <cstdint>
Expand Down Expand Up @@ -36,8 +38,11 @@ nano::transport::socket::socket (nano::node & node_a, boost::asio::ip::tcp::sock
last_receive_time_or_init{ nano::seconds_since_epoch () },
default_timeout{ node_a.config.tcp_io_timeout },
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time },
read_buffer{ 16384 },
buffer_condition{ strand },
max_queue_size{ max_queue_size_a }
{
os_buffer.insert (os_buffer.begin (), 16384, 0);
}

nano::transport::socket::~socket ()
Expand Down Expand Up @@ -88,6 +93,7 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
node_l->observers.socket_connected.notify (*this_l);
}
callback (ec);
this_l->begin_read_loop ();
}));
});
}
Expand All @@ -101,31 +107,9 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
if (!closed)
{
set_default_timeout ();
boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable {
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (this_l->strand.running_in_this_thread ());

auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

if (ec)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
this_l->close ();
}
else
{
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_a);
this_l->set_last_completion ();
this_l->set_last_receive_time ();
}
cbk (ec, size_a);
}));
boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () {
this_l->requests.emplace_back (buffer_a, size_a, callback);
this_l->service_requests_maybe ();
});
}
}
Expand Down Expand Up @@ -389,6 +373,15 @@ void nano::transport::socket::close_internal ()
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);

// FIXME Encapsulate or simplify this
for (auto const & request : requests)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
auto const & [buffer, size, callback] = request;
callback (boost::asio::error::operation_aborted, 0);
}
requests.clear ();

if (ec)
{
node_l->stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close);
Expand All @@ -408,6 +401,90 @@ nano::tcp_endpoint nano::transport::socket::local_endpoint () const
return local;
}

void nano::transport::socket::begin_read_loop ()
{
boost::asio::co_spawn (
strand, [this_l = shared_from_this ()] () -> asio::awaitable<void> {
co_await this_l->read_loop ();
},
// FIXME This should probably clean up in a structured way by getting a future for this loop and wait for it similar to a thread::join()
asio::detached);
}

boost::asio::awaitable<void> nano::transport::socket::read_loop ()
{
debug_assert (strand.running_in_this_thread ());

try
{
while (!closed)
{
// Wait until there is data available to read in the socket
co_await tcp_socket.async_wait (boost::asio::ip::tcp::socket::wait_read, boost::asio::use_awaitable);
if (read_buffer.capacity () == read_buffer.size ())
{
// Wait until there is writable space
co_await buffer_condition.async_wait (boost::asio::use_awaitable);
}

// Read up to as much data from the OS as we can hold in the write section
auto buffer = boost::asio::buffer (os_buffer.data (), read_buffer.capacity () - read_buffer.size ());
// Pick up multiple messages in a single syscall
size_t amount_read = co_await tcp_socket.async_read_some (buffer, boost::asio::use_awaitable);

// FIXME This is the undesired copy
std::transform (os_buffer.begin (), os_buffer.begin () + amount_read, std::back_inserter (read_buffer), [] (uint8_t val) { return val; });

service_requests_maybe ();
}
}
catch (boost::system::system_error const & e)
{
close ();
}
}

void nano::transport::socket::service_requests_maybe ()
{
debug_assert (strand.running_in_this_thread ());

while (!requests.empty ())
{
auto front = requests.front ();
auto const & [buffer, size, callback] = front;
auto available = read_buffer.size ();
if (available < size)
{
// Once read requests can't be serviced with enough readable data, we're done
return;
}
std::copy (read_buffer.begin (), read_buffer.begin () + size, buffer->begin ());
if (read_buffer.capacity () == read_buffer.size ())
{
buffer_condition.cancel ();
}

// FIXME having valid iterators will be needed when merging read_buffer and buffer'
read_buffer.erase (read_buffer.begin (), read_buffer.begin () + size);

// FIXME Execute callback outside this socket's strand if possible
boost::asio::post (strand, [this_l = shared_from_this (), front] () {
auto const & [buffer, size, callback] = front;
auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size);
this_l->set_last_completion ();
this_l->set_last_receive_time ();
callback (boost::system::error_code{}, size);
});
requests.pop_front ();
}
}

void nano::transport::socket::operator() (nano::object_stream & obs) const
{
obs.write ("remote_endpoint", remote_endpoint ());
Expand Down
34 changes: 34 additions & 0 deletions nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <nano/node/transport/common.hpp>
#include <nano/node/transport/traffic_type.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/circular_buffer.hpp>

#include <chrono>
#include <map>
#include <memory>
Expand Down Expand Up @@ -194,10 +198,40 @@ class socket final : public std::enable_shared_from_this<socket>
void ongoing_checkup ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

// FIXME: The read loop is being launched separately.
// socket::start is called before the socket descriptor is set when in client mode
// This should happen internally somehow
void begin_read_loop ();

private:
nano::transport::socket_type type_m{ socket_type::undefined };
nano::transport::socket_endpoint endpoint_type_m;

// Read buffering operations
private:
using request_t = std::tuple<std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void (boost::system::error_code const & error, std::size_t bytes_transferred)>>;
boost::asio::awaitable<void> read_loop ();

// Service all waiting requests or until there is no more readable data
void service_requests_maybe ();
std::deque<request_t> requests;

// FIXME: These two buffers should be merged because it produces an extra data copy
// Need two regions
// - The region writable by the operasing system in service on an os async_read call
// - The region the region available for socket::async_read requests to obtain data
// Both may be full or empty independently
// Eliminating the copy requires both regions to overlap
boost::circular_buffer<uint8_t> read_buffer;
std::vector<uint8_t> os_buffer;

// FIXME: This is a hack of a condition_variable
// If the buffer is full, e.g. the writable region is empty so no data can be read
// Getting free space requires a call to socket::async_read
// We cannot block on a condition_variable::wait since this operation happens inside a coroutine
// Maybe wrap as a nano:: type
boost::asio::steady_timer buffer_condition;

public:
std::size_t const max_queue_size;

Expand Down
1 change: 1 addition & 0 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket

socket->set_timeout (node.network_params.network.idle_timeout);
socket->start ();
socket->begin_read_loop ();
server->start ();

connection_accepted.notify (socket, server);
Expand Down
Loading