diff --git a/include/bitcoin/network/messages/block.hpp b/include/bitcoin/network/messages/block.hpp index 1c5e4626c..eee6d00f3 100644 --- a/include/bitcoin/network/messages/block.hpp +++ b/include/bitcoin/network/messages/block.hpp @@ -38,6 +38,8 @@ struct BCT_API block static const uint32_t version_minimum; static const uint32_t version_maximum; + static cptr deserialize(auto& memory, uint32_t version, + const system::data_chunk& data, bool witness=true) NOEXCEPT; static cptr deserialize(uint32_t version, const system::data_chunk& data, bool witness=true) NOEXCEPT; static block deserialize(uint32_t version, system::reader& source, @@ -51,7 +53,6 @@ struct BCT_API block size_t size(uint32_t version, bool witness) const NOEXCEPT; system::chain::block::cptr block_ptr; - mutable size_t cached_size{}; }; } // namespace messages diff --git a/include/bitcoin/network/net/channel.hpp b/include/bitcoin/network/net/channel.hpp index 5f4244d2f..82a688d4d 100644 --- a/include/bitcoin/network/net/channel.hpp +++ b/include/bitcoin/network/net/channel.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -73,7 +74,7 @@ class BCT_API channel } /// Construct a channel to encapsulated and communicate on the socket. - channel(const logger& log, const socket::ptr& socket, + channel(memory& memory, const logger& log, const socket::ptr& socket, const settings& settings, uint64_t identifier=zero, bool quiet=true) NOEXCEPT; diff --git a/include/bitcoin/network/net/distributor.hpp b/include/bitcoin/network/net/distributor.hpp index 822746d4c..f7cec7cf9 100644 --- a/include/bitcoin/network/net/distributor.hpp +++ b/include/bitcoin/network/net/distributor.hpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace libbitcoin { namespace network { @@ -84,7 +85,7 @@ class BCT_API distributor DEFINE_SUBSCRIBER(version_acknowledge); /// Create an instance of this class. - distributor(asio::strand& strand) NOEXCEPT; + distributor(memory& memory, asio::strand& strand) NOEXCEPT; /// If stopped, handler is invoked with error::subscriber_stopped. /// If key exists, handler is invoked with error::subscriber_exists. @@ -115,9 +116,9 @@ class BCT_API distributor if (!is_zero(subscriber.size())) { // Subscribers are notified only with stop code or error::success. - const auto message = messages::deserialize(data, version); - if (!message) return error::invalid_message; - subscriber.notify(error::success, message); + const auto ptr = messages::deserialize(data, version); + if (!ptr) return error::invalid_message; + subscriber.notify(error::success, ptr); } return error::success; @@ -191,8 +192,15 @@ class BCT_API distributor DECLARE_SUBSCRIBER(transaction); DECLARE_SUBSCRIBER(version); DECLARE_SUBSCRIBER(version_acknowledge); + + memory& memory_; }; +template <> +code distributor::do_notify( + distributor::block_subscriber& subscriber, uint32_t version, + const system::data_chunk& data) NOEXCEPT; + #undef SUBSCRIBER #undef SUBSCRIBER_TYPE #undef DEFINE_SUBSCRIBER diff --git a/include/bitcoin/network/net/memory.hpp b/include/bitcoin/network/net/memory.hpp index 11099dbec..87ec26176 100644 --- a/include/bitcoin/network/net/memory.hpp +++ b/include/bitcoin/network/net/memory.hpp @@ -38,8 +38,8 @@ class BCT_API memory arena* get_arena() NOEXCEPT; retainer::ptr get_retainer() NOEXCEPT; - private: + // These are thread safe. arena* arena_; std::shared_mutex remap_mutex_{}; }; diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index 135ade37b..3320e2a1c 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace libbitcoin { @@ -122,7 +123,7 @@ class BCT_API proxy const config::address& address() const NOEXCEPT; protected: - proxy(const socket::ptr& socket) NOEXCEPT; + proxy(memory& memory, const socket::ptr& socket) NOEXCEPT; /// Property values provided to the proxy. virtual size_t minimum_buffer() const NOEXCEPT = 0; diff --git a/src/messages/block.cpp b/src/messages/block.cpp index e384fa912..7a6dccb6b 100644 --- a/src/messages/block.cpp +++ b/src/messages/block.cpp @@ -43,6 +43,13 @@ typename block::cptr block::deserialize(uint32_t version, const system::data_chunk& data, bool witness) NOEXCEPT { static memory memory{}; + return deserialize(memory, version, data, witness); +} + +// static +typename block::cptr block::deserialize(auto& memory, uint32_t version, + const system::data_chunk& data, bool witness) NOEXCEPT +{ system::istream source{ data }; system::byte_reader reader{ source, memory.get_arena() }; @@ -86,6 +93,7 @@ typename block::cptr block::deserialize(uint32_t version, std::advance(start, full); } + // WARNING: retainer does not track objects shared from block (e.g. tx). message->block_ptr->set_retainer(memory.get_retainer()); return message; } @@ -97,10 +105,7 @@ block block::deserialize(uint32_t version, reader& source, if (version < version_minimum || version > version_maximum) source.invalidate(); - const auto start = source.get_read_position(); - const auto block_ptr = to_shared(source, witness); - const auto size = source.get_read_position() - start; - return { block_ptr, size }; + return { to_shared(source, witness) }; } bool block::serialize(uint32_t version, @@ -116,14 +121,12 @@ void block::serialize(uint32_t BC_DEBUG_ONLY(version), writer& sink, bool witness) const NOEXCEPT { BC_DEBUG_ONLY(const auto bytes = size(version, witness);) - ////BC_DEBUG_ONLY(const auto start = sink.get_write_position();) - const auto start = sink.get_write_position(); + BC_DEBUG_ONLY(const auto start = sink.get_write_position();) if (block_ptr) block_ptr->to_data(sink, witness); - cached_size = sink.get_write_position() - start; - BC_ASSERT(sink && cached_size == bytes); + BC_ASSERT(sink && (sink.get_write_position() - start) == bytes); } size_t block::size(uint32_t, bool witness) const NOEXCEPT diff --git a/src/net/channel.cpp b/src/net/channel.cpp index e5b5440e0..2283c4ece 100644 --- a/src/net/channel.cpp +++ b/src/net/channel.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -53,9 +54,9 @@ inline deadline::ptr expiration(const logger& log, asio::strand& strand, return timeout(log, strand, pseudo_random::duration(span)); } -channel::channel(const logger& log, const socket::ptr& socket, +channel::channel(memory& memory, const logger& log, const socket::ptr& socket, const settings& settings, uint64_t identifier, bool quiet) NOEXCEPT - : proxy(socket), + : proxy(memory, socket), quiet_(quiet), settings_(settings), identifier_(identifier), diff --git a/src/net/distributor.cpp b/src/net/distributor.cpp index 4a86b708e..8b38fd9a3 100644 --- a/src/net/distributor.cpp +++ b/src/net/distributor.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace libbitcoin { namespace network { @@ -33,7 +34,7 @@ using namespace system; #define CASE_NOTIFY(name) case messages::identifier::name: \ return do_notify(SUBSCRIBER(name), version, data) -distributor::distributor(asio::strand& strand) NOEXCEPT +distributor::distributor(memory& memory, asio::strand& strand) NOEXCEPT : MAKE_SUBSCRIBER(address), MAKE_SUBSCRIBER(alert), MAKE_SUBSCRIBER(block), @@ -66,7 +67,8 @@ distributor::distributor(asio::strand& strand) NOEXCEPT MAKE_SUBSCRIBER(send_headers), MAKE_SUBSCRIBER(transaction), MAKE_SUBSCRIBER(version), - MAKE_SUBSCRIBER(version_acknowledge) + MAKE_SUBSCRIBER(version_acknowledge), + memory_(memory) { } @@ -152,6 +154,21 @@ void distributor::stop(const code& ec) NOEXCEPT STOP_SUBSCRIBER(version_acknowledge); } +template <> +code distributor::do_notify( + distributor::block_subscriber& subscriber, uint32_t version, + const system::data_chunk& data) NOEXCEPT +{ + if (!is_zero(subscriber.size())) + { + const auto ptr = messages::block::deserialize(memory_, version, data); + if (!ptr) return error::invalid_message; + subscriber.notify(error::success, ptr); + } + + return error::success; +} + #undef SUBSCRIBER #undef MAKE_SUBSCRIBER #undef CASE_NOTIFY diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index 59f40f95d..f25734d6d 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace libbitcoin { namespace network { @@ -47,10 +48,10 @@ static constexpr uint32_t https_magic = 0x02010316; // This is created in a started state and must be stopped, as the subscribers // assert if not stopped. Subscribers may hold protocols even if the service // is not started. -proxy::proxy(const socket::ptr& socket) NOEXCEPT +proxy::proxy(memory& memory, const socket::ptr& socket) NOEXCEPT : socket_(socket), stop_subscriber_(socket->strand()), - distributor_(socket->strand()), + distributor_(memory, socket->strand()), reporter(socket->log) { } diff --git a/src/sessions/session.cpp b/src/sessions/session.cpp index ad7ac8d35..98ca02642 100644 --- a/src/sessions/session.cpp +++ b/src/sessions/session.cpp @@ -443,9 +443,12 @@ channel::ptr session::create_channel(const socket::ptr& socket, { BC_ASSERT_MSG(stranded(), "strand"); + // Default message memory resource, override create_channel to replace. + static memory memory{}; + // Channel id must be created using create_key(). const auto id = create_key(); - return std::make_shared(log, socket, settings(), id, quiet); + return std::make_shared(memory, log, socket, settings(), id, quiet); } // At one object/session/ns, this overflows in ~585 years (and handled). diff --git a/test/net/channel.cpp b/test/net/channel.cpp index c55cb51f3..8f8ac58c7 100644 --- a/test/net/channel.cpp +++ b/test/net/channel.cpp @@ -49,12 +49,13 @@ class channel_accessor BOOST_AUTO_TEST_CASE(channel__stopped__default__false) { + memory memory{}; const logger log{}; threadpool pool(1); asio::strand strand(pool.service().get_executor()); const settings set(bc::system::chain::selection::mainnet); auto socket_ptr = std::make_shared(log, pool.service()); - auto channel_ptr = std::make_shared(log, socket_ptr, set, 42); + auto channel_ptr = std::make_shared(memory, log, socket_ptr, set, 42); BOOST_REQUIRE(!channel_ptr->stopped()); // Stop completion is asynchronous. @@ -70,12 +71,13 @@ inline size_t payload_maximum(const settings& settings) BOOST_AUTO_TEST_CASE(channel__properties__default__expected) { + memory memory{}; const logger log{}; threadpool pool(1); asio::strand strand(pool.service().get_executor()); const settings set(bc::system::chain::selection::mainnet); auto socket_ptr = std::make_shared(log, pool.service()); - auto channel_ptr = std::make_shared(log, socket_ptr, set, 42); + auto channel_ptr = std::make_shared(memory, log, socket_ptr, set, 42); BOOST_REQUIRE(!channel_ptr->address()); BOOST_REQUIRE_NE(channel_ptr->nonce(), 0u); diff --git a/test/net/distributor.cpp b/test/net/distributor.cpp index bcfae4d6a..010ec3ea1 100644 --- a/test/net/distributor.cpp +++ b/test/net/distributor.cpp @@ -22,9 +22,10 @@ BOOST_AUTO_TEST_SUITE(distributor_tests) BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops) { + memory memory{}; threadpool pool(2); asio::strand strand(pool.service().get_executor()); - distributor instance(strand); + distributor instance(memory, strand); std::promise promise; boost::asio::post(strand, [&]() NOEXCEPT @@ -40,9 +41,10 @@ BOOST_AUTO_TEST_CASE(distributor__construct__stop__stops) BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code) { + memory memory{}; threadpool pool(2); asio::strand strand(pool.service().get_executor()); - distributor instance(strand); + distributor instance(memory, strand); constexpr auto expected_ec = error::invalid_magic; auto result = true; @@ -71,9 +73,10 @@ BOOST_AUTO_TEST_CASE(distributor__subscribe__stop__expected_code) BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification) { + memory memory{}; threadpool pool(2); asio::strand strand(pool.service().get_executor()); - distributor instance(strand); + distributor instance(memory, strand); constexpr auto expected_ec = error::invalid_magic; auto result = true; @@ -113,9 +116,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__invalid_message__no_notification) BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_notification) { + memory memory{}; threadpool pool(2); asio::strand strand(pool.service().get_executor()); - distributor instance(strand); + distributor instance(memory, strand); constexpr auto expected_ec = error::invalid_magic; auto result = true; @@ -153,9 +157,10 @@ BOOST_AUTO_TEST_CASE(distributor__notify__valid_message_invalid_version__no_noti BOOST_AUTO_TEST_CASE(distributor__notify__valid_nonced_ping__expected_notification) { + memory memory{}; threadpool pool(2); asio::strand strand(pool.service().get_executor()); - distributor instance(strand); + distributor instance(memory, strand); constexpr uint64_t expected_nonce = 42; constexpr auto expected_ec = error::invalid_magic; auto result = true; diff --git a/test/net/proxy.cpp b/test/net/proxy.cpp index 3e948de95..a02ab06a0 100644 --- a/test/net/proxy.cpp +++ b/test/net/proxy.cpp @@ -37,8 +37,8 @@ class mock_proxy proxy::subscribe_stop(std::move(handler)); } - mock_proxy(socket::ptr socket) NOEXCEPT - : proxy(socket) + mock_proxy(const socket::ptr& socket) NOEXCEPT + : proxy(memory_, socket) { } @@ -88,6 +88,7 @@ class mock_proxy } private: + mutable memory memory_{}; mutable bool stop_{ false }; mutable std::promise stopped_; }; diff --git a/test/sessions/session.cpp b/test/sessions/session.cpp index 24b099cdd..8ac7b044d 100644 --- a/test/sessions/session.cpp +++ b/test/sessions/session.cpp @@ -579,6 +579,7 @@ BOOST_AUTO_TEST_CASE(session__start__stop__success) BOOST_AUTO_TEST_CASE(session__start_channel__session_not_started__handlers_service_stopped_channel_service_stopped_not_pent_or_stored) { + memory memory{}; const logger log{}; settings set(selection::mainnet); mock_p2p net(set, log); @@ -586,7 +587,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__session_not_started__handlers_servi BOOST_REQUIRE(session->stopped()); const auto socket = std::make_shared(net.log, net.service()); - const auto channel = std::make_shared(net.log, socket, session->settings(), 42); + const auto channel = std::make_shared(memory, net.log, socket, session->settings(), 42); std::promise started_channel; std::promise stopped_channel; @@ -624,6 +625,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__session_not_started__handlers_servi BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_channel_stopped_channel_channel_stopped_stored_and_not_counted) { + memory memory{}; const logger log{}; settings set(selection::mainnet); mock_p2p net(set, log); @@ -641,7 +643,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_chann BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success); const auto socket = std::make_shared(net.log, net.service()); - const auto channel = std::make_shared(net.log, socket, session->settings(), 42); + const auto channel = std::make_shared(memory, net.log, socket, session->settings(), 42); // Stop the channel (started by default). std::promise unstarted_channel; @@ -701,6 +703,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__channel_not_started__handlers_chann BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_channel_service_stopped_stored_and_counted) { + memory memory{}; const logger log{}; settings set(selection::mainnet); set.host_pool_capacity = 0; @@ -730,7 +733,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_chan BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success); const auto socket = std::make_shared(net.log, net.service()); - const auto channel = std::make_shared(net.log, socket, session->settings(), 42); + const auto channel = std::make_shared(memory, net.log, socket, session->settings(), 42); std::promise started_channel; std::promise stopped_channel; @@ -781,6 +784,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__all_started__handlers_expected_chan BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expected_channel_success_stored_and_counted) { + memory memory{}; const logger log{}; settings set(selection::mainnet); set.host_pool_capacity = 0; @@ -811,7 +815,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expe BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success); const auto socket = std::make_shared(net.log, net.service()); - const auto channel = std::make_shared(net.log, socket, session->settings(), 42); + const auto channel = std::make_shared(memory, net.log, socket, session->settings(), 42); std::promise started_channel; std::promise stopped_channel; @@ -864,6 +868,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__outbound_all_started__handlers_expe BOOST_AUTO_TEST_CASE(session__start_channel__inbound_all_started__handlers_expected_channel_success_not_stored_and_counted) { + memory memory{}; const logger log{}; settings set(selection::mainnet); set.host_pool_capacity = 0; @@ -894,7 +899,7 @@ BOOST_AUTO_TEST_CASE(session__start_channel__inbound_all_started__handlers_expec BOOST_REQUIRE_EQUAL(started.get_future().get(), error::success); const auto socket = std::make_shared(net.log, net.service()); - const auto channel = std::make_shared(net.log, socket, session->settings(), 42); + const auto channel = std::make_shared(memory, net.log, socket, session->settings(), 42); std::promise started_channel; std::promise stopped_channel;