Skip to content

Commit

Permalink
Merge pull request #420 from evoskuil/master
Browse files Browse the repository at this point in the history
Pass memory allocator via session, channel, proxy, distributor.
  • Loading branch information
evoskuil authored Jul 30, 2024
2 parents 9fea514 + 41546eb commit a48593f
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 37 deletions.
3 changes: 2 additions & 1 deletion include/bitcoin/network/messages/block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ struct BCT_API block
static const uint32_t version_minimum;
static const uint32_t version_maximum;

static cptr deserialize(auto& memory, uint32_t version,
const system::data_chunk& data, bool witness=true) NOEXCEPT;
static cptr deserialize(uint32_t version, const system::data_chunk& data,
bool witness=true) NOEXCEPT;
static block deserialize(uint32_t version, system::reader& source,
Expand All @@ -51,7 +53,6 @@ struct BCT_API block
size_t size(uint32_t version, bool witness) const NOEXCEPT;

system::chain::block::cptr block_ptr;
mutable size_t cached_size{};
};

} // namespace messages
Expand Down
3 changes: 2 additions & 1 deletion include/bitcoin/network/net/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/broadcaster.hpp>
#include <bitcoin/network/net/deadline.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/proxy.hpp>
#include <bitcoin/network/settings.hpp>

Expand Down Expand Up @@ -73,7 +74,7 @@ class BCT_API channel
}

/// Construct a channel to encapsulated and communicate on the socket.
channel(const logger& log, const socket::ptr& socket,
channel(memory& memory, const logger& log, const socket::ptr& socket,
const settings& settings, uint64_t identifier=zero,
bool quiet=true) NOEXCEPT;

Expand Down
16 changes: 12 additions & 4 deletions include/bitcoin/network/net/distributor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand Down Expand Up @@ -84,7 +85,7 @@ class BCT_API distributor
DEFINE_SUBSCRIBER(version_acknowledge);

/// Create an instance of this class.
distributor(asio::strand& strand) NOEXCEPT;
distributor(memory& memory, asio::strand& strand) NOEXCEPT;

/// If stopped, handler is invoked with error::subscriber_stopped.
/// If key exists, handler is invoked with error::subscriber_exists.
Expand Down Expand Up @@ -115,9 +116,9 @@ class BCT_API distributor
if (!is_zero(subscriber.size()))
{
// Subscribers are notified only with stop code or error::success.
const auto message = messages::deserialize<Message>(data, version);
if (!message) return error::invalid_message;
subscriber.notify(error::success, message);
const auto ptr = messages::deserialize<Message>(data, version);
if (!ptr) return error::invalid_message;
subscriber.notify(error::success, ptr);
}

return error::success;
Expand Down Expand Up @@ -191,8 +192,15 @@ class BCT_API distributor
DECLARE_SUBSCRIBER(transaction);
DECLARE_SUBSCRIBER(version);
DECLARE_SUBSCRIBER(version_acknowledge);

memory& memory_;
};

template <>
code distributor::do_notify<messages::block>(
distributor::block_subscriber& subscriber, uint32_t version,
const system::data_chunk& data) NOEXCEPT;

#undef SUBSCRIBER
#undef SUBSCRIBER_TYPE
#undef DEFINE_SUBSCRIBER
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/network/net/memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class BCT_API memory
arena* get_arena() NOEXCEPT;
retainer::ptr get_retainer() NOEXCEPT;


private:
// These are thread safe.
arena* arena_;
std::shared_mutex remap_mutex_{};
};
Expand Down
3 changes: 2 additions & 1 deletion include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/distributor.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/socket.hpp>

namespace libbitcoin {
Expand Down Expand Up @@ -122,7 +123,7 @@ class BCT_API proxy
const config::address& address() const NOEXCEPT;

protected:
proxy(const socket::ptr& socket) NOEXCEPT;
proxy(memory& memory, const socket::ptr& socket) NOEXCEPT;

/// Property values provided to the proxy.
virtual size_t minimum_buffer() const NOEXCEPT = 0;
Expand Down
19 changes: 11 additions & 8 deletions src/messages/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ typename block::cptr block::deserialize(uint32_t version,
const system::data_chunk& data, bool witness) NOEXCEPT
{
static memory memory{};
return deserialize(memory, version, data, witness);
}

// static
typename block::cptr block::deserialize(auto& memory, uint32_t version,
const system::data_chunk& data, bool witness) NOEXCEPT
{
system::istream source{ data };
system::byte_reader reader{ source, memory.get_arena() };

Expand Down Expand Up @@ -86,6 +93,7 @@ typename block::cptr block::deserialize(uint32_t version,
std::advance(start, full);
}

// WARNING: retainer does not track objects shared from block (e.g. tx).
message->block_ptr->set_retainer(memory.get_retainer());
return message;
}
Expand All @@ -97,10 +105,7 @@ block block::deserialize(uint32_t version, reader& source,
if (version < version_minimum || version > version_maximum)
source.invalidate();

const auto start = source.get_read_position();
const auto block_ptr = to_shared<chain::block>(source, witness);
const auto size = source.get_read_position() - start;
return { block_ptr, size };
return { to_shared<chain::block>(source, witness) };
}

bool block::serialize(uint32_t version,
Expand All @@ -116,14 +121,12 @@ void block::serialize(uint32_t BC_DEBUG_ONLY(version), writer& sink,
bool witness) const NOEXCEPT
{
BC_DEBUG_ONLY(const auto bytes = size(version, witness);)
////BC_DEBUG_ONLY(const auto start = sink.get_write_position();)
const auto start = sink.get_write_position();
BC_DEBUG_ONLY(const auto start = sink.get_write_position();)

if (block_ptr)
block_ptr->to_data(sink, witness);

cached_size = sink.get_write_position() - start;
BC_ASSERT(sink && cached_size == bytes);
BC_ASSERT(sink && (sink.get_write_position() - start) == bytes);
}

size_t block::size(uint32_t, bool witness) const NOEXCEPT
Expand Down
5 changes: 3 additions & 2 deletions src/net/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <bitcoin/network/log/log.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/deadline.hpp>
#include <bitcoin/network/net/memory.hpp>
#include <bitcoin/network/net/proxy.hpp>
#include <bitcoin/network/settings.hpp>

Expand All @@ -53,9 +54,9 @@ inline deadline::ptr expiration(const logger& log, asio::strand& strand,
return timeout(log, strand, pseudo_random::duration(span));
}

channel::channel(const logger& log, const socket::ptr& socket,
channel::channel(memory& memory, const logger& log, const socket::ptr& socket,
const settings& settings, uint64_t identifier, bool quiet) NOEXCEPT
: proxy(socket),
: proxy(memory, socket),
quiet_(quiet),
settings_(settings),
identifier_(identifier),
Expand Down
21 changes: 19 additions & 2 deletions src/net/distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <bitcoin/system.hpp>
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand All @@ -33,7 +34,7 @@ using namespace system;
#define CASE_NOTIFY(name) case messages::identifier::name: \
return do_notify<messages::name>(SUBSCRIBER(name), version, data)

distributor::distributor(asio::strand& strand) NOEXCEPT
distributor::distributor(memory& memory, asio::strand& strand) NOEXCEPT
: MAKE_SUBSCRIBER(address),
MAKE_SUBSCRIBER(alert),
MAKE_SUBSCRIBER(block),
Expand Down Expand Up @@ -66,7 +67,8 @@ distributor::distributor(asio::strand& strand) NOEXCEPT
MAKE_SUBSCRIBER(send_headers),
MAKE_SUBSCRIBER(transaction),
MAKE_SUBSCRIBER(version),
MAKE_SUBSCRIBER(version_acknowledge)
MAKE_SUBSCRIBER(version_acknowledge),
memory_(memory)
{
}

Expand Down Expand Up @@ -152,6 +154,21 @@ void distributor::stop(const code& ec) NOEXCEPT
STOP_SUBSCRIBER(version_acknowledge);
}

template <>
code distributor::do_notify<messages::block>(
distributor::block_subscriber& subscriber, uint32_t version,
const system::data_chunk& data) NOEXCEPT
{
if (!is_zero(subscriber.size()))
{
const auto ptr = messages::block::deserialize(memory_, version, data);
if (!ptr) return error::invalid_message;
subscriber.notify(error::success, ptr);
}

return error::success;
}

#undef SUBSCRIBER
#undef MAKE_SUBSCRIBER
#undef CASE_NOTIFY
Expand Down
5 changes: 3 additions & 2 deletions src/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <bitcoin/network/async/async.hpp>
#include <bitcoin/network/define.hpp>
#include <bitcoin/network/log/log.hpp>
#include <bitcoin/network/net/memory.hpp>

namespace libbitcoin {
namespace network {
Expand All @@ -47,10 +48,10 @@ static constexpr uint32_t https_magic = 0x02010316;
// This is created in a started state and must be stopped, as the subscribers
// assert if not stopped. Subscribers may hold protocols even if the service
// is not started.
proxy::proxy(const socket::ptr& socket) NOEXCEPT
proxy::proxy(memory& memory, const socket::ptr& socket) NOEXCEPT
: socket_(socket),
stop_subscriber_(socket->strand()),
distributor_(socket->strand()),
distributor_(memory, socket->strand()),
reporter(socket->log)
{
}
Expand Down
5 changes: 4 additions & 1 deletion src/sessions/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,12 @@ channel::ptr session::create_channel(const socket::ptr& socket,
{
BC_ASSERT_MSG(stranded(), "strand");

// Default message memory resource, override create_channel to replace.
static memory memory{};

// Channel id must be created using create_key().
const auto id = create_key();
return std::make_shared<channel>(log, socket, settings(), id, quiet);
return std::make_shared<channel>(memory, log, socket, settings(), id, quiet);
}

// At one object/session/ns, this overflows in ~585 years (and handled).
Expand Down
6 changes: 4 additions & 2 deletions test/net/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ class channel_accessor

BOOST_AUTO_TEST_CASE(channel__stopped__default__false)
{
memory memory{};
const logger log{};
threadpool pool(1);
asio::strand strand(pool.service().get_executor());
const settings set(bc::system::chain::selection::mainnet);
auto socket_ptr = std::make_shared<network::socket>(log, pool.service());
auto channel_ptr = std::make_shared<channel>(log, socket_ptr, set, 42);
auto channel_ptr = std::make_shared<channel>(memory, log, socket_ptr, set, 42);
BOOST_REQUIRE(!channel_ptr->stopped());

// Stop completion is asynchronous.
Expand All @@ -70,12 +71,13 @@ inline size_t payload_maximum(const settings& settings)

BOOST_AUTO_TEST_CASE(channel__properties__default__expected)
{
memory memory{};
const logger log{};
threadpool pool(1);
asio::strand strand(pool.service().get_executor());
const settings set(bc::system::chain::selection::mainnet);
auto socket_ptr = std::make_shared<network::socket>(log, pool.service());
auto channel_ptr = std::make_shared<channel_accessor>(log, socket_ptr, set, 42);
auto channel_ptr = std::make_shared<channel_accessor>(memory, log, socket_ptr, set, 42);

BOOST_REQUIRE(!channel_ptr->address());
BOOST_REQUIRE_NE(channel_ptr->nonce(), 0u);
Expand Down
15 changes: 10 additions & 5 deletions test/net/distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ BOOST_AUTO_TEST_SUITE(distributor_tests)

BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);

std::promise<bool> promise;
boost::asio::post(strand, [&]() NOEXCEPT
Expand All @@ -40,9 +41,10 @@ BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops)

BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -71,9 +73,10 @@ BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code)

BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -113,9 +116,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification)

BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr auto expected_ec = error::invalid_magic;
auto result = true;

Expand Down Expand Up @@ -153,9 +157,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_noti

BOOST_AUTO_TEST_CASE(distributor__notify__valid_nonced_ping__expected_notification)
{
memory memory{};
threadpool pool(2);
asio::strand strand(pool.service().get_executor());
distributor instance(strand);
distributor instance(memory, strand);
constexpr uint64_t expected_nonce = 42;
constexpr auto expected_ec = error::invalid_magic;
auto result = true;
Expand Down
5 changes: 3 additions & 2 deletions test/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class mock_proxy
proxy::subscribe_stop(std::move(handler));
}

mock_proxy(socket::ptr socket) NOEXCEPT
: proxy(socket)
mock_proxy(const socket::ptr& socket) NOEXCEPT
: proxy(memory_, socket)
{
}

Expand Down Expand Up @@ -88,6 +88,7 @@ class mock_proxy
}

private:
mutable memory memory_{};
mutable bool stop_{ false };
mutable std::promise<code> stopped_;
};
Expand Down
Loading

0 comments on commit a48593f

Please sign in to comment.