Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

work_pool: move stopping from destructor to stop function #4437

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/core_test/block_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ TEST (mdb_block_store, sideband_height)
auto transaction (store.tx_begin_write ());
store.initialize (transaction, ledger.cache, nano::dev::constants);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
auto send = builder
.send ()
.previous (nano::dev::genesis->hash ())
Expand Down
5 changes: 5 additions & 0 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ TEST (confirmation_heightDeathTest, rollback_added_block)
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::keypair key1;
nano::block_builder builder;
auto send = builder
Expand Down Expand Up @@ -1256,6 +1257,7 @@ TEST (confirmation_heightDeathTest, modified_chain)
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::keypair key1;
nano::block_builder builder;
auto send = builder
Expand Down Expand Up @@ -1333,6 +1335,7 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::keypair key1;
nano::block_builder builder;
auto send = builder
Expand Down Expand Up @@ -2041,6 +2044,7 @@ TEST (confirmation_height, unbounded_block_cache_iteration)
nano::write_database_queue write_database_queue (false);
boost::latch initialized_latch{ 0 };
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::keypair key1;
nano::block_builder builder;
auto send = builder
Expand Down Expand Up @@ -2104,6 +2108,7 @@ TEST (confirmation_height, pruned_source)
ledger.pruning = true;
nano::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::keypair key1, key2;
nano::block_builder builder;
auto send1 = builder
Expand Down
94 changes: 94 additions & 0 deletions nano/core_test/ledger.cpp

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ TEST (node, block_store_path_failure)
auto service (std::make_shared<boost::asio::io_context> ());
auto path (nano::unique_path ());
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
auto node (std::make_shared<nano::node> (*service, system.get_available_port (), path, pool));
ASSERT_TRUE (node->wallets.items.empty ());
node->stop ();
Expand Down Expand Up @@ -100,6 +101,7 @@ TEST (node, password_fanout)
nano::node_config config;
config.peering_port = system.get_available_port ();
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
config.password_fanout = 10;
nano::node node (io_ctx, path, config, pool);
auto wallet (node.wallets.create (100));
Expand Down
2 changes: 2 additions & 0 deletions nano/core_test/processor_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ TEST (processor_service, bad_send_signature)
auto transaction (store->tx_begin_write ());
store->initialize (transaction, ledger.cache, ledger.constants);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
auto info1 = ledger.account_info (transaction, nano::dev::genesis_key.pub);
ASSERT_TRUE (info1);
nano::keypair key2;
Expand All @@ -45,6 +46,7 @@ TEST (processor_service, bad_receive_signature)
auto transaction (store->tx_begin_write ());
store->initialize (transaction, ledger.cache, ledger.constants);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
auto info1 = ledger.account_info (transaction, nano::dev::genesis_key.pub);
ASSERT_TRUE (info1);
nano::block_builder builder;
Expand Down
9 changes: 9 additions & 0 deletions nano/core_test/work_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/openclwork.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/utility.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

Expand All @@ -15,6 +16,7 @@
TEST (work, one)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::block_builder builder;
auto block = builder
.change ()
Expand All @@ -30,13 +32,15 @@ TEST (work, one)
TEST (work, disabled)
{
nano::work_pool pool{ nano::dev::network_params.network, 0 };
nano::test::start_stop_guard pool_guard{ pool };
auto result (pool.generate (nano::block_hash ()));
ASSERT_FALSE (result.is_initialized ());
}

TEST (work, validate)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::block_builder builder;
auto send_block = builder
.send ()
Expand All @@ -54,6 +58,7 @@ TEST (work, validate)
TEST (work, cancel)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
auto iterations (0);
auto done (false);
while (!done)
Expand All @@ -72,6 +77,7 @@ TEST (work, cancel)
TEST (work, cancel_many)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::root key1 (1);
nano::root key2 (2);
nano::root key3 (1);
Expand Down Expand Up @@ -103,6 +109,7 @@ TEST (work, opencl)
nano::work_pool pool{ nano::dev::network_params.network, 0, std::chrono::nanoseconds (0), [&opencl] (nano::work_version const version_a, nano::root const & root_a, uint64_t difficulty_a, std::atomic<int> & ticket_a) {
return opencl->generate_work (version_a, root_a, difficulty_a);
} };
nano::test::start_stop_guard pool_guard{ pool };
ASSERT_NE (nullptr, pool.opencl);
nano::root root;
uint64_t difficulty (0xff00000000000000);
Expand All @@ -129,6 +136,7 @@ TEST (work, opencl)
TEST (work, difficulty)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::root root (1);
uint64_t difficulty1 (0xff00000000000000);
uint64_t difficulty2 (0xfff0000000000000);
Expand All @@ -153,6 +161,7 @@ TEST (work, eco_pow)
{
auto work_func = [] (std::promise<std::chrono::nanoseconds> & promise, std::chrono::nanoseconds interval) {
nano::work_pool pool{ nano::dev::network_params.network, 1, interval };
nano::test::start_stop_guard pool_guard{ pool };
constexpr auto num_iterations = 5;

nano::timer<std::chrono::nanoseconds> timer;
Expand Down
51 changes: 29 additions & 22 deletions nano/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,14 @@ nano::work_pool::work_pool (nano::network_constants & network_constants, unsigne
ticket (0),
done (false),
pow_rate_limiter (pow_rate_limiter_a),
opencl (opencl_a)
opencl (opencl_a),
max_threads (max_threads_a)
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");

auto count (network_constants.is_dev_network () ? std::min (max_threads_a, 1u) : std::min (max_threads_a, std::max (1u, nano::hardware_concurrency ())));
if (opencl)
{
// One thread to handle OpenCL
++count;
}
for (auto i (0u); i < count; ++i)
{
threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
nano::thread_role::set (nano::thread_role::name::work);
nano::work_thread_reprioritize ();
loop (i);
});
}
}

nano::work_pool::~work_pool ()
{
stop ();
for (auto & i : threads)
{
i.join ();
}
debug_assert (done);
}

void nano::work_pool::loop (uint64_t thread)
Expand Down Expand Up @@ -171,14 +152,40 @@ void nano::work_pool::cancel (nano::root const & root_a)
}
}

void nano::work_pool::start ()
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");

auto count (network_constants.is_dev_network () ? std::min (max_threads, 1u) : std::min (max_threads, std::max (1u, nano::hardware_concurrency ())));
if (opencl)
{
// One thread to handle OpenCL
++count;
}
for (auto i (0u); i < count; ++i)
{
threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
nano::thread_role::set (nano::thread_role::name::work);
nano::work_thread_reprioritize ();
loop (i);
});
}
}

void nano::work_pool::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
done = true;
++ticket;
}

producer_condition.notify_all ();

for (auto & i : threads)
{
i.join ();
}
}

void nano::work_pool::generate (nano::work_version const version_a, nano::root const & root_a, uint64_t difficulty_a, std::function<void (boost::optional<uint64_t> const &)> callback_a)
Expand Down
2 changes: 2 additions & 0 deletions nano/lib/work.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class work_pool final
work_pool (nano::network_constants & network_constants, unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function<boost::optional<uint64_t> (nano::work_version const, nano::root const &, uint64_t, std::atomic<int> &)> = nullptr);
~work_pool ();
void loop (uint64_t);
void start ();
void stop ();
void cancel (nano::root const &);
void generate (nano::work_version const, nano::root const &, uint64_t, std::function<void (boost::optional<uint64_t> const &)>);
Expand All @@ -57,6 +58,7 @@ class work_pool final
std::chrono::nanoseconds pow_rate_limiter;
std::function<boost::optional<uint64_t> (nano::work_version const, nano::root const &, uint64_t, std::atomic<int> &)> opencl;
nano::observer_set<bool> work_observers;
unsigned max_threads;
};

std::unique_ptr<container_info_component> collect_container_info (work_pool & work_pool, std::string const & name);
Expand Down
3 changes: 2 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ void nano::node::process_local_async (std::shared_ptr<nano::block> const & block

void nano::node::start ()
{
work.start ();
long_inactivity_cleanup ();
network.start ();
add_initial_peers ();
Expand Down Expand Up @@ -698,7 +699,7 @@ void nano::node::stop ()
stats.stop ();
epoch_upgrader.stop ();
workers.stop ();
// work pool is not stopped on purpose due to testing setup
work.stop ();
}

void nano::node::keepalive_preconfigured (std::vector<std::string> const & peers_a)
Expand Down
4 changes: 4 additions & 0 deletions nano/test_common/ledger.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "testutil.hpp"

#include <nano/node/make_store.hpp>
#include <nano/node/node.hpp>
#include <nano/test_common/ledger.hpp>
Expand Down Expand Up @@ -46,6 +48,7 @@ auto nano::test::context::ledger_send_receive () -> ledger_context
{
std::deque<std::shared_ptr<nano::block>> blocks;
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::block_builder builder;
auto send = builder.state ()
.make_block ()
Expand Down Expand Up @@ -76,6 +79,7 @@ auto nano::test::context::ledger_send_receive_legacy () -> ledger_context
{
std::deque<std::shared_ptr<nano::block>> blocks;
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::start_stop_guard pool_guard{ pool };
nano::block_builder builder;
auto send = builder.send ()
.make_block ()
Expand Down
2 changes: 2 additions & 0 deletions nano/test_common/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ std::shared_ptr<nano::node> nano::test::system::make_disconnected_node (std::opt

nano::test::system::system ()
{
work.start ();
auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR");
if (scale_str)
{
Expand All @@ -150,6 +151,7 @@ nano::test::system::system ()
nano::test::system::system (uint16_t count_a, nano::transport::transport_type type_a, nano::node_flags flags_a) :
system ()
{
work.start ();
nodes.reserve (count_a);
for (uint16_t i (0); i < count_a; ++i)
{
Expand Down
Loading