diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index b53c4a4deb..70e8e22d09 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -37,6 +37,7 @@ add_executable( network_functions.cpp node.cpp object_stream.cpp + online_reps.cpp optimistic_scheduler.cpp processing_queue.cpp processor_service.cpp diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index d0755d3019..f75268b525 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/election.cpp b/nano/core_test/election.cpp index 0ad811ab4f..d4265d8215 100644 --- a/nano/core_test/election.cpp +++ b/nano/core_test/election.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -259,11 +260,9 @@ TEST (election, quorum_minimum_update_weight_before_quorum_checks) node1.rep_crawler.force_process (vote2, channel); ASSERT_FALSE (election->confirmed ()); - { - nano::lock_guard guard (node1.online_reps.mutex); - // Modify online_m for online_reps to more than is available, this checks that voting below updates it to current online reps. - node1.online_reps.online_m = node_config.online_weight_minimum.number () + 20; - } + + // Modify online_m for online_reps to more than is available, this checks that voting below updates it to current online reps. + node1.online_reps.force_online_weight (node_config.online_weight_minimum.number () + 20); ASSERT_EQ (nano::vote_code::vote, node1.vote_router.vote (vote2).at (send1->hash ())); ASSERT_TIMELY (5s, election->confirmed ()); ASSERT_NE (nullptr, node1.block (send1->hash ())); diff --git a/nano/core_test/ledger_confirm.cpp b/nano/core_test/ledger_confirm.cpp index 5729369a06..7c4ae24793 100644 --- a/nano/core_test/ledger_confirm.cpp +++ b/nano/core_test/ledger_confirm.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index b730b489a4..6bff227689 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -1688,71 +1689,6 @@ TEST (node, bootstrap_connection_scaling) ASSERT_EQ (1, node1.bootstrap_initiator.connections->target_connections (50000, 1)); } -TEST (node, online_reps) -{ - nano::test::system system (1); - auto & node1 (*system.nodes[0]); - // 1 sample of minimum weight - ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); - auto vote (std::make_shared ()); - ASSERT_EQ (0, node1.online_reps.online ()); - node1.online_reps.observe (nano::dev::genesis_key.pub); - ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); - // 1 minimum, 1 maximum - ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); - node1.online_reps.sample (); - ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.trended ()); - node1.online_reps.clear (); - // 2 minimum, 1 maximum - node1.online_reps.sample (); - ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); -} - -TEST (node, online_reps_rep_crawler) -{ - nano::test::system system; - nano::node_flags flags; - flags.disable_rep_crawler = true; - auto & node1 = *system.add_node (flags); - auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ nano::dev::genesis->hash () }); - ASSERT_EQ (0, node1.online_reps.online ()); - // Without rep crawler - node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); - ASSERT_EQ (0, node1.online_reps.online ()); - // After inserting to rep crawler - auto channel = std::make_shared (node1); - node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel); - node1.vote_processor.vote_blocking (vote, channel); - ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); -} - -TEST (node, online_reps_election) -{ - nano::test::system system; - nano::node_flags flags; - flags.disable_rep_crawler = true; - auto & node1 = *system.add_node (flags); - // Start election - nano::keypair key; - nano::state_block_builder builder; - auto send1 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (nano::dev::genesis->hash ()) - .representative (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) - .link (key.pub) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*node1.work_generate_blocking (nano::dev::genesis->hash ())) - .build (); - node1.process_active (send1); - ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); - // Process vote for ongoing election - auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ send1->hash () }); - ASSERT_EQ (0, node1.online_reps.online ()); - node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); - ASSERT_EQ (nano::dev::constants.genesis_amount - nano::Gxrb_ratio, node1.online_reps.online ()); -} - TEST (node, block_confirm) { auto type = nano::transport::transport_type::tcp; diff --git a/nano/core_test/online_reps.cpp b/nano/core_test/online_reps.cpp new file mode 100644 index 0000000000..0036fade72 --- /dev/null +++ b/nano/core_test/online_reps.cpp @@ -0,0 +1,70 @@ +#include +#include +#include + +#include + +TEST (online_reps, basic) +{ + nano::test::system system (1); + auto & node1 (*system.nodes[0]); + // 1 sample of minimum weight + ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); + auto vote (std::make_shared ()); + ASSERT_EQ (0, node1.online_reps.online ()); + node1.online_reps.observe (nano::dev::genesis_key.pub); + ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); + // 1 minimum, 1 maximum + ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); + node1.online_reps.force_sample (); + ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.trended ()); + node1.online_reps.clear (); + // 2 minimum, 1 maximum + node1.online_reps.force_sample (); + ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); +} + +TEST (online_reps, rep_crawler) +{ + nano::test::system system; + nano::node_flags flags; + flags.disable_rep_crawler = true; + auto & node1 = *system.add_node (flags); + auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ nano::dev::genesis->hash () }); + ASSERT_EQ (0, node1.online_reps.online ()); + // Without rep crawler + node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); + ASSERT_EQ (0, node1.online_reps.online ()); + // After inserting to rep crawler + auto channel = std::make_shared (node1); + node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel); + node1.vote_processor.vote_blocking (vote, channel); + ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); +} + +TEST (online_reps, election) +{ + nano::test::system system; + nano::node_flags flags; + flags.disable_rep_crawler = true; + auto & node1 = *system.add_node (flags); + // Start election + nano::keypair key; + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .link (key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*node1.work_generate_blocking (nano::dev::genesis->hash ())) + .build (); + node1.process_active (send1); + ASSERT_TIMELY_EQ (5s, 1, node1.active.size ()); + // Process vote for ongoing election + auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ send1->hash () }); + ASSERT_EQ (0, node1.online_reps.online ()); + node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); + ASSERT_EQ (nano::dev::constants.genesis_amount - nano::Gxrb_ratio, node1.online_reps.online ()); +} \ No newline at end of file diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 24e845bb53..414c5e45ba 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -112,8 +112,7 @@ TEST (vote_processor, weights) auto & node (*system.nodes[0]); // Create representatives of different weight levels - // FIXME: Using `online_weight_minimum` because calculation of trended and online weight is broken when running tests - auto const stake = node.config.online_weight_minimum.number (); + auto const stake = node.balance (nano::dev::genesis_key.pub); auto const level0 = stake / 5000; // 0.02% auto const level1 = stake / 500; // 0.2% auto const level2 = stake / 50; // 2% @@ -140,10 +139,10 @@ TEST (vote_processor, weights) node.stats.clear (); ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::rep_tiers, nano::stat::detail::updated) >= 2); - ASSERT_EQ (node.rep_tiers.tier (key0.pub), nano::rep_tier::none); - ASSERT_EQ (node.rep_tiers.tier (key1.pub), nano::rep_tier::tier_1); - ASSERT_EQ (node.rep_tiers.tier (key2.pub), nano::rep_tier::tier_2); - ASSERT_EQ (node.rep_tiers.tier (nano::dev::genesis_key.pub), nano::rep_tier::tier_3); + ASSERT_TIMELY_EQ (5s, node.rep_tiers.tier (key0.pub), nano::rep_tier::none); + ASSERT_TIMELY_EQ (5s, node.rep_tiers.tier (key1.pub), nano::rep_tier::tier_1); + ASSERT_TIMELY_EQ (5s, node.rep_tiers.tier (key2.pub), nano::rep_tier::tier_2); + ASSERT_TIMELY_EQ (5s, node.rep_tiers.tier (nano::dev::genesis_key.pub), nano::rep_tier::tier_3); } // Issue that tracks last changes on this test: https://github.com/nanocurrency/nano-node/issues/3485 diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index f62c2e9682..61232194b4 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/lib/config.cpp b/nano/lib/config.cpp index df922fcc07..a9c0a05b67 100644 --- a/nano/lib/config.cpp +++ b/nano/lib/config.cpp @@ -246,6 +246,11 @@ void force_nano_dev_network () nano::network_constants::set_active_network (nano::networks::nano_dev_network); } +bool is_dev_run () +{ + return nano::network_constants::get_active_network () == nano::networks::nano_dev_network; +} + bool running_within_valgrind () { return (RUNNING_ON_VALGRIND > 0); diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 7b1e1d241b..2396f86db4 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -41,7 +41,7 @@ consteval bool is_asan_build () #else return false; #endif -// GCC builds + // GCC builds #elif defined(__SANITIZE_ADDRESS__) return true; #else @@ -57,7 +57,7 @@ consteval bool is_tsan_build () #else return false; #endif -// GCC builds + // GCC builds #elif defined(__SANITIZE_THREAD__) return true; #else @@ -297,6 +297,11 @@ class network_constants active_network = network_a; } + static nano::networks get_active_network () + { + return active_network; + } + /** * Optionally called on startup to override the global active network. * If not called, the compile-time option will be used. @@ -384,6 +389,9 @@ bool slow_instrumentation (); /** Set the active network to the dev network */ void force_nano_dev_network (); +/** Checks that we are running in test mode */ +bool is_dev_run (); + /** * Attempt to read a configuration file from specified directory. Returns empty tomlconfig if nothing is found. * @throws std::runtime_error with error code if the file or overrides are not valid toml diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 780d2b01ba..2609248c9c 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -79,6 +79,7 @@ enum class type signal_manager, peer_history, message_processor, + online_reps, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index c226337ea0..f9c1097244 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -83,6 +83,7 @@ enum class type message_processor, message_processor_overfill, message_processor_type, + online_reps, bootstrap_ascending, bootstrap_ascending_accounts, @@ -476,6 +477,15 @@ enum class detail active_confirmation_height, inactive_confirmation_height, + // online_reps + trim_trend, + sanitize_old, + sanitize_future, + sample, + rep_new, + rep_update, + update_online, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 2948caa217..7dbc05c22a 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -157,6 +157,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_router: thread_role_name_string = "Vote router"; break; + case nano::thread_role::name::online_reps: + thread_role_name_string = "Online reps"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index b6e3196cac..0e0347a001 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -59,6 +59,7 @@ enum class name port_mapping, stats, vote_router, + online_reps, }; std::string_view to_string (name); diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index 205a8d2af9..035ddf5968 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index b21a631dc8..77560af5e9 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -409,6 +410,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr< { result.inserted = true; auto observe_rep_cb = [&node = node] (auto const & rep_a) { + // TODO: Is this neccessary? Move this outside of the election class // Representative is defined as online if replying to live votes or rep_crawler queries node.online_reps.observe (rep_a); }; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index d0e7631e24..5e6de64d7c 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index d8deb88692..e4e572d056 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a2521e49cf..84b2838333 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -193,7 +194,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy rep_crawler (config.rep_crawler, *this), rep_tiers{ ledger, network_params, online_reps, stats, logger }, warmed_up (0), - online_reps (ledger, config), + online_reps_impl{ std::make_unique (config, ledger, stats, logger) }, + online_reps{ *online_reps_impl }, history_impl{ std::make_unique (config.network_params.voting) }, history{ *history_impl }, vote_uniquer{}, @@ -583,7 +585,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (node.vote_cache_processor.collect_container_info ("vote_cache_processor")); composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler")); composite->add_component (node.block_processor.collect_container_info ("block_processor")); - composite->add_component (collect_container_info (node.online_reps, "online_reps")); + composite->add_component (node.online_reps.collect_container_info ("online_reps")); composite->add_component (node.history.collect_container_info ("history")); composite->add_component (node.block_uniquer.collect_container_info ("block_uniquer")); composite->add_component (node.vote_uniquer.collect_container_info ("vote_uniquer")); @@ -654,8 +656,6 @@ void nano::node::start () rep_crawler.start (); } - ongoing_online_weight_calculation_queue (); - bool tcp_enabled = false; if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime)) { @@ -719,6 +719,7 @@ void nano::node::start () local_block_broadcaster.start (); peer_history.start (); vote_router.start (); + online_reps.start (); add_initial_peers (); } @@ -736,6 +737,8 @@ void nano::node::stop () bootstrap_workers.stop (); wallet_workers.stop (); election_workers.stop (); + + online_reps.stop (); vote_router.stop (); peer_history.stop (); // Cancels ongoing work generation tasks, which may be blocking other threads @@ -1214,28 +1217,11 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha return block_confirmed_or_being_confirmed (ledger.tx_begin_read (), hash_a); } -void nano::node::ongoing_online_weight_calculation_queue () -{ - std::weak_ptr node_w (shared_from_this ()); - workers.add_timed_task (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->ongoing_online_weight_calculation (); - } - }); -} - bool nano::node::online () const { return rep_crawler.total_weight () > online_reps.delta (); } -void nano::node::ongoing_online_weight_calculation () -{ - online_reps.sample (); - ongoing_online_weight_calculation_queue (); -} - void nano::node::process_confirmed (nano::election_status const & status_a, uint64_t iteration_a) { auto hash (status_a.winner->hash ()); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 994bf1dce3..5dd18903fd 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -47,6 +46,7 @@ class active_elections; class confirming_set; class message_processor; class node; +class online_reps; class vote_processor; class vote_cache_processor; class vote_router; @@ -129,8 +129,6 @@ class node final : public std::enable_shared_from_this bool block_confirmed_or_being_confirmed (nano::block_hash const &); void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); - void ongoing_online_weight_calculation (); - void ongoing_online_weight_calculation_queue (); bool online () const; bool init_error () const; std::pair> get_bootstrap_weights () const; @@ -184,7 +182,8 @@ class node final : public std::enable_shared_from_this nano::confirming_set & confirming_set; std::unique_ptr active_impl; nano::active_elections & active; - nano::online_reps online_reps; + std::unique_ptr online_reps_impl; + nano::online_reps & online_reps; nano::rep_crawler rep_crawler; nano::rep_tiers rep_tiers; unsigned warmed_up; diff --git a/nano/node/online_reps.cpp b/nano/node/online_reps.cpp index 3fdd2442dd..499e485bb1 100644 --- a/nano/node/online_reps.cpp +++ b/nano/node/online_reps.cpp @@ -1,91 +1,225 @@ +#include +#include +#include #include #include #include #include #include -nano::online_reps::online_reps (nano::ledger & ledger_a, nano::node_config const & config_a) : +nano::online_reps::online_reps (nano::node_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, ledger{ ledger_a }, - config{ config_a } + stats{ stats_a }, + logger{ logger_a } { - if (!ledger.store.init_error ()) +} + +nano::online_reps::~online_reps () +{ + debug_assert (!thread.joinable ()); +} + +void nano::online_reps::start () +{ + debug_assert (!thread.joinable ()); + { - auto transaction (ledger.store.tx_begin_read ()); + auto transaction = ledger.store.tx_begin_write ({ tables::online_weight }); + sanitize_trend (transaction); trended_m = calculate_trend (transaction); + logger.debug (nano::log::type::online_reps, "Initial trended weight: {}", fmt::streamed (trended_m)); } + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::online_reps); + run (); + }); } -void nano::online_reps::observe (nano::account const & rep_a) +void nano::online_reps::stop () { - if (ledger.weight (rep_a) > 0) { nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::online_reps::observe (nano::account const & rep) +{ + if (ledger.weight (rep) > config.representative_vote_weight_minimum) + { + nano::lock_guard lock{ mutex }; + auto now = std::chrono::steady_clock::now (); - auto new_insert = reps.get ().erase (rep_a) == 0; - reps.insert ({ now, rep_a }); - auto cutoff = reps.get ().lower_bound (now - std::chrono::seconds (config.network_params.node.weight_period)); - auto trimmed = reps.get ().begin () != cutoff; - reps.get ().erase (reps.get ().begin (), cutoff); + auto new_insert = reps.get ().erase (rep) == 0; + reps.insert ({ now, rep }); + + stats.inc (nano::stat::type::online_reps, new_insert ? nano::stat::detail::rep_new : nano::stat::detail::rep_update); + + bool trimmed = trim (); + + // Update current online weight if anything changed if (new_insert || trimmed) { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::update_online); online_m = calculate_online (); } } } -void nano::online_reps::sample () +bool nano::online_reps::trim () +{ + debug_assert (!mutex.try_lock ()); + + auto now = std::chrono::steady_clock::now (); + auto cutoff = reps.get ().lower_bound (now - config.network_params.node.weight_interval); + auto trimmed = reps.get ().begin () != cutoff; + reps.get ().erase (reps.get ().begin (), cutoff); + return trimmed; +} + +void nano::online_reps::run () { nano::unique_lock lock{ mutex }; - nano::uint128_t online_l = online_m; - lock.unlock (); - nano::uint128_t trend_l; + while (!stopped) { - auto transaction (ledger.store.tx_begin_write ({ tables::online_weight })); - // Discard oldest entries - while (ledger.store.online_weight.count (transaction) >= config.network_params.node.max_weight_samples) + auto next = std::chrono::steady_clock::now () + config.network_params.node.weight_interval; + condition.wait_until (lock, next, [this, next] { + return stopped || std::chrono::steady_clock::now () >= next; + }); + if (!stopped) { - auto oldest (ledger.store.online_weight.begin (transaction)); - debug_assert (oldest != ledger.store.online_weight.end ()); - ledger.store.online_weight.del (transaction, oldest->first); + lock.unlock (); + sample (); + lock.lock (); } - ledger.store.online_weight.put (transaction, std::chrono::system_clock::now ().time_since_epoch ().count (), online_l); - trend_l = calculate_trend (transaction); } - lock.lock (); - trended_m = trend_l; +} + +void nano::online_reps::sample () +{ + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sample); + + auto transaction = ledger.store.tx_begin_write ({ tables::online_weight }); + trim_trend (transaction); + ledger.store.online_weight.put (transaction, nano::seconds_since_epoch (), online ()); + auto trended_l = calculate_trend (transaction); + { + nano::lock_guard lock{ mutex }; + trended_m = trended_l; + } + + logger.debug (nano::log::type::online_reps, "Updated trended weight: {}", fmt::streamed (trended_l)); } nano::uint128_t nano::online_reps::calculate_online () const { - nano::uint128_t current; - for (auto & i : reps) + debug_assert (!mutex.try_lock ()); + return std::accumulate (reps.begin (), reps.end (), nano::uint128_t{ 0 }, [this] (nano::uint128_t current, rep_info const & info) { + return current + ledger.weight (info.account); + }); +} + +void nano::online_reps::trim_trend (nano::store::write_transaction const & transaction) +{ + auto const now = std::chrono::system_clock::now (); + auto const cutoff = now - config.network_params.node.weight_cutoff; + + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (); ++it) { - current += ledger.weight (i.account); + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::trim_trend); + ledger.store.online_weight.del (transaction, it->first); + } + else + { + // Entries are ordered by timestamp, so break early + break; + } } - return current; + + // Ensure that all remaining entries are within the expected range + debug_assert (verify_consistency (transaction, now, cutoff)); } -nano::uint128_t nano::online_reps::calculate_trend (store::transaction & transaction_a) const +void nano::online_reps::sanitize_trend (nano::store::write_transaction const & transaction) +{ + auto const now = std::chrono::system_clock::now (); + auto const cutoff = now - config.network_params.node.weight_cutoff; + + size_t removed_old = 0, removed_future = 0; + + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (); ++it) + { + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_old); + // TODO: Ensure it's OK to delete entry with the same key as the current iterator + ledger.store.online_weight.del (transaction, it->first); + ++removed_old; + } + else if (tstamp > now) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_future); + // TODO: Ensure it's OK to delete entry with the same key as the current iterator + ledger.store.online_weight.del (transaction, it->first); + ++removed_future; + } + } + + logger.info (nano::log::type::online_reps, "Sanitized online weight trend, remaining entries: {}, removed: {} (old: {}, future: {})", + ledger.store.online_weight.count (transaction), + removed_old + removed_future, + removed_old, + removed_future); + + // Ensure that all remaining entries are within the expected range + debug_assert (verify_consistency (transaction, now, cutoff)); +} + +bool nano::online_reps::verify_consistency (nano::store::write_transaction const & transaction, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const +{ + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (); ++it) + { + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff || tstamp > now) + { + return false; + } + } + return true; +} + +nano::uint128_t nano::online_reps::calculate_trend (store::transaction const & transaction) const { std::vector items; - items.reserve (config.network_params.node.max_weight_samples + 1); - items.push_back (config.online_weight_minimum.number ()); - for (auto i (ledger.store.online_weight.begin (transaction_a)), n (ledger.store.online_weight.end ()); i != n; ++i) + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (); ++it) { - items.push_back (i->second.number ()); + items.push_back (it->second.number ()); } - nano::uint128_t result; - // Pick median value for our target vote weight - auto median_idx = items.size () / 2; - nth_element (items.begin (), items.begin () + median_idx, items.end ()); - result = items[median_idx]; - return result; + if (!items.empty ()) + { + // Pick median value for our target vote weight + auto median_idx = items.size () / 2; + std::nth_element (items.begin (), items.begin () + median_idx, items.end ()); + return items[median_idx]; + } + return 0; } nano::uint128_t nano::online_reps::trended () const { nano::lock_guard lock{ mutex }; - return trended_m; + return std::max (trended_m, config.online_weight_minimum.number ()); } nano::uint128_t nano::online_reps::online () const @@ -99,7 +233,9 @@ nano::uint128_t nano::online_reps::delta () const nano::lock_guard lock{ mutex }; // Using a larger container to ensure maximum precision auto weight = static_cast (std::max ({ online_m, trended_m, config.online_weight_minimum.number () })); - return ((weight * online_weight_quorum) / 100).convert_to (); + auto delta = ((weight * online_weight_quorum) / 100).convert_to (); + release_assert (delta >= config.online_weight_minimum.number () / 100 * online_weight_quorum); + return delta; } std::vector nano::online_reps::list () @@ -117,16 +253,24 @@ void nano::online_reps::clear () online_m = 0; } -std::unique_ptr nano::collect_container_info (online_reps & online_reps, std::string const & name) +void nano::online_reps::force_online_weight (nano::uint128_t const & online_weight) { - std::size_t count; - { - nano::lock_guard guard{ online_reps.mutex }; - count = online_reps.reps.size (); - } + release_assert (nano::is_dev_run ()); + nano::lock_guard lock{ mutex }; + online_m = online_weight; +} + +void nano::online_reps::force_sample () +{ + release_assert (nano::is_dev_run ()); + sample (); +} + +std::unique_ptr nano::online_reps::collect_container_info (std::string const & name) +{ + nano::lock_guard guard{ mutex }; - auto sizeof_element = sizeof (decltype (online_reps.reps)::value_type); auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "reps", count, sizeof_element })); + composite->add_component (std::make_unique (container_info{ "reps", reps.size (), sizeof (decltype (reps)::value_type) })); return composite; } diff --git a/nano/node/online_reps.hpp b/nano/node/online_reps.hpp index f01fec91e0..6bcec904a4 100644 --- a/nano/node/online_reps.hpp +++ b/nano/node/online_reps.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -10,26 +11,26 @@ #include #include +#include #include +namespace mi = boost::multi_index; + namespace nano { -class ledger; -class node_config; -namespace store -{ - class transaction; -} - /** Track online representatives and trend online weight */ class online_reps final { public: - online_reps (nano::ledger & ledger_a, nano::node_config const & config_a); + online_reps (nano::node_config const &, nano::ledger &, nano::stats &, nano::logger &); + ~online_reps (); + + void start (); + void stop (); + /** Add voting account \p rep_account to the set of online representatives */ void observe (nano::account const & rep_account); - /** Called periodically to sample online weight */ - void sample (); + /** Returns the trended online stake */ nano::uint128_t trended () const; /** Returns the current online stake */ @@ -39,40 +40,62 @@ class online_reps final /** List of online representatives, both the currently sampling ones and the ones observed in the previous sampling period */ std::vector list (); void clear (); + + std::unique_ptr collect_container_info (std::string const & name); + + // TODO: This should be in network constants static unsigned constexpr online_weight_quorum = 67; +private: // Dependencies + nano::node_config const & config; + nano::ledger & ledger; + nano::stats & stats; + nano::logger & logger; + private: - class rep_info + void run (); + /** Called periodically to sample online weight */ + void sample (); + bool trim (); + /** Remove old records from the database */ + void trim_trend (nano::store::write_transaction const &); + /** Iterate over all database samples and remove invalid records. This is meant to clean potential leftovers from previous versions. */ + void sanitize_trend (nano::store::write_transaction const &); + bool verify_consistency (nano::store::write_transaction const &, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const; + nano::uint128_t calculate_trend (nano::store::transaction const &) const; + nano::uint128_t calculate_online () const; + +private: + struct rep_info { - public: std::chrono::steady_clock::time_point time; nano::account account; }; - class tag_time - { - }; - class tag_account - { - }; - nano::uint128_t calculate_trend (store::transaction &) const; - nano::uint128_t calculate_online () const; - mutable nano::mutex mutex; - nano::ledger & ledger; - nano::node_config const & config; - boost::multi_index_container, - boost::multi_index::member>, - boost::multi_index::hashed_unique, - boost::multi_index::member>>> - reps; + + // clang-format off + class tag_time {}; + class tag_account {}; + + using ordered_reps = boost::multi_index_container, + mi::member>, + mi::hashed_unique, + mi::member> + >>; + // clang-format off + ordered_reps reps; + nano::uint128_t trended_m; nano::uint128_t online_m; - nano::uint128_t minimum; - friend class election_quorum_minimum_update_weight_before_quorum_checks_Test; - friend std::unique_ptr collect_container_info (online_reps & online_reps, std::string const & name); -}; + bool stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; -std::unique_ptr collect_container_info (online_reps & online_reps, std::string const & name); +public: // Only for tests + void force_online_weight (nano::uint128_t const & online_weight); + void force_sample (); +}; } diff --git a/nano/node/peer_history.cpp b/nano/node/peer_history.cpp index bf1fa099bb..d684f18677 100644 --- a/nano/node/peer_history.cpp +++ b/nano/node/peer_history.cpp @@ -110,6 +110,7 @@ void nano::peer_history::run_one () auto timestamp = nano::from_milliseconds_since_epoch (timestamp_millis); if (timestamp > now || timestamp < cutoff) { + // TODO: Ensure it's OK to delete entry with the same key as the current iterator store.peer.del (transaction, endpoint); stats.inc (nano::stat::type::peer_history, nano::stat::detail::erased); diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index f8f04a4ce3..9a706157ba 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include diff --git a/nano/node/scheduler/hinted.cpp b/nano/node/scheduler/hinted.cpp index 6f654c65ec..4deb863e4a 100644 --- a/nano/node/scheduler/hinted.cpp +++ b/nano/node/scheduler/hinted.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index b2ff31eb49..4dda5e6899 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index f5fd8e44fc..55cff9b40e 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -153,8 +153,8 @@ nano::node_constants::node_constants (nano::network_constants & network_constant search_pending_interval = network_constants.is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60); unchecked_cleaning_interval = std::chrono::minutes (30); process_confirmed_interval = network_constants.is_dev_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500); - max_weight_samples = (network_constants.is_live_network () || network_constants.is_test_network ()) ? 4032 : 288; - weight_period = 5 * 60; // 5 minutes + weight_interval = network_constants.is_dev_network () ? std::chrono::seconds (1) : std::chrono::minutes (5); + weight_cutoff = (network_constants.is_live_network () || network_constants.is_test_network ()) ? std::chrono::weeks (2) : std::chrono::days (1); } nano::voting_constants::voting_constants (nano::network_constants & network_constants) : diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index fd28d30b13..ec067a568a 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -272,9 +272,10 @@ class node_constants std::chrono::minutes unchecked_cleaning_interval; std::chrono::milliseconds process_confirmed_interval; - /** The maximum amount of samples for a 2 week period on live or 1 day on beta */ - uint64_t max_weight_samples; - uint64_t weight_period; + /** Time between collecting online representative samples */ + std::chrono::seconds weight_interval; + /** The maximum time to keep online weight samples: 2 weeks on live or 1 day on beta */ + std::chrono::seconds weight_cutoff; }; /** Voting related constants whose value depends on the active network */ diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 960a205a64..f937631bc3 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include