From e8f8f2a77d96f1040864e6634e9941c9d3f0db47 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 24 Feb 2024 19:49:10 -0500 Subject: [PATCH 1/5] Comments. --- src/chasers/chaser_block.cpp | 3 +-- src/chasers/chaser_header.cpp | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/chasers/chaser_block.cpp b/src/chasers/chaser_block.cpp index a7c216e5..96b31b4e 100644 --- a/src/chasers/chaser_block.cpp +++ b/src/chasers/chaser_block.cpp @@ -47,7 +47,6 @@ chaser_block::~chaser_block() NOEXCEPT { } -// protected code chaser_block::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_block"); @@ -117,7 +116,7 @@ void chaser_block::do_organize(const block::cptr& block_ptr, } // Peer processing should have precluded orphan submission. - // Results from running headers-first and then restarting to blocks-first. + // Results from running headers-first and then blocks-first. if (!tree_.contains(previous) && !query.is_block(previous)) { handler(error::orphan_block); diff --git a/src/chasers/chaser_header.cpp b/src/chasers/chaser_header.cpp index da622a1c..63f4eb0f 100644 --- a/src/chasers/chaser_header.cpp +++ b/src/chasers/chaser_header.cpp @@ -255,6 +255,7 @@ void chaser_header::do_organize(const header::cptr& header_ptr, // Notify candidate reorganization with branch point. // ------------------------------------------------------------------------ + // New branch organized, queue up candidate downloads from branch point. notify(error::success, chase::header, { possible_narrow_cast(point) }); From 9e42bc0196e45d717feec04557b40253c57c217a Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 24 Feb 2024 19:49:24 -0500 Subject: [PATCH 2/5] Stub in parallel download. --- include/bitcoin/node/chasers/chaser_check.hpp | 8 +- .../protocols/protocol_block_in_31800.hpp | 42 +--- src/chasers/chaser_check.cpp | 73 +++++-- src/protocols/protocol_block_in_31800.cpp | 200 ++---------------- 4 files changed, 83 insertions(+), 240 deletions(-) diff --git a/include/bitcoin/node/chasers/chaser_check.hpp b/include/bitcoin/node/chasers/chaser_check.hpp index e6c24b6d..668d76d6 100644 --- a/include/bitcoin/node/chasers/chaser_check.hpp +++ b/include/bitcoin/node/chasers/chaser_check.hpp @@ -40,13 +40,19 @@ class BCN_API chaser_check virtual ~chaser_check() NOEXCEPT; virtual code start() NOEXCEPT; - virtual void checked(const system::chain::block::cptr& block) NOEXCEPT; + + virtual void get_hashes(network::result_handler&& handler) NOEXCEPT; + virtual void put_hashes(network::result_handler&& handler) NOEXCEPT; protected: + /// Handlers. virtual void handle_header(height_t branch_point) NOEXCEPT; virtual void handle_event(const code& ec, chase event_, link value) NOEXCEPT; + virtual void do_get_hashes(const network::result_handler& handler) NOEXCEPT; + virtual void do_put_hashes(const network::result_handler& handler) NOEXCEPT; + private: void do_handle_event(const code& ec, chase event_, link value) NOEXCEPT; }; diff --git a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp index bd30ca75..960b1873 100644 --- a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp +++ b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp @@ -55,24 +55,15 @@ class BCN_API protocol_block_in_31800 void start() NOEXCEPT override; void stopping(const code& ec) NOEXCEPT override; -protected: - struct track - { - const size_t announced; - const system::hash_digest last; - system::hashes hashes; - }; + /// Check and store any registered block in any order of arrival. + virtual void check(const system::chain::block::cptr& block_ptr, + network::result_handler&& handler) NOEXCEPT; - typedef std::shared_ptr track_ptr; - - /// Recieved incoming inventory message. - virtual bool handle_receive_inventory(const code& ec, - const network::messages::inventory::cptr& message) NOEXCEPT; +protected: /// Recieved incoming block message. virtual bool handle_receive_block(const code& ec, - const network::messages::block::cptr& message, - const track_ptr& tracker) NOEXCEPT; + const network::messages::block::cptr& message) NOEXCEPT; /// Handle performance timer event. virtual void handle_performance_timer(const code& ec) NOEXCEPT; @@ -80,33 +71,20 @@ class BCN_API protocol_block_in_31800 /// Handle result of performance reporting. virtual void handle_performance(const code& ec) NOEXCEPT; - /// Invoked when initial blocks sync is complete. - virtual void complete() NOEXCEPT; - - /// Handle organize result. - virtual void handle_organize(const code& ec, size_t height, - const system::chain::block::cptr& block_ptr) NOEXCEPT; + /// Handle check result. + virtual void handle_check(const code& ec) NOEXCEPT; private: - static system::hashes to_hashes( - const network::messages::get_data& getter) NOEXCEPT; - - network::messages::get_blocks create_get_inventory() const NOEXCEPT; - network::messages::get_blocks create_get_inventory( - const system::hash_digest& last) const NOEXCEPT; - network::messages::get_blocks create_get_inventory( - system::hashes&& start_hashes) const NOEXCEPT; - network::messages::get_data create_get_data( - const network::messages::inventory& message) const NOEXCEPT; + const system::hashes& hashes) const NOEXCEPT; void do_handle_performance(const code& ec) NOEXCEPT; - // Thread safe. + // These are thread safe. const bool report_performance_; const network::messages::inventory::type_id block_type_; - // Protected by strand. + // These are protected by strand. uint64_t bytes_{ zero }; system::chain::checkpoint top_{}; network::steady_clock::time_point start_{}; diff --git a/src/chasers/chaser_check.cpp b/src/chasers/chaser_check.cpp index 946e23e4..594e6622 100644 --- a/src/chasers/chaser_check.cpp +++ b/src/chasers/chaser_check.cpp @@ -27,13 +27,21 @@ namespace libbitcoin { namespace node { - + +using namespace network; +using namespace system; using namespace system::chain; using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -// Requires subscriber_ protection (call from node construct or node.strand). +/// We need to be able to perform block.check(ctx) while we still have the +/// deserialized block, because of the witness commitment check (hash). +/// Requires timestamp (header) height, mtp, flags. These can be cached on the +/// block hash registry maintained by this chaser or queried from the stored +/// header. Caching requries rolling forward through all states as the registry +/// is initialized. Store query is simpler and may be as fast. + chaser_check::chaser_check(full_node& node) NOEXCEPT : chaser(node) { @@ -43,56 +51,75 @@ chaser_check::~chaser_check() NOEXCEPT { } -// TODO: initialize check state. code chaser_check::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_check"); - // get_all_unassociated_above(0) + // TODO: get_all_unassociated_above(0) + BC_ASSERT_MSG(true, "Store not initialized."); + return subscribe(std::bind(&chaser_check::handle_event, this, _1, _2, _3)); } +// protected void chaser_check::handle_event(const code& ec, chase event_, link value) NOEXCEPT { boost::asio::post(strand(), - std::bind(&chaser_check::do_handle_event, this, ec, event_, value)); + std::bind(&chaser_check::do_handle_event, + this, ec, event_, value)); } -void chaser_check::do_handle_event(const code& ec, chase event_, +// private +void chaser_check::do_handle_event(const code&, chase event_, link value) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); - if (ec) + if (event_ == chase::stop) return; - switch (event_) + if (event_ == chase::header) { - case chase::header: - { - BC_ASSERT(std::holds_alternative(value)); - handle_header(std::get(value)); - break; - } - default: - return; + BC_ASSERT(std::holds_alternative(value)); + handle_header(std::get(value)); } } -// TODO: handle the new strong branch (may issue 'checked'). -void chaser_check::handle_header(height_t) NOEXCEPT +void chaser_check::get_hashes(result_handler&& handler) NOEXCEPT +{ + boost::asio::post(strand(), + std::bind(&chaser_check::do_get_hashes, + this, std::move(handler))); +} + +void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT +{ + boost::asio::post(strand(), + std::bind(&chaser_check::do_put_hashes, + this, std::move(handler))); +} + +// protected +// ---------------------------------------------------------------------------- + +void chaser_check::do_get_hashes(const result_handler&) NOEXCEPT +{ + BC_ASSERT_MSG(stranded(), "chaser_check"); +} + +void chaser_check::do_put_hashes(const result_handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); - ////LOGN("Handle candidate organization above height (" << branch_point << ")."); - // get_all_unassociated_above(branch_point) } -void chaser_check::checked(const block::cptr&) NOEXCEPT +// New branch organized, queue up candidate downloads from branch point. +void chaser_check::handle_header(height_t) NOEXCEPT { - // Push checked block into store and issue 'checked' event so that connect - // can connect the next blocks in order. Executes in caller thread. + BC_ASSERT_MSG(stranded(), "chaser_check"); + + // TODO: get_all_unassociated_above(branch_point) } BC_POP_WARNING() diff --git a/src/protocols/protocol_block_in_31800.cpp b/src/protocols/protocol_block_in_31800.cpp index a92cf15f..68ce1cb8 100644 --- a/src/protocols/protocol_block_in_31800.cpp +++ b/src/protocols/protocol_block_in_31800.cpp @@ -71,7 +71,6 @@ void protocol_block_in_31800::handle_performance_timer(const code& ec) NOEXCEPT // Reset counters and log rate. bytes_ = zero; start_ = now; - ////log.fire(event_block, rate); // Bounces to network strand, performs work, then calls handler. // Channel will continue to process blocks while this call excecutes on the @@ -116,235 +115,68 @@ void protocol_block_in_31800::start() NOEXCEPT const auto top = query.get_top_candidate(); top_ = { query.get_header_key(query.to_candidate(top)), top }; - ////if (report_performance_) - ////{ - //// start_ = steady_clock::now(); - //// performance_timer_->start(BIND1(handle_performance_timer, _1)); - ////} + if (report_performance_) + { + start_ = steady_clock::now(); + performance_timer_->start(BIND1(handle_performance_timer, _1)); + } - // There is one persistent common inventory subscription. - SUBSCRIBE_CHANNEL2(inventory, handle_receive_inventory, _1, _2); - SEND1(create_get_inventory(), handle_send, _1); + SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); protocol::start(); } void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT { BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); - ////performance_timer_->stop(); + performance_timer_->stop(); protocol::stopping(ec); } // Inbound (blocks). // ---------------------------------------------------------------------------- -// Receive inventory and send get_data for all blocks that are not found. -bool protocol_block_in_31800::handle_receive_inventory(const code& ec, - const inventory::cptr& message) NOEXCEPT -{ - BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); - constexpr auto block_id = inventory::type_id::block; - - if (stopped(ec)) - return false; - - LOGP("Received (" << message->count(block_id) << ") block inventory from [" - << authority() << "]."); - - const auto getter = create_get_data(*message); - - // If getter is empty it may be only because we have them all, so iterate. - if (getter.items.empty()) - { - // If the original request was maximal, we assume there are more. - if (message->items.size() == max_get_blocks) - { - LOGP("Get inventory [" << authority() << "] (empty maximal)."); - SEND1(create_get_inventory(message->items.back().hash), - handle_send, _1); - } - - return true; - } - - LOGP("Requesting (" << getter.items.size() << ") blocks from [" - << authority() << "]."); - - BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) - const auto tracker = std::make_shared(track - { - getter.items.size(), - getter.items.back().hash, - to_hashes(getter) - }); - BC_POP_WARNING() - - // TODO: these should be limited in quantity for DOS protection. - // There is one block subscription for each received unexhausted inventory. - SUBSCRIBE_CHANNEL3(block, handle_receive_block, _1, _2, tracker); - SEND1(getter, handle_send, _1); - return true; -} - -// Process block responses in order as dictated by tracker. bool protocol_block_in_31800::handle_receive_block(const code& ec, - const block::cptr& message, const track_ptr& tracker) NOEXCEPT + const block::cptr& message) NOEXCEPT { BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); if (stopped(ec)) return false; - if (tracker->hashes.empty()) - { - LOGF("Exhausted block tracker."); - return false; - } - - // Alias. - const auto& block_ptr = message->block_ptr; - - // Unrequested block, may not have been announced via inventory. - if (tracker->hashes.back() != block_ptr->hash()) - return true; - - // Out of order or invalid. - if (block_ptr->header().previous_block_hash() != top_.hash()) - { - LOGP("Orphan block [" << encode_hash(block_ptr->hash()) - << "] from [" << authority() << "]."); - return false; - } - - // Add block at next height. - const auto height = add1(top_.height()); - // Asynchronous organization serves all channels. // A job backlog will occur when organize is slower than download. // This should not be a material issue given lack of validation here. - organize(block_ptr, BIND3(handle_organize, _1, height, block_ptr)); - - // Set the new top and continue. Organize error will stop the channel. - top_ = { block_ptr->hash(), height }; + check(message->block_ptr, BIND1(handle_check, _1)); - ////// Accumulate byte count. - ////bytes_ += message->cached_size; + bytes_ += message->cached_size; - // Order is reversed, so next is at back. - tracker->hashes.pop_back(); - - // Handle completion of the inventory block subset. - if (tracker->hashes.empty()) - { - // Protocol presumes max_get_blocks unless complete. - if (tracker->announced == max_get_blocks) - { - LOGP("Get inventory [" << authority() << "] (exhausted maximal)."); - SEND1(create_get_inventory(tracker->last), handle_send, _1); - } - else - { - // Completeness stalls if on 500 as empty message is ambiguous. - // This is ok, since complete is not used for anything essential. - complete(); - } - } - - // Release subscription if exhausted. - // handle_receive_inventory will restart inventory iteration. - return !tracker->hashes.empty(); -} - -// This could be the end of a catch-up sequence, or a singleton announcement. -// The distinction is ultimately arbitrary, but this signals initial currency. -void protocol_block_in_31800::complete() NOEXCEPT -{ - LOGN("Blocks from [" << authority() << "] complete at (" - << top_.height() << ")."); + // TODO: return true only if there are more blocks outstanding. + return true; } -void protocol_block_in_31800::handle_organize(const code& ec, size_t height, - const chain::block::cptr& block_ptr) NOEXCEPT +void protocol_block_in_31800::handle_check(const code& ec) NOEXCEPT { if (ec == network::error::service_stopped) return; - if (!ec || ec == error::duplicate_block) - { - LOGP("Block [" << encode_hash(block_ptr->hash()) - << "] at (" << height << ") from [" << authority() << "] " - << ec.message()); - return; - } - - // Assuming no store failure this is a consensus failure. - LOGR("Block [" << encode_hash(block_ptr->hash()) - << "] at (" << height << ") from [" << authority() << "] " - << ec.message()); + // TODO: log result (LOGR/LOGP). stop(ec); } - // private // ---------------------------------------------------------------------------- -get_blocks protocol_block_in_31800::create_get_inventory() const NOEXCEPT -{ - return create_get_inventory(archive().get_candidate_hashes( - get_blocks::heights(archive().get_top_candidate()))); -} - -get_blocks protocol_block_in_31800::create_get_inventory( - const hash_digest& last) const NOEXCEPT -{ - return create_get_inventory(hashes{ last }); -} - -get_blocks protocol_block_in_31800::create_get_inventory( - hashes&& hashes) const NOEXCEPT -{ - if (!hashes.empty()) - { - LOGP("Request blocks after [" << encode_hash(hashes.front()) - << "] from [" << authority() << "]."); - } - - return { std::move(hashes) }; -} - get_data protocol_block_in_31800::create_get_data( - const inventory& message) const NOEXCEPT + const hashes&) const NOEXCEPT { get_data getter{}; - getter.items.reserve(message.count(type_id::block)); - // clang emplace_back bug (no matching constructor), using push_back. - // bip144: get_data uses witness constant but inventory does not. - for (const auto& item: message.items) - if ((item.type == type_id::block) && !archive().is_block(item.hash)) - getter.items.push_back({ block_type_, item.hash }); + // TODO: generate get_data request. - getter.items.shrink_to_fit(); return getter; } -// static -hashes protocol_block_in_31800::to_hashes(const get_data& getter) NOEXCEPT -{ - hashes out{}; - out.resize(getter.items.size()); - - // Order reversed for individual erase performance (using pop_back). - std::transform(getter.items.rbegin(), getter.items.rend(), out.begin(), - [](const auto& item) NOEXCEPT - { - return item.hash; - }); - - return out; -} - BC_POP_WARNING() BC_POP_WARNING() From 5de786b960b0a0e7cab913277dd2606c50826706 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 24 Feb 2024 21:39:51 -0500 Subject: [PATCH 3/5] Fix up test overrides. --- test/chasers/chaser_header.cpp | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/test/chasers/chaser_header.cpp b/test/chasers/chaser_header.cpp index 96b28bbe..0d040401 100644 --- a/test/chasers/chaser_header.cpp +++ b/test/chasers/chaser_header.cpp @@ -31,19 +31,15 @@ class mock_chaser_header return tree_; } - const network::wall_clock::duration& currency_window() const NOEXCEPT - { - return chaser_header::currency_window(); - } - - bool use_currency_window() const NOEXCEPT + code start() NOEXCEPT override { - return chaser_header::use_currency_window(); + return chaser_header::start(); } - code start() NOEXCEPT override + void organize(const system::chain::header::cptr& header_ptr, + network::result_handler&& handler) NOEXCEPT override { - return chaser_header::start(); + chaser_header::organize(header_ptr, std::move(handler)); } void handle_event(const code& ec, chase event_, @@ -88,6 +84,22 @@ class mock_chaser_header { return chaser_header::push(key); } + + void do_organize(const system::chain::header::cptr& header, + const network::result_handler& handler) NOEXCEPT override + { + return chaser_header::do_organize(header, handler); + } + + const network::wall_clock::duration& currency_window() const NOEXCEPT override + { + return chaser_header::currency_window(); + } + + bool use_currency_window() const NOEXCEPT override + { + return chaser_header::use_currency_window(); + } }; BOOST_AUTO_TEST_CASE(chaser_header_test__currency_window__zero__use_currency_window_false) From 1cebb6d4279c797627dd240df2513df42a7a478e Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sun, 25 Feb 2024 10:23:41 -0500 Subject: [PATCH 4/5] WIP on parallel download. --- include/bitcoin/node/chasers/chaser_check.hpp | 16 +++-- .../protocols/protocol_block_in_31800.hpp | 20 +++--- src/chasers/chaser_check.cpp | 11 ++-- src/protocols/protocol_block_in_31800.cpp | 63 +++++++++++++------ 4 files changed, 71 insertions(+), 39 deletions(-) diff --git a/include/bitcoin/node/chasers/chaser_check.hpp b/include/bitcoin/node/chasers/chaser_check.hpp index 668d76d6..4cc651ab 100644 --- a/include/bitcoin/node/chasers/chaser_check.hpp +++ b/include/bitcoin/node/chasers/chaser_check.hpp @@ -20,6 +20,8 @@ #define LIBBITCOIN_NODE_CHASERS_CHASER_CHECK_HPP #include +#include +#include #include #include #include @@ -34,6 +36,11 @@ class BCN_API chaser_check : public chaser { public: + typedef std::unordered_map hashmap; + typedef std::shared_ptr hashmap_ptr; + typedef std::function handler; + DELETE_COPY_MOVE(chaser_check); chaser_check(full_node& node) NOEXCEPT; @@ -41,17 +48,16 @@ class BCN_API chaser_check virtual code start() NOEXCEPT; - virtual void get_hashes(network::result_handler&& handler) NOEXCEPT; - virtual void put_hashes(network::result_handler&& handler) NOEXCEPT; + virtual void get_hashes(handler&& handler) NOEXCEPT; + virtual void put_hashes(handler&& handler) NOEXCEPT; protected: - /// Handlers. virtual void handle_header(height_t branch_point) NOEXCEPT; virtual void handle_event(const code& ec, chase event_, link value) NOEXCEPT; - virtual void do_get_hashes(const network::result_handler& handler) NOEXCEPT; - virtual void do_put_hashes(const network::result_handler& handler) NOEXCEPT; + virtual void do_get_hashes(const handler& handler) NOEXCEPT; + virtual void do_put_hashes(const handler& handler) NOEXCEPT; private: void do_handle_event(const code& ec, chase event_, link value) NOEXCEPT; diff --git a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp index 960b1873..c6328fcd 100644 --- a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp +++ b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp @@ -19,7 +19,10 @@ #ifndef LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP #define LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_BLOCK_IN_31800_HPP +#include +#include #include +#include #include #include @@ -55,9 +58,9 @@ class BCN_API protocol_block_in_31800 void start() NOEXCEPT override; void stopping(const code& ec) NOEXCEPT override; - /// Check and store any registered block in any order of arrival. - virtual void check(const system::chain::block::cptr& block_ptr, - network::result_handler&& handler) NOEXCEPT; + /// Manage download queue. + virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; + virtual void put_hashes(chaser_check::handler&& handler) NOEXCEPT; protected: @@ -71,12 +74,13 @@ class BCN_API protocol_block_in_31800 /// Handle result of performance reporting. virtual void handle_performance(const code& ec) NOEXCEPT; - /// Handle check result. - virtual void handle_check(const code& ec) NOEXCEPT; + /// Manage download queue. + virtual void handle_put_hashes(const code& ec) NOEXCEPT; + virtual void handle_get_hashes(const code& ec, + const chaser_check::hashmap_ptr& hashes) NOEXCEPT; private: - network::messages::get_data create_get_data( - const system::hashes& hashes) const NOEXCEPT; + network::messages::get_data create_get_data() const NOEXCEPT; void do_handle_performance(const code& ec) NOEXCEPT; @@ -86,7 +90,7 @@ class BCN_API protocol_block_in_31800 // These are protected by strand. uint64_t bytes_{ zero }; - system::chain::checkpoint top_{}; + chaser_check::hashmap_ptr hashes_{}; network::steady_clock::time_point start_{}; network::deadline::ptr performance_timer_; }; diff --git a/src/chasers/chaser_check.cpp b/src/chasers/chaser_check.cpp index 594e6622..e44407f2 100644 --- a/src/chasers/chaser_check.cpp +++ b/src/chasers/chaser_check.cpp @@ -77,9 +77,6 @@ void chaser_check::do_handle_event(const code&, chase event_, { BC_ASSERT_MSG(stranded(), "chaser_check"); - if (event_ == chase::stop) - return; - if (event_ == chase::header) { BC_ASSERT(std::holds_alternative(value)); @@ -87,14 +84,14 @@ void chaser_check::do_handle_event(const code&, chase event_, } } -void chaser_check::get_hashes(result_handler&& handler) NOEXCEPT +void chaser_check::get_hashes(handler&& handler) NOEXCEPT { boost::asio::post(strand(), std::bind(&chaser_check::do_get_hashes, this, std::move(handler))); } -void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT +void chaser_check::put_hashes(handler&& handler) NOEXCEPT { boost::asio::post(strand(), std::bind(&chaser_check::do_put_hashes, @@ -104,12 +101,12 @@ void chaser_check::put_hashes(result_handler&& handler) NOEXCEPT // protected // ---------------------------------------------------------------------------- -void chaser_check::do_get_hashes(const result_handler&) NOEXCEPT +void chaser_check::do_get_hashes(const handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); } -void chaser_check::do_put_hashes(const result_handler&) NOEXCEPT +void chaser_check::do_put_hashes(const handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); } diff --git a/src/protocols/protocol_block_in_31800.cpp b/src/protocols/protocol_block_in_31800.cpp index 68ce1cb8..09bdeb19 100644 --- a/src/protocols/protocol_block_in_31800.cpp +++ b/src/protocols/protocol_block_in_31800.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -111,30 +112,51 @@ void protocol_block_in_31800::start() NOEXCEPT if (started()) return; - const auto& query = archive(); - const auto top = query.get_top_candidate(); - top_ = { query.get_header_key(query.to_candidate(top)), top }; - if (report_performance_) { start_ = steady_clock::now(); performance_timer_->start(BIND1(handle_performance_timer, _1)); } - SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + get_hashes(BIND2(handle_get_hashes, _1, hashes_)); protocol::start(); } void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT { BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); + performance_timer_->stop(); + put_hashes(BIND1(handle_put_hashes, _1)); + protocol::stopping(ec); } // Inbound (blocks). // ---------------------------------------------------------------------------- +void protocol_block_in_31800::handle_get_hashes(const code& ec, + const chaser_check::hashmap_ptr&) NOEXCEPT +{ + if (ec) + { + stop(ec); + return; + } + + SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + + // TODO: send if not empty, send when new headers (subscrive to header). + SEND1(create_get_data(), handle_send, _1); + stop(ec); +} + +void protocol_block_in_31800::handle_put_hashes(const code& ec) NOEXCEPT +{ + if (ec) + stop(ec); +} + bool protocol_block_in_31800::handle_receive_block(const code& ec, const block::cptr& message) NOEXCEPT { @@ -143,10 +165,20 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec, if (stopped(ec)) return false; + const auto hash = message->block_ptr->hash(); + if (is_zero(hashes_->erase(hash))) + { + // Zero erased implies not found (not requested of peer). + LOGR("Unrequested block [" << encode_hash(hash) << "]."); + return true; + } + + archive().set_link(*message->block_ptr); + // Asynchronous organization serves all channels. // A job backlog will occur when organize is slower than download. // This should not be a material issue given lack of validation here. - check(message->block_ptr, BIND1(handle_check, _1)); + get_hashes(BIND2(handle_get_hashes, _1, hashes_)); bytes_ += message->cached_size; @@ -154,25 +186,18 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec, return true; } -void protocol_block_in_31800::handle_check(const code& ec) NOEXCEPT -{ - if (ec == network::error::service_stopped) - return; - - // TODO: log result (LOGR/LOGP). - - stop(ec); -} - // private // ---------------------------------------------------------------------------- -get_data protocol_block_in_31800::create_get_data( - const hashes&) const NOEXCEPT +get_data protocol_block_in_31800::create_get_data() const NOEXCEPT { get_data getter{}; + getter.items.reserve(hashes_->size()); - // TODO: generate get_data request. + // clang emplace_back bug (no matching constructor), using push_back. + // bip144: get_data uses witness constant but inventory does not. + for (const auto& item: *hashes_) + getter.items.push_back({ block_type_, item.first }); return getter; } From 523f928bf98139afa318e316387e11694552d006 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sun, 25 Feb 2024 17:29:34 -0500 Subject: [PATCH 5/5] WIP on parallel download. --- include/bitcoin/node/chasers/chaser_check.hpp | 17 ++-- include/bitcoin/node/full_node.hpp | 5 + include/bitcoin/node/protocols/protocol.hpp | 6 ++ .../protocols/protocol_block_in_31800.hpp | 14 ++- .../protocols/protocol_header_in_31800.hpp | 2 +- include/bitcoin/node/sessions/session.hpp | 6 ++ src/chasers/chaser_block.cpp | 5 +- src/chasers/chaser_candidate.cpp | 5 +- src/chasers/chaser_check.cpp | 30 ++++-- src/chasers/chaser_confirm.cpp | 5 +- src/chasers/chaser_connect.cpp | 5 +- src/chasers/chaser_header.cpp | 5 +- src/chasers/chaser_transaction.cpp | 5 +- src/full_node.cpp | 11 +++ src/protocols/protocol.cpp | 12 +++ src/protocols/protocol_block_in.cpp | 8 +- src/protocols/protocol_block_in_31800.cpp | 94 ++++++++++++++----- src/protocols/protocol_header_in_31800.cpp | 10 +- src/protocols/protocol_header_in_70012.cpp | 1 + src/sessions/session.cpp | 13 +++ 20 files changed, 192 insertions(+), 67 deletions(-) diff --git a/include/bitcoin/node/chasers/chaser_check.hpp b/include/bitcoin/node/chasers/chaser_check.hpp index 4cc651ab..2706e3f6 100644 --- a/include/bitcoin/node/chasers/chaser_check.hpp +++ b/include/bitcoin/node/chasers/chaser_check.hpp @@ -20,8 +20,6 @@ #define LIBBITCOIN_NODE_CHASERS_CHASER_CHECK_HPP #include -#include -#include #include #include #include @@ -36,10 +34,9 @@ class BCN_API chaser_check : public chaser { public: - typedef std::unordered_map hashmap; - typedef std::shared_ptr hashmap_ptr; - typedef std::function handler; + // context_map casts header_fk into context.minimum_block_version. + using map = database::context_map; + typedef std::function handler; DELETE_COPY_MOVE(chaser_check); @@ -49,7 +46,8 @@ class BCN_API chaser_check virtual code start() NOEXCEPT; virtual void get_hashes(handler&& handler) NOEXCEPT; - virtual void put_hashes(handler&& handler) NOEXCEPT; + virtual void put_hashes(const map& map, + network::result_handler&& handler) NOEXCEPT; protected: virtual void handle_header(height_t branch_point) NOEXCEPT; @@ -57,10 +55,13 @@ class BCN_API chaser_check link value) NOEXCEPT; virtual void do_get_hashes(const handler& handler) NOEXCEPT; - virtual void do_put_hashes(const handler& handler) NOEXCEPT; + virtual void do_put_hashes(const map& map, + const network::result_handler& handler) NOEXCEPT; private: void do_handle_event(const code& ec, chase event_, link value) NOEXCEPT; + + map map_{}; }; } // namespace node diff --git a/include/bitcoin/node/full_node.hpp b/include/bitcoin/node/full_node.hpp index 318309fd..3ff92b64 100644 --- a/include/bitcoin/node/full_node.hpp +++ b/include/bitcoin/node/full_node.hpp @@ -69,6 +69,11 @@ class BCN_API full_node virtual void organize(const system::chain::block::cptr& block, network::result_handler&& handler) NOEXCEPT; + /// Manage download queue. + virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; + virtual void put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT; + /// Properties. /// ----------------------------------------------------------------------- diff --git a/include/bitcoin/node/protocols/protocol.hpp b/include/bitcoin/node/protocols/protocol.hpp index 9f67c0f1..de2e9ddf 100644 --- a/include/bitcoin/node/protocols/protocol.hpp +++ b/include/bitcoin/node/protocols/protocol.hpp @@ -20,6 +20,7 @@ #define LIBBITCOIN_NODE_PROTOCOLS_PROTOCOL_HPP #include +#include #include #include #include @@ -61,6 +62,11 @@ class BCN_API protocol virtual void organize(const system::chain::block::cptr& block, network::result_handler&& handler) NOEXCEPT; + /// Manage download queue. + virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; + virtual void put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT; + /// Configuration settings for all libraries. const configuration& config() const NOEXCEPT; diff --git a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp index c6328fcd..d60b4e36 100644 --- a/include/bitcoin/node/protocols/protocol_block_in_31800.hpp +++ b/include/bitcoin/node/protocols/protocol_block_in_31800.hpp @@ -58,12 +58,7 @@ class BCN_API protocol_block_in_31800 void start() NOEXCEPT override; void stopping(const code& ec) NOEXCEPT override; - /// Manage download queue. - virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; - virtual void put_hashes(chaser_check::handler&& handler) NOEXCEPT; - protected: - /// Recieved incoming block message. virtual bool handle_receive_block(const code& ec, const network::messages::block::cptr& message) NOEXCEPT; @@ -77,12 +72,15 @@ class BCN_API protocol_block_in_31800 /// Manage download queue. virtual void handle_put_hashes(const code& ec) NOEXCEPT; virtual void handle_get_hashes(const code& ec, - const chaser_check::hashmap_ptr& hashes) NOEXCEPT; + const chaser_check::map& map) NOEXCEPT; private: - network::messages::get_data create_get_data() const NOEXCEPT; + network::messages::get_data create_get_data( + const chaser_check::map& map) const NOEXCEPT; void do_handle_performance(const code& ec) NOEXCEPT; + void do_handle_get_hashes(const code& ec, + const chaser_check::map& map) NOEXCEPT; // These are thread safe. const bool report_performance_; @@ -90,7 +88,7 @@ class BCN_API protocol_block_in_31800 // These are protected by strand. uint64_t bytes_{ zero }; - chaser_check::hashmap_ptr hashes_{}; + chaser_check::map map_{}; network::steady_clock::time_point start_{}; network::deadline::ptr performance_timer_; }; diff --git a/include/bitcoin/node/protocols/protocol_header_in_31800.hpp b/include/bitcoin/node/protocols/protocol_header_in_31800.hpp index cfed9fa1..65133b39 100644 --- a/include/bitcoin/node/protocols/protocol_header_in_31800.hpp +++ b/include/bitcoin/node/protocols/protocol_header_in_31800.hpp @@ -57,7 +57,7 @@ class BCN_API protocol_header_in_31800 const system::chain::header::cptr& header_ptr) NOEXCEPT; private: - network::messages::get_headers create_get_headers() NOEXCEPT; + network::messages::get_headers create_get_headers() const NOEXCEPT; network::messages::get_headers create_get_headers( const system::hash_digest& last) const NOEXCEPT; network::messages::get_headers create_get_headers( diff --git a/include/bitcoin/node/sessions/session.hpp b/include/bitcoin/node/sessions/session.hpp index 2d468b46..aa90e074 100644 --- a/include/bitcoin/node/sessions/session.hpp +++ b/include/bitcoin/node/sessions/session.hpp @@ -20,6 +20,7 @@ #define LIBBITCOIN_NODE_SESSIONS_SESSION_HPP #include +#include #include #include @@ -44,6 +45,11 @@ class BCN_API session virtual void organize(const system::chain::block::cptr& block, network::result_handler&& handler) NOEXCEPT; + /// Manage download queue. + virtual void get_hashes(chaser_check::handler&& handler) NOEXCEPT; + virtual void put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT; + /// Configuration settings for all libraries. const configuration& config() const NOEXCEPT; diff --git a/src/chasers/chaser_block.cpp b/src/chasers/chaser_block.cpp index 96b31b4e..7241ec68 100644 --- a/src/chasers/chaser_block.cpp +++ b/src/chasers/chaser_block.cpp @@ -54,8 +54,9 @@ code chaser_block::start() NOEXCEPT state_ = archive().get_candidate_chain_state(config().bitcoin); BC_ASSERT_MSG(state_, "Store not initialized."); - return subscribe(std::bind(&chaser_block::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_block::handle_event, + this, _1, _2, _3)); } // protected diff --git a/src/chasers/chaser_candidate.cpp b/src/chasers/chaser_candidate.cpp index ffb9ce78..a964302e 100644 --- a/src/chasers/chaser_candidate.cpp +++ b/src/chasers/chaser_candidate.cpp @@ -46,8 +46,9 @@ chaser_candidate::~chaser_candidate() NOEXCEPT code chaser_candidate::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_check"); - return subscribe(std::bind(&chaser_candidate::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_candidate::handle_event, + this, _1, _2, _3)); } void chaser_candidate::handle_event(const code& ec, chase event_, diff --git a/src/chasers/chaser_check.cpp b/src/chasers/chaser_check.cpp index e44407f2..feb4549f 100644 --- a/src/chasers/chaser_check.cpp +++ b/src/chasers/chaser_check.cpp @@ -55,11 +55,12 @@ code chaser_check::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_check"); - // TODO: get_all_unassociated_above(0) - BC_ASSERT_MSG(true, "Store not initialized."); + // Initialize from genesis block. + handle_header(zero); - return subscribe(std::bind(&chaser_check::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_check::handle_event, + this, _1, _2, _3)); } // protected @@ -91,11 +92,12 @@ void chaser_check::get_hashes(handler&& handler) NOEXCEPT this, std::move(handler))); } -void chaser_check::put_hashes(handler&& handler) NOEXCEPT +void chaser_check::put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT { boost::asio::post(strand(), std::bind(&chaser_check::do_put_hashes, - this, std::move(handler))); + this, map, std::move(handler))); } // protected @@ -106,17 +108,25 @@ void chaser_check::do_get_hashes(const handler&) NOEXCEPT BC_ASSERT_MSG(stranded(), "chaser_check"); } -void chaser_check::do_put_hashes(const handler&) NOEXCEPT +void chaser_check::do_put_hashes(const chaser_check::map&, + const network::result_handler&) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); } -// New branch organized, queue up candidate downloads from branch point. -void chaser_check::handle_header(height_t) NOEXCEPT +void chaser_check::handle_header(height_t branch_point) NOEXCEPT { BC_ASSERT_MSG(stranded(), "chaser_check"); - // TODO: get_all_unassociated_above(branch_point) + // Map and peer maps may have newly stale blocks. + // All stale branches can just be allowed to complete. + // The connect chaser will verify proper advancement. + + // get_all_unassociated_above(branch_point) and add to map. + const auto& query = archive(); + const auto top = query.get_top_candidate(); + const auto last = query.get_last_associated_from(branch_point); + map_.merge(query.get_all_unassociated_above(last)); } BC_POP_WARNING() diff --git a/src/chasers/chaser_confirm.cpp b/src/chasers/chaser_confirm.cpp index 49712c14..0926e07d 100644 --- a/src/chasers/chaser_confirm.cpp +++ b/src/chasers/chaser_confirm.cpp @@ -45,8 +45,9 @@ chaser_confirm::~chaser_confirm() NOEXCEPT code chaser_confirm::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_confirm"); - return subscribe(std::bind(&chaser_confirm::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_confirm::handle_event, + this, _1, _2, _3)); } void chaser_confirm::handle_event(const code& ec, chase event_, diff --git a/src/chasers/chaser_connect.cpp b/src/chasers/chaser_connect.cpp index 4ad61d6a..b7bf57e6 100644 --- a/src/chasers/chaser_connect.cpp +++ b/src/chasers/chaser_connect.cpp @@ -45,8 +45,9 @@ chaser_connect::~chaser_connect() NOEXCEPT code chaser_connect::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_connect"); - return subscribe(std::bind(&chaser_connect::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_connect::handle_event, + this, _1, _2, _3)); } void chaser_connect::handle_event(const code& ec, chase event_, diff --git a/src/chasers/chaser_header.cpp b/src/chasers/chaser_header.cpp index 63f4eb0f..d4321cf2 100644 --- a/src/chasers/chaser_header.cpp +++ b/src/chasers/chaser_header.cpp @@ -69,8 +69,9 @@ code chaser_header::start() NOEXCEPT state_ = archive().get_candidate_chain_state(config().bitcoin); BC_ASSERT_MSG(state_, "Store not initialized."); - return subscribe(std::bind(&chaser_header::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_header::handle_event, + this, _1, _2, _3)); } // protected diff --git a/src/chasers/chaser_transaction.cpp b/src/chasers/chaser_transaction.cpp index ed118430..1ae75423 100644 --- a/src/chasers/chaser_transaction.cpp +++ b/src/chasers/chaser_transaction.cpp @@ -46,8 +46,9 @@ chaser_transaction::~chaser_transaction() NOEXCEPT code chaser_transaction::start() NOEXCEPT { BC_ASSERT_MSG(node_stranded(), "chaser_transaction"); - return subscribe(std::bind(&chaser_transaction::handle_event, - this, _1, _2, _3)); + return subscribe( + std::bind(&chaser_transaction::handle_event, + this, _1, _2, _3)); } void chaser_transaction::handle_event(const code& ec, chase event_, diff --git a/src/full_node.cpp b/src/full_node.cpp index 174495d1..85c6ca43 100644 --- a/src/full_node.cpp +++ b/src/full_node.cpp @@ -143,6 +143,17 @@ void full_node::organize(const system::chain::block::cptr& block, chaser_block_.organize(block, std::move(handler)); } +void full_node::get_hashes(chaser_check::handler&& handler) NOEXCEPT +{ + chaser_check_.get_hashes(std::move(handler)); +} + +void full_node::put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT +{ + chaser_check_.put_hashes(map, std::move(handler)); +} + // Properties. // ---------------------------------------------------------------------------- diff --git a/src/protocols/protocol.cpp b/src/protocols/protocol.cpp index cd2000bc..f49c512b 100644 --- a/src/protocols/protocol.cpp +++ b/src/protocols/protocol.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -52,6 +53,17 @@ void protocol::organize(const system::chain::block::cptr& block, session_.organize(block, std::move(handler)); } +void protocol::get_hashes(chaser_check::handler&& handler) NOEXCEPT +{ + session_.get_hashes(std::move(handler)); +} + +void protocol::put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT +{ + session_.put_hashes(map, std::move(handler)); +} + const configuration& protocol::config() const NOEXCEPT { return session_.config(); diff --git a/src/protocols/protocol_block_in.cpp b/src/protocols/protocol_block_in.cpp index 2b6812c4..613564cc 100644 --- a/src/protocols/protocol_block_in.cpp +++ b/src/protocols/protocol_block_in.cpp @@ -89,6 +89,7 @@ bool protocol_block_in::handle_receive_inventory(const code& ec, if (getter.items.empty()) { // If the original request was maximal, we assume there are more. + // The inv response to get_blocks is limited to max_get_blocks. if (message->items.size() == max_get_blocks) { LOGP("Get inventory [" << authority() << "] (empty maximal)."); @@ -189,6 +190,8 @@ bool protocol_block_in::handle_receive_block(const code& ec, // The distinction is ultimately arbitrary, but this signals initial currency. void protocol_block_in::complete() NOEXCEPT { + BC_ASSERT_MSG(stranded(), "protocol_block_in"); + LOGN("Blocks from [" << authority() << "] complete at (" << top_.height() << ")."); } @@ -224,8 +227,9 @@ get_blocks protocol_block_in::create_get_inventory() const NOEXCEPT // All strong block branches are archived, so this will reflect latest. // This will bypass all blocks with candidate headers, resulting in block // orphans if headers-first is run followed by a restart and blocks-first. - return create_get_inventory(archive().get_candidate_hashes( - get_blocks::heights(archive().get_top_candidate()))); + const auto& query = archive(); + return create_get_inventory(query.get_candidate_hashes(get_blocks::heights( + query.get_top_candidate()))); } get_blocks protocol_block_in::create_get_inventory( diff --git a/src/protocols/protocol_block_in_31800.cpp b/src/protocols/protocol_block_in_31800.cpp index 09bdeb19..7227cdc2 100644 --- a/src/protocols/protocol_block_in_31800.cpp +++ b/src/protocols/protocol_block_in_31800.cpp @@ -39,6 +39,7 @@ using namespace network::messages; using namespace std::placeholders; // Shared pointers required for lifetime in handler parameters. +BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) @@ -118,7 +119,8 @@ void protocol_block_in_31800::start() NOEXCEPT performance_timer_->start(BIND1(handle_performance_timer, _1)); } - get_hashes(BIND2(handle_get_hashes, _1, hashes_)); + SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + get_hashes(BIND2(handle_get_hashes, _1, _2)); protocol::start(); } @@ -127,34 +129,52 @@ void protocol_block_in_31800::stopping(const code& ec) NOEXCEPT BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); performance_timer_->stop(); - put_hashes(BIND1(handle_put_hashes, _1)); - + put_hashes(map_, BIND1(handle_put_hashes, _1)); protocol::stopping(ec); } // Inbound (blocks). // ---------------------------------------------------------------------------- +// TODO: need map pointer from chaser to avoid large map copies here. void protocol_block_in_31800::handle_get_hashes(const code& ec, - const chaser_check::hashmap_ptr&) NOEXCEPT + const chaser_check::map& map) NOEXCEPT { + POST2(do_handle_get_hashes, ec, map); +} + +// private +void protocol_block_in_31800::do_handle_get_hashes(const code& ec, + const chaser_check::map& map) NOEXCEPT +{ + BC_ASSERT_MSG(stranded(), "protocol_block_in_31800"); + BC_ASSERT_MSG(map.size() < max_inventory, "inventory overflow"); + if (ec) { + LOGF("Error getting block hashes for [" << authority() << "] " + << ec.message()); stop(ec); return; } - SUBSCRIBE_CHANNEL2(block, handle_receive_block, _1, _2); + if (map.empty()) + { + LOGP("Exhausted block hashes at [" << authority() << "] " + << ec.message()); + return; + } - // TODO: send if not empty, send when new headers (subscrive to header). - SEND1(create_get_data(), handle_send, _1); - stop(ec); + SEND1(create_get_data(map), handle_send, _1); } void protocol_block_in_31800::handle_put_hashes(const code& ec) NOEXCEPT { if (ec) - stop(ec); + { + LOGF("Error putting block hashes for [" << authority() << "] " + << ec.message()); + } } bool protocol_block_in_31800::handle_receive_block(const code& ec, @@ -165,43 +185,71 @@ bool protocol_block_in_31800::handle_receive_block(const code& ec, if (stopped(ec)) return false; - const auto hash = message->block_ptr->hash(); - if (is_zero(hashes_->erase(hash))) + const auto& block = *message->block_ptr; + const auto hash = block.hash(); + const auto it = map_.find(hash); + + if (it == map_.end()) { - // Zero erased implies not found (not requested of peer). - LOGR("Unrequested block [" << encode_hash(hash) << "]."); - return true; + // TODO: store and signal invalid block state (reorgs header chaser). + LOGR("Unrequested block [" << encode_hash(hash) << "] from [" + << authority() << "]."); + stop(node::error::unknown); + return false; } - archive().set_link(*message->block_ptr); + code error{}; + const auto& ctx = it->second; + if (((error = block.check())) || ((error = block.check(ctx)))) + { + // TODO: store and signal invalid block state (reorgs header chaser). + LOGR("Invalid block [" << encode_hash(hash) << "] from [" + << authority() << "] " << error.message()); + stop(error); + return false; + } - // Asynchronous organization serves all channels. - // A job backlog will occur when organize is slower than download. - // This should not be a material issue given lack of validation here. - get_hashes(BIND2(handle_get_hashes, _1, hashes_)); + // TODO: optimize using header_fk with txs (or remove header_fk). + if (archive().set_link(block).is_terminal()) + { + // TODO: store and signal invalid block state (reorgs header chaser). + LOGF("Failure storing block [" << encode_hash(hash) << "]."); + stop(node::error::store_integrity); + return false; + } + // Block check accounted for. + map_.erase(it); bytes_ += message->cached_size; - // TODO: return true only if there are more blocks outstanding. + // Get some more work from chaser. + if (is_zero(map_.size())) + { + LOGP("Getting more block hashes for [" << authority() << "]."); + get_hashes(BIND2(handle_get_hashes, _1, _2)); + } + return true; } // private // ---------------------------------------------------------------------------- -get_data protocol_block_in_31800::create_get_data() const NOEXCEPT +get_data protocol_block_in_31800::create_get_data( + const chaser_check::map& map) const NOEXCEPT { get_data getter{}; - getter.items.reserve(hashes_->size()); + getter.items.reserve(map.size()); // clang emplace_back bug (no matching constructor), using push_back. // bip144: get_data uses witness constant but inventory does not. - for (const auto& item: *hashes_) + for (const auto& item: map) getter.items.push_back({ block_type_, item.first }); return getter; } +BC_POP_WARNING() BC_POP_WARNING() BC_POP_WARNING() diff --git a/src/protocols/protocol_header_in_31800.cpp b/src/protocols/protocol_header_in_31800.cpp index f525c0a1..258d5448 100644 --- a/src/protocols/protocol_header_in_31800.cpp +++ b/src/protocols/protocol_header_in_31800.cpp @@ -102,6 +102,7 @@ bool protocol_header_in_31800::handle_receive_headers(const code& ec, } // Protocol presumes max_get_headers unless complete. + // The headers response to get_headers is limited to max_get_headers. if (message->header_ptrs.size() == max_get_headers) { SEND1(create_get_headers(message->header_ptrs.back()->hash()), @@ -120,6 +121,8 @@ bool protocol_header_in_31800::handle_receive_headers(const code& ec, // The distinction is ultimately arbitrary, but this signals peer completeness. void protocol_header_in_31800::complete() NOEXCEPT { + BC_ASSERT_MSG(stranded(), "protocol_header_in_31800"); + LOGN("Headers from [" << authority() << "] complete at (" << top_.height() << ")."); } @@ -149,13 +152,14 @@ void protocol_header_in_31800::handle_organize(const code& ec, size_t height, // private // ---------------------------------------------------------------------------- -get_headers protocol_header_in_31800::create_get_headers() NOEXCEPT +get_headers protocol_header_in_31800::create_get_headers() const NOEXCEPT { // Header sync is from the archived (strong) candidate chain. // Until the header tree is current the candidate chain remains empty. // So all channels will fully sync from the top candidate at their startup. - return create_get_headers(archive().get_candidate_hashes( - get_headers::heights(archive().get_top_candidate()))); + const auto& query = archive(); + return create_get_headers(query.get_candidate_hashes(get_headers::heights( + query.get_top_candidate()))); } get_headers protocol_header_in_31800::create_get_headers( diff --git a/src/protocols/protocol_header_in_70012.cpp b/src/protocols/protocol_header_in_70012.cpp index 99d3dacf..a18ee188 100644 --- a/src/protocols/protocol_header_in_70012.cpp +++ b/src/protocols/protocol_header_in_70012.cpp @@ -32,6 +32,7 @@ using namespace std::placeholders; void protocol_header_in_70012::complete() NOEXCEPT { + BC_ASSERT_MSG(stranded(), "protocol_header_in_70012"); protocol_header_in_31800::complete(); if (!sent_) diff --git a/src/sessions/session.cpp b/src/sessions/session.cpp index 1376cd97..4bafdf3b 100644 --- a/src/sessions/session.cpp +++ b/src/sessions/session.cpp @@ -19,7 +19,9 @@ #include #include +#include #include +#include #include #include #include @@ -59,6 +61,17 @@ void session::organize(const block::cptr& block, node_.organize(block, std::move(handler)); } +void session::get_hashes(chaser_check::handler&& handler) NOEXCEPT +{ + node_.get_hashes(std::move(handler)); +} + +void session::put_hashes(const chaser_check::map& map, + network::result_handler&& handler) NOEXCEPT +{ + node_.put_hashes(map, std::move(handler)); +} + const configuration& session::config() const NOEXCEPT { return node_.config();