From 0bb7299cc777f40cdd621982f587415807ec046c Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 7 Aug 2023 18:47:24 -0500 Subject: [PATCH] Adding hierarchical operation to index_queue spawning - flyby: fixing integral conversion warnings - flyby: fixing target.num_pus --- .../tests/performance/foreach_scaling.cpp | 44 ++-- .../include/hpx/compute_local/host/target.hpp | 10 +- libs/core/compute_local/src/host_target.cpp | 23 +- .../executors/detail/index_queue_spawning.hpp | 196 +++++++++++++----- .../hpx/executors/parallel_executor.hpp | 4 +- .../executors/thread_pool_scheduler_bulk.hpp | 11 +- 6 files changed, 204 insertions(+), 84 deletions(-) diff --git a/libs/core/algorithms/tests/performance/foreach_scaling.cpp b/libs/core/algorithms/tests/performance/foreach_scaling.cpp index 1a98161d0ada..2958aa5212a6 100644 --- a/libs/core/algorithms/tests/performance/foreach_scaling.cpp +++ b/libs/core/algorithms/tests/performance/foreach_scaling.cpp @@ -35,7 +35,7 @@ std::uint64_t averageout_plain_for(std::size_t vector_size) std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -52,7 +52,7 @@ std::uint64_t averageout_plain_for_iter(std::size_t vector_size) std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -72,7 +72,7 @@ std::uint64_t averageout_parallel_foreach( std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = 
hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -92,7 +92,7 @@ std::uint64_t averageout_task_foreach(std::size_t vector_size, Executor&& exec) if (num_overlapping_loops <= 0) { - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); for (auto i = 0; i < test_count; i++) measure_task_foreach(data_representation, exec).wait(); @@ -103,7 +103,7 @@ std::uint64_t averageout_task_foreach(std::size_t vector_size, Executor&& exec) std::vector> tests; tests.resize(num_overlapping_loops); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); for (auto i = 0; i < test_count; i++) { @@ -124,7 +124,7 @@ std::uint64_t averageout_sequential_foreach(std::size_t vector_size) std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -142,7 +142,7 @@ std::uint64_t averageout_parallel_forloop( std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -167,7 +167,7 @@ std::uint64_t averageout_task_forloop(std::size_t vector_size, Executor&& exec) if (num_overlapping_loops <= 0) { - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); for (auto i = 0; i < test_count; i++) measure_task_forloop(data_representation, exec).wait(); @@ 
-178,7 +178,7 @@ std::uint64_t averageout_task_forloop(std::size_t vector_size, Executor&& exec) std::vector> tests; tests.resize(num_overlapping_loops); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); for (auto i = 0; i < test_count; i++) { @@ -199,7 +199,7 @@ std::uint64_t averageout_sequential_forloop(std::size_t vector_size) std::iota( std::begin(data_representation), std::end(data_representation), gen()); - std::uint64_t start = hpx::chrono::high_resolution_clock::now(); + std::uint64_t const start = hpx::chrono::high_resolution_clock::now(); // average out 100 executions to avoid varying results for (auto i = 0; i < test_count; i++) @@ -212,8 +212,8 @@ std::uint64_t averageout_sequential_forloop(std::size_t vector_size) int hpx_main(hpx::program_options::variables_map& vm) { // pull values from cmd - std::size_t vector_size = vm["vector_size"].as(); - bool csvoutput = vm.count("csv_output") != 0; + std::size_t const vector_size = vm["vector_size"].as(); + bool const csvoutput = vm.count("csv_output") != 0; delay = vm["work_delay"].as(); test_count = vm["test_count"].as(); chunk_size = vm["chunk_size"].as(); @@ -264,8 +264,8 @@ int hpx_main(hpx::program_options::variables_map& vm) std::uint64_t task_time_forloop = 0; std::uint64_t seq_time_forloop = 0; - std::uint64_t plain_time_for = averageout_plain_for(vector_size); - std::uint64_t plain_time_for_iter = + std::uint64_t const plain_time_for = averageout_plain_for(vector_size); + std::uint64_t const plain_time_for_iter = averageout_plain_for_iter(vector_size); if (vm["executor"].as() == "forkjoin") @@ -467,11 +467,15 @@ int hpx_main(hpx::program_options::variables_map& vm) << std::left << "Parallel Scale : " << std::right << std::setw(8) - << (double(seq_time_foreach) / par_time_foreach) << "\n" + << (static_cast(seq_time_foreach) / + par_time_foreach) + << "\n" << std::left << "Task Scale : " << std::right << 
std::setw(8) - << (double(seq_time_foreach) / task_time_foreach) << "\n" + << (static_cast(seq_time_foreach) / + task_time_foreach) + << "\n" << std::flush; std::cout << "-------------Average-(for_loop)----------------\n" @@ -490,11 +494,15 @@ int hpx_main(hpx::program_options::variables_map& vm) << std::left << "Parallel Scale : " << std::right << std::setw(8) - << (double(seq_time_forloop) / par_time_forloop) << "\n" + << (static_cast(seq_time_forloop) / + par_time_forloop) + << "\n" << std::left << "Task Scale : " << std::right << std::setw(8) - << (double(seq_time_forloop) / task_time_forloop) << "\n"; + << (static_cast(seq_time_forloop) / + task_time_forloop) + << "\n"; } } diff --git a/libs/core/compute_local/include/hpx/compute_local/host/target.hpp b/libs/core/compute_local/include/hpx/compute_local/host/target.hpp index d7081559245a..cb88143c46ff 100644 --- a/libs/core/compute_local/include/hpx/compute_local/host/target.hpp +++ b/libs/core/compute_local/include/hpx/compute_local/host/target.hpp @@ -33,7 +33,7 @@ namespace hpx::compute::host { { } - explicit native_handle_type(hpx::threads::mask_type mask) + explicit native_handle_type(hpx::threads::mask_type const& mask) : mask_(mask) { } @@ -58,7 +58,7 @@ namespace hpx::compute::host { target() = default; // Constructs target from a given mask of processing units - explicit target(hpx::threads::mask_type mask) + explicit target(hpx::threads::mask_type const& mask) : handle_(mask) { } @@ -74,12 +74,12 @@ namespace hpx::compute::host { std::pair num_pus() const; - constexpr void synchronize() const noexcept + static constexpr void synchronize() noexcept { // nothing to do here... 
} - hpx::future get_future() const + static hpx::future get_future() { return hpx::make_ready_future(); } @@ -98,7 +98,7 @@ namespace hpx::compute::host { friend class hpx::serialization::access; void serialize(serialization::input_archive& ar, unsigned int); - void serialize(serialization::output_archive& ar, unsigned int); + void serialize(serialization::output_archive& ar, unsigned int) const; native_handle_type handle_; }; diff --git a/libs/core/compute_local/src/host_target.cpp b/libs/core/compute_local/src/host_target.cpp index 4a790059ca5e..1299d3fc6855 100644 --- a/libs/core/compute_local/src/host_target.cpp +++ b/libs/core/compute_local/src/host_target.cpp @@ -21,11 +21,13 @@ namespace hpx::compute::host { std::pair target::num_pus() const { - auto& rp = hpx::resource::get_partitioner(); - std::size_t num_os_threads = hpx::get_os_thread_count(); + auto const& rp = hpx::resource::get_partitioner(); + std::size_t const num_os_threads = hpx::get_os_thread_count(); - hpx::threads::mask_type mask = native_handle().get_device(); - std::size_t mask_size = hpx::threads::mask_size(mask); + hpx::threads::mask_type const mask = native_handle().get_device(); + std::size_t const mask_size = hpx::threads::mask_size(mask); + + bool found_one = false; std::size_t num_thread = 0; for (/**/; num_thread != num_os_threads; ++num_thread) @@ -33,10 +35,18 @@ namespace hpx::compute::host { if (hpx::threads::bit_and( mask, rp.get_pu_mask(num_thread), mask_size)) { + found_one = true; break; } } - return std::make_pair(num_thread, hpx::threads::count(mask)); + + if (!found_one) + { + return std::make_pair(static_cast(-1), 0); + } + + return std::make_pair( + num_thread, (std::min)(num_os_threads, hpx::threads::count(mask))); } void target::serialize(serialization::input_archive& ar, unsigned int) @@ -44,7 +54,8 @@ namespace hpx::compute::host { ar >> handle_.mask_; } - void target::serialize(serialization::output_archive& ar, unsigned int) + void target::serialize( + 
serialization::output_archive& ar, unsigned int) const { ar << handle_.mask_; } diff --git a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp index 927a1ca7b705..c514f8abb227 100644 --- a/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp +++ b/libs/core/executors/include/hpx/executors/detail/index_queue_spawning.hpp @@ -274,9 +274,10 @@ namespace hpx::parallel::execution::detail { { auto& queue = queues[worker_thread].data_; auto const part_begin = static_cast( - (worker_thread * size) / num_threads); + (static_cast(worker_thread) * size) / num_threads); auto const part_end = static_cast( - ((worker_thread + 1) * size) / num_threads); + ((static_cast(worker_thread) + 1) * size) / + num_threads); queue.reset(part_begin, part_end); } @@ -300,10 +301,11 @@ namespace hpx::parallel::execution::detail { // Spawn a task which will process a number of chunks. If the queue // contains no chunks no task will be spawned. - template - void do_work_task(hpx::threads::thread_description const& desc, - threads::thread_pool_base* pool, bool dont_bind_to_core, - Task&& task_f) const + template + void do_work_task( + [[maybe_unused]] hpx::threads::thread_description const& desc, + [[maybe_unused]] threads::thread_pool_base* pool, + [[maybe_unused]] bool dont_bind_to_core, Task&& task_f) const { std::uint32_t const worker_thread = task_f.worker_thread; if (queues[worker_thread].data_.empty()) @@ -314,55 +316,71 @@ namespace hpx::parallel::execution::detail { return; } - // run task on small stack - auto post_policy = hpx::execution::experimental::with_stacksize( - policy, threads::thread_stacksize::small_); - - if (dont_bind_to_core) + if constexpr (!RunDirectly) { - // Make sure the new task is not bound to a particular core, if - // requested. This prevents the main thread from potentially - // being occupied in asynchronous scenarios. 
- hpx::threads::thread_priority const priority = - hpx::execution::experimental::get_priority(post_policy); - if (priority == hpx::threads::thread_priority::bound) + // run task on small stack + auto post_policy = hpx::execution::experimental::with_stacksize( + policy, threads::thread_stacksize::small_); + + if (dont_bind_to_core) { - post_policy = hpx::execution::experimental::with_priority( - post_policy, hpx::threads::thread_priority::normal); + // Make sure the new task is not bound to a particular core, if + // requested. This prevents the main thread from potentially + // being occupied in asynchronous scenarios. + hpx::threads::thread_priority const priority = + hpx::execution::experimental::get_priority(post_policy); + if (priority == hpx::threads::thread_priority::bound) + { + post_policy = + hpx::execution::experimental::with_priority( + post_policy, + hpx::threads::thread_priority::normal); + } } - } - - // launch task on new HPX-thread - auto hint = hpx::execution::experimental::get_hint(policy); - if (hint.mode == hpx::threads::thread_schedule_hint_mode::none && - hint.hint == -1) - { - // apply hint if none was given - hint.mode = hpx::threads::thread_schedule_hint_mode::thread; - hint.hint = worker_thread + first_thread; - hpx::detail::post_policy_dispatch::call( - hpx::execution::experimental::with_hint(post_policy, hint), - desc, pool, HPX_FORWARD(Task, task_f)); + // launch task on new HPX-thread + auto hint = hpx::execution::experimental::get_hint(policy); + if (hint.mode == + hpx::threads::thread_schedule_hint_mode::none && + hint.hint == -1) + { + // apply hint if none was given + hint.mode = hpx::threads::thread_schedule_hint_mode::thread; + hint.hint = static_cast(worker_thread) + + first_thread; + + hpx::detail::post_policy_dispatch::call( + hpx::execution::experimental::with_hint( + post_policy, hint), + desc, pool, HPX_FORWARD(Task, task_f)); + } + else + { + hpx::detail::post_policy_dispatch::call( + post_policy, desc, pool, 
HPX_FORWARD(Task, task_f)); + } } else { - hpx::detail::post_policy_dispatch::call( - post_policy, desc, pool, HPX_FORWARD(Task, task_f)); + // execute directly + hpx::detail::sync_launch_policy_dispatch< + launch::sync_policy>::call(policy, + HPX_FORWARD(Task, task_f)); } } public: template index_queue_bulk_state(std::size_t first_thread, - std::size_t num_threads, Launch l, F_&& f, Shape const& shape, - Ts_&&... ts) noexcept + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch l, F_&& f, Shape shape, Ts_&&... ts) noexcept : base_type(init_no_addref{}) , first_thread(static_cast(first_thread)) , num_threads(num_threads) + , hierarchical_threshold(hierarchical_threshold) , policy(HPX_MOVE(l)) , f(HPX_FORWARD(F_, f)) - , shape(shape) + , shape(HPX_MOVE(shape)) , ts(HPX_FORWARD(Ts_, ts)...) , pu_mask(full_mask(first_thread, num_threads)) , queues(num_threads) @@ -371,8 +389,27 @@ namespace hpx::parallel::execution::detail { HPX_ASSERT(hpx::threads::count(pu_mask) == num_threads); } - void execute(hpx::threads::thread_description const& desc, - threads::thread_pool_base* pool) + struct launch_data + { + launch_data(task_function&& func, + bool bind_to_core) noexcept + : func(HPX_MOVE(func)) + , bind_to_core(bind_to_core) + { + } + + ~launch_data() = default; + + launch_data(launch_data const&) = default; + launch_data(launch_data&&) = default; + launch_data& operator=(launch_data const&) = default; + launch_data& operator=(launch_data&&) = default; + + task_function func; + bool bind_to_core; + }; + + std::vector generate_launch_data() { auto const size = static_cast(hpx::util::size(shape)); @@ -439,6 +476,9 @@ namespace hpx::parallel::execution::detail { bool allow_stealing = !hpx::threads::do_not_share_function(hint.sharing_mode()); + std::vector data; + data.reserve(num_threads); + for (std::uint32_t pu = 0; worker_thread != num_threads && pu != num_pus; ++pu) { @@ -471,11 +511,12 @@ namespace hpx::parallel::execution::detail { } // Schedule task 
for this worker thread - do_work_task(desc, pool, false, + data.emplace_back( task_function{ hpx::intrusive_ptr(this), size, chunk_size, worker_thread, reverse_placement, - allow_stealing}); + allow_stealing}, + false); ++worker_thread; } @@ -488,16 +529,72 @@ namespace hpx::parallel::execution::detail { if (main_thread_ok) { // Handle the queue for the local thread. - do_work_task(desc, pool, true, + data.emplace_back( task_function{ hpx::intrusive_ptr(this), size, chunk_size, local_worker_thread, reverse_placement, - allow_stealing}); + allow_stealing}, + true); + } + + return data; + } + + void execute(hpx::threads::thread_description const& desc, + threads::thread_pool_base* pool) + { + auto launch_data = generate_launch_data(); + std::size_t const size = launch_data.size(); + + // Do straight spawning if hierarchical spawning was disabled or we + // have fewer chunks than our threshold. + if (hierarchical_threshold == 0 || hierarchical_threshold >= size) + { + for (std::size_t i = 0; i != size; ++i) + { + do_work_task(desc, pool, launch_data[i].bind_to_core, + HPX_MOVE(launch_data[i].func)); + } + return; + } + + auto task = [desc, pool, launch_data](auto b, auto e) { + for (std::size_t i = b; i != e - 1; ++i) + { + auto state = launch_data[i].func.state; + state->template do_work_task(desc, pool, + launch_data[i].bind_to_core, launch_data[i].func); + } + + // directly execute last task + auto state = launch_data[e - 1].func.state; + state->template do_work_task( + desc, pool, false, launch_data[e - 1].func); + }; + + // run task on small stack + auto post_policy = hpx::execution::experimental::with_stacksize( + policy, threads::thread_stacksize::small_); + std::size_t start = 0; + while (true) + { + auto const stop = start + hierarchical_threshold; + if (stop >= size) + { + hpx::detail::post_policy_dispatch::call( + post_policy, desc, pool, HPX_MOVE(task), start, size); + break; + } + + hpx::detail::post_policy_dispatch::call( + post_policy, desc, pool, task, 
start, stop); + start = stop; } } std::uint32_t first_thread; std::size_t num_threads; + std::size_t hierarchical_threshold; Launch policy; std::decay_t f; Shape shape; @@ -521,8 +618,8 @@ namespace hpx::parallel::execution::detail { decltype(auto) index_queue_bulk_async_execute_void( hpx::threads::thread_description const& desc, threads::thread_pool_base* pool, std::size_t first_thread, - std::size_t num_threads, Launch policy, F&& f, S const& shape, - Ts&&... ts) + std::size_t num_threads, std::size_t hierarchical_threshold, + Launch policy, F&& f, S const& shape, Ts&&... ts) { HPX_ASSERT(pool); @@ -535,8 +632,9 @@ namespace hpx::parallel::execution::detail { using shared_state = index_queue_bulk_state; hpx::intrusive_ptr p( - new shared_state(first_thread, num_threads, HPX_MOVE(policy), - HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...), + new shared_state(first_thread, num_threads, hierarchical_threshold, + HPX_MOVE(policy), HPX_FORWARD(F, f), shape, + HPX_FORWARD(Ts, ts)...), false); p->execute(desc, pool); @@ -562,8 +660,8 @@ namespace hpx::parallel::execution::detail { else { return index_queue_bulk_async_execute_void(desc, pool, first_thread, - num_threads, policy, HPX_FORWARD(F, f), shape, - HPX_FORWARD(Ts, ts)...); + num_threads, hierarchical_threshold, policy, HPX_FORWARD(F, f), + shape, HPX_FORWARD(Ts, ts)...); } } diff --git a/libs/core/executors/include/hpx/executors/parallel_executor.hpp b/libs/core/executors/include/hpx/executors/parallel_executor.hpp index 010c082497d7..2e026f61b383 100644 --- a/libs/core/executors/include/hpx/executors/parallel_executor.hpp +++ b/libs/core/executors/include/hpx/executors/parallel_executor.hpp @@ -415,7 +415,7 @@ namespace hpx::execution { hpx::threads::do_not_combine_tasks( exec.policy().get_hint().sharing_mode()); - if (exec.hierarchical_threshold_ == 0 && !do_not_combine_tasks) + if (!do_not_combine_tasks) { return parallel::execution::detail:: index_queue_bulk_async_execute(desc, pool, @@ -527,7 +527,7 @@ 
namespace hpx::execution { private: /// \cond NOINTERNAL - static constexpr std::size_t hierarchical_threshold_default_ = 0; + static constexpr std::size_t hierarchical_threshold_default_ = 6; threads::thread_pool_base* pool_; Policy policy_; diff --git a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp index 6f5eb98943d5..217905d20ac4 100644 --- a/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp +++ b/libs/core/executors/include/hpx/executors/thread_pool_scheduler_bulk.hpp @@ -401,7 +401,8 @@ namespace hpx::execution::experimental::detail { { // apply hint if none was given hint.mode = hpx::threads::thread_schedule_hint_mode::thread; - hint.hint = worker_thread + op_state->first_thread; + hint.hint = static_cast( + worker_thread + op_state->first_thread); auto policy = hpx::execution::experimental::with_hint( op_state->scheduler.policy(), hint); @@ -454,7 +455,7 @@ namespace hpx::execution::experimental::detail { // Calculate chunk size and number of chunks std::uint32_t chunk_size = get_bulk_scheduler_chunk_size( - op_state->num_worker_threads, size); + static_cast(op_state->num_worker_threads), size); std::uint32_t num_chunks = (size + chunk_size - 1) / chunk_size; // launch only as many tasks as we have chunks @@ -490,13 +491,15 @@ namespace hpx::execution::experimental::detail { hint.placement_mode() == placement::breadth_first_reverse) { init_queue_breadth_first(worker_thread, num_chunks, - op_state->num_worker_threads); + static_cast( + op_state->num_worker_threads)); } else { // the default for this scheduler is depth-first placement init_queue_depth_first(worker_thread, num_chunks, - op_state->num_worker_threads); + static_cast( + op_state->num_worker_threads)); } }