Skip to content

Commit

Permalink
Pass memory allocator via session, channel, proxy, distributor.
Browse files Browse the repository at this point in the history
  • Loading branch information
evoskuil committed Jul 28, 2024
1 parent 0ceaada commit 41546eb
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 27 deletions.
3 changes: 2 additions & 1 deletion include/bitcoin/network/net/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/broadcaster.hpp>
#include <bitcoin/network/net/deadline.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/proxy.hpp>
#include <bitcoin/network/settings.hpp>

Expand Down Expand Up @@ -73,7 +74,7 @@ class BCT_API channel
}

/// Construct a channel to encapsulated and communicate on the socket.
channel(const logger& log, const socket::ptr& socket,
channel(memory& memory, const logger& log, const socket::ptr& socket,
const settings& settings, uint64_t identifier=zero,
bool quiet=true) NOEXCEPT;

Expand Down
16 changes: 12 additions & 4 deletions include/bitcoin/network/net/distributor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand Down Expand Up @@ -84,7 +85,7 @@ class BCT_API distributor
DEFINE_SUBSCRIBER(version_acknowledge);

/// Create an instance of this class.
distributor(asio::strand& strand) NOEXCEPT;
distributor(memory& memory, asio::strand& strand) NOEXCEPT;

/// If stopped, handler is invoked with error::subscriber_stopped.
/// If key exists, handler is invoked with error::subscriber_exists.
Expand Down Expand Up @@ -115,9 +116,9 @@ class BCT_API distributor
if (!is_zero(subscriber.size()))
{
// Subscribers are notified only with stop code or error::success.
const auto message = messages::deserialize<Message>(data, version);
if (!message) return error::invalid_message;
subscriber.notify(error::success, message);
const auto ptr = messages::deserialize<Message>(data, version);
if (!ptr) return error::invalid_message;
subscriber.notify(error::success, ptr);
}

return error::success;
Expand Down Expand Up @@ -191,8 +192,15 @@ class BCT_API distributor
DECLARE_SUBSCRIBER(transaction);
DECLARE_SUBSCRIBER(version);
DECLARE_SUBSCRIBER(version_acknowledge);

memory& memory_;
};

template <>
code distributor::do_notify<messages::block>(
distributor::block_subscriber& subscriber, uint32_t version,
const system::data_chunk& data) NOEXCEPT;

#undef SUBSCRIBER
#undef SUBSCRIBER_TYPE
#undef DEFINE_SUBSCRIBER
Expand Down
3 changes: 2 additions & 1 deletion include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/distributor.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/socket.hpp>

namespace libbitcoin {
Expand Down Expand Up @@ -122,7 +123,7 @@ class BCT_API proxy
const config::address& address() const NOEXCEPT;

protected:
proxy(const socket::ptr& socket) NOEXCEPT;
proxy(memory& memory, const socket::ptr& socket) NOEXCEPT;

/// Property values provided to the proxy.
virtual size_t minimum_buffer() const NOEXCEPT = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/net/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <bitcoin/network/log/log.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/deadline.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/proxy.hpp>
#include <bitcoin/network/settings.hpp>

Expand All @@ -53,9 +54,9 @@ inline deadline::ptr expiration(const logger& log, asio::strand& strand,
return timeout(log, strand, pseudo_random::duration(span));
}

channel::channel(const logger& log, const socket::ptr& socket,
channel::channel(memory& memory, const logger& log, const socket::ptr& socket,
const settings& settings, uint64_t identifier, bool quiet) NOEXCEPT
: proxy(socket),
: proxy(memory, socket),
quiet_(quiet),
settings_(settings),
identifier_(identifier),
Expand Down
21 changes: 19 additions & 2 deletions src/net/distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <bitcoin/system.hpp>
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand All @@ -33,7 +34,7 @@ using namespace system;
#define CASE_NOTIFY(name) case messages::identifier::name: \
return do_notify<messages::name>(SUBSCRIBER(name), version, data)

distributor::distributor(asio::strand& strand) NOEXCEPT
distributor::distributor(memory& memory, asio::strand& strand) NOEXCEPT
: MAKE_SUBSCRIBER(address),
MAKE_SUBSCRIBER(alert),
MAKE_SUBSCRIBER(block),
Expand Down Expand Up @@ -66,7 +67,8 @@ distributor::distributor(asio::strand& strand) NOEXCEPT
MAKE_SUBSCRIBER(send_headers),
MAKE_SUBSCRIBER(transaction),
MAKE_SUBSCRIBER(version),
MAKE_SUBSCRIBER(version_acknowledge)
MAKE_SUBSCRIBER(version_acknowledge),
memory_(memory)
{
}

Expand Down Expand Up @@ -152,6 +154,21 @@ void distributor::stop(const code& ec) NOEXCEPT
STOP_SUBSCRIBER(version_acknowledge);
}

template <>
code distributor::do_notify<messages::block>(
distributor::block_subscriber& subscriber, uint32_t version,
const system::data_chunk& data) NOEXCEPT
{
if (!is_zero(subscriber.size()))
{
const auto ptr = messages::block::deserialize(memory_, version, data);
if (!ptr) return error::invalid_message;
subscriber.notify(error::success, ptr);
}

return error::success;
}

#undef SUBSCRIBER
#undef MAKE_SUBSCRIBER
#undef CASE_NOTIFY
Expand Down
5 changes: 3 additions & 2 deletions src/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/log/log.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand All @@ -47,10 +48,10 @@ static constexpr uint32_t https_magic = 0x02010316;
// This is created in a started state and must be stopped, as the subscribers
// assert if not stopped. Subscribers may hold protocols even if the service
// is not started.
proxy::proxy(const socket::ptr& socket) NOEXCEPT
proxy::proxy(memory& memory, const socket::ptr& socket) NOEXCEPT
: socket_(socket),
stop_subscriber_(socket->strand()),
distributor_(socket->strand()),
distributor_(memory, socket->strand()),
reporter(socket->log)
{
}
Expand Down
5 changes: 4 additions & 1 deletion src/sessions/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,12 @@ channel::ptr session::create_channel(const socket::ptr& socket,
{
BC_ASSERT_MSG(stranded(), "strand");

// Default message memory resource, override create_channel to replace.
static memory memory{};

// Channel id must be created using create_key().
const auto id = create_key();
return std::make_shared<channel>(log, socket, settings(), id, quiet);
return std::make_shared<channel>(memory, log, socket, settings(), id, quiet);
}

// At one object/session/ns, this overflows in ~585 years (and handled).
Expand Down
6 changes: 4 additions & 2 deletions test/net/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ class channel_accessor

BOOST_AUTO_TEST_CASE(channel__stopped__default__false)
{
memory memory{};
const logger log{};
threadpool pool(1);
asio::strand strand(pool.service().get_executor());
const settings set(bc::system::chain::selection::mainnet);
auto socket_ptr = std::make_shared<network::socket>(log, pool.service());
auto channel_ptr = std::make_shared<channel>(log, socket_ptr, set, 42);
auto channel_ptr = std::make_shared<channel>(memory, log, socket_ptr, set, 42);
BOOST_REQUIRE(!channel_ptr->stopped());

// Stop completion is asynchronous.
Expand All @@ -70,12 +71,13 @@ inline size_t payload_maximum(const settings& settings)

BOOST_AUTO_TEST_CASE(channel__properties__default__expected)
{
memory memory{};
const logger log{};
threadpool pool(1);
asio::strand strand(pool.service().get_executor());
const settings set(bc::system::chain::selection::mainnet);
auto socket_ptr = std::make_shared<network::socket>(log, pool.service());
auto channel_ptr = std::make_shared<channel_accessor>(log, socket_ptr, set, 42);
auto channel_ptr = std::make_shared<channel_accessor>(memory, log, socket_ptr, set, 42);

BOOST_REQUIRE(!channel_ptr->address());
BOOST_REQUIRE_NE(channel_ptr->nonce(), 0u);
Expand Down
15 changes: 10 additions & 5 deletions test/net/distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ BOOST_AUTO_TEST_SUITE(distributor_tests)

BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);

std::promise<bool> promise;
boost::asio::post(strand, [&]() NOEXCEPT
Expand All @@ -40,9 +41,10 @@ BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops)

BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -71,9 +73,10 @@ BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code)

BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -113,9 +116,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification)

BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -153,9 +157,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_noti

BOOST_AUTO_TEST_CASE(distributor__notify__valid_nonced_ping__expected_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr uint64_t expected_nonce = 42;
constexpr auto expected_ec = error::invalid_magic;
auto result = true;
Expand Down
5 changes: 3 additions & 2 deletions test/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class mock_proxy
proxy::subscribe_stop(std::move(handler));
}

mock_proxy(socket::ptr socket) NOEXCEPT
: proxy(socket)
mock_proxy(const socket::ptr& socket) NOEXCEPT
: proxy(memory_, socket)
{
}

Expand Down Expand Up @@ -88,6 +88,7 @@ class mock_proxy
}

private:
mutable memory memory_{};
mutable bool stop_{ false };
mutable std::promise<code> stopped_;
};
Expand Down
15 changes: 10 additions & 5 deletions test/sessions/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,14 +579,15 @@ BOOST_AUTO_TEST_CASE(session__start__stop__success)

BOOST_AUTO_TEST_CASE(session__start_channel__session_not_started__handlers_service_stopped_channel_service_stopped_not_pent_or_stored)
{
memory memory{};
const logger log{};
settings set(selection::mainnet);
mock_p2p net(set, log);
auto session = std::make_shared<mock_session>(net, 1);
BOOST_REQUIRE(session->stopped());

const auto socket = std::make_shared<network::socket>(net.log, net.service());
const auto channel = std::make_shared<mock_channel>(net.log, socket, session->settings(), 42);
const auto channel = std::make_shared<mock_channel>(memory, net.log, socket, session->settings(), 42);

std::promise<code> started_channel;
std::promise<code> stopped_channel;
Expand Down Expand Up @@ -624,6 +625,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__session_not_started__handlers_servi

BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_channel_stopped_channel_channel_stopped_stored_and_not_counted)
{
memory memory{};
const logger log{};
settings set(selection::mainnet);
mock_p2p net(set, log);
Expand All @@ -641,7 +643,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_chann
BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success);

const auto socket = std::make_shared<network::socket>(net.log, net.service());
const auto channel = std::make_shared<mock_channel>(net.log, socket, session->settings(), 42);
const auto channel = std::make_shared<mock_channel>(memory, net.log, socket, session->settings(), 42);

// Stop the channel (started by default).
std::promise<bool> unstarted_channel;
Expand Down Expand Up @@ -701,6 +703,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_chann

BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_channel_service_stopped_stored_and_counted)
{
memory memory{};
const logger log{};
settings set(selection::mainnet);
set.host_pool_capacity = 0;
Expand Down Expand Up @@ -730,7 +733,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_chan
BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success);

const auto socket = std::make_shared<network::socket>(net.log, net.service());
const auto channel = std::make_shared<mock_channel>(net.log, socket, session->settings(), 42);
const auto channel = std::make_shared<mock_channel>(memory, net.log, socket, session->settings(), 42);

std::promise<code> started_channel;
std::promise<code> stopped_channel;
Expand Down Expand Up @@ -781,6 +784,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_chan

BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expected_channel_success_stored_and_counted)
{
memory memory{};
const logger log{};
settings set(selection::mainnet);
set.host_pool_capacity = 0;
Expand Down Expand Up @@ -811,7 +815,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expe
BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success);

const auto socket = std::make_shared<network::socket>(net.log, net.service());
const auto channel = std::make_shared<mock_channel_no_read>(net.log, socket, session->settings(), 42);
const auto channel = std::make_shared<mock_channel_no_read>(memory, net.log, socket, session->settings(), 42);

std::promise<code> started_channel;
std::promise<code> stopped_channel;
Expand Down Expand Up @@ -864,6 +868,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expe

BOOST_AUTO_TEST_CASE(session__start_channel__inbound_all_started__handlers_expected_channel_success_not_stored_and_counted)
{
memory memory{};
const logger log{};
settings set(selection::mainnet);
set.host_pool_capacity = 0;
Expand Down Expand Up @@ -894,7 +899,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__inbound_all_started__handlers_expec
BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success);

const auto socket = std::make_shared<network::socket>(net.log, net.service());
const auto channel = std::make_shared<mock_channel_no_read>(net.log, socket, session->settings(), 42);
const auto channel = std::make_shared<mock_channel_no_read>(memory, net.log, socket, session->settings(), 42);

std::promise<code> started_channel;
std::promise<code> stopped_channel;
Expand Down

0 comments on commit 41546eb

Please sign in to comment.