Adding hierarchical operation to index_queue spawning
- flyby: fixing integral conversion warnings
- flyby: fixing target.num_pus
hkaiser committed Aug 8, 2023
1 parent e7c31a4 commit 090cb5c
Showing 6 changed files with 204 additions and 84 deletions.
44 changes: 26 additions & 18 deletions libs/core/algorithms/tests/performance/foreach_scaling.cpp
@@ -35,7 +35,7 @@ std::uint64_t averageout_plain_for(std::size_t vector_size)
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
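The hunks in this file all touch the same measurement pattern: each averageout_* helper takes a start timestamp from hpx::chrono::high_resolution_clock (which reports nanoseconds as std::uint64_t), runs the measured operation test_count times, and divides the elapsed time by the iteration count. A minimal sketch of that pattern, with illustrative names (averageout, run_one) that are not part of the benchmark:

#include <hpx/chrono.hpp>

#include <cstdint>

// Illustrative helper, not from foreach_scaling.cpp: averages the runtime of
// test_count invocations of run_one, mirroring the averageout_* functions.
template <typename F>
std::uint64_t averageout(int test_count, F&& run_one)
{
    std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

    // average out the executions to avoid varying results
    for (int i = 0; i < test_count; ++i)
        run_one();

    return (hpx::chrono::high_resolution_clock::now() - start) / test_count;
}

With the names used in this file, averageout(test_count, [&] { measure_task_foreach(data_representation, exec).wait(); }) would correspond to one of the measured cases.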
@@ -52,7 +52,7 @@ std::uint64_t averageout_plain_for_iter(std::size_t vector_size)
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
@@ -72,7 +72,7 @@ std::uint64_t averageout_parallel_foreach(
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
@@ -92,7 +92,7 @@ std::uint64_t averageout_task_foreach(std::size_t vector_size, Executor&& exec)

if (num_overlapping_loops <= 0)
{
- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

for (auto i = 0; i < test_count; i++)
measure_task_foreach(data_representation, exec).wait();
@@ -103,7 +103,7 @@ std::uint64_t averageout_task_foreach(std::size_t vector_size, Executor&& exec)
std::vector<hpx::shared_future<void>> tests;
tests.resize(num_overlapping_loops);

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

for (auto i = 0; i < test_count; i++)
{
@@ -124,7 +124,7 @@ std::uint64_t averageout_sequential_foreach(std::size_t vector_size)
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
@@ -142,7 +142,7 @@ std::uint64_t averageout_parallel_forloop(
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
@@ -167,7 +167,7 @@ std::uint64_t averageout_task_forloop(std::size_t vector_size, Executor&& exec)

if (num_overlapping_loops <= 0)
{
- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

for (auto i = 0; i < test_count; i++)
measure_task_forloop(data_representation, exec).wait();
@@ -178,7 +178,7 @@ std::uint64_t averageout_task_forloop(std::size_t vector_size, Executor&& exec)
std::vector<hpx::shared_future<void>> tests;
tests.resize(num_overlapping_loops);

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

for (auto i = 0; i < test_count; i++)
{
@@ -199,7 +199,7 @@ std::uint64_t averageout_sequential_forloop(std::size_t vector_size)
std::iota(
std::begin(data_representation), std::end(data_representation), gen());

- std::uint64_t start = hpx::chrono::high_resolution_clock::now();
+ std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

// average out 100 executions to avoid varying results
for (auto i = 0; i < test_count; i++)
@@ -212,8 +212,8 @@ std::uint64_t averageout_sequential_forloop(std::size_t vector_size)
int hpx_main(hpx::program_options::variables_map& vm)
{
// pull values from cmd
- std::size_t vector_size = vm["vector_size"].as<std::size_t>();
- bool csvoutput = vm.count("csv_output") != 0;
+ std::size_t const vector_size = vm["vector_size"].as<std::size_t>();
+ bool const csvoutput = vm.count("csv_output") != 0;
delay = vm["work_delay"].as<int>();
test_count = vm["test_count"].as<int>();
chunk_size = vm["chunk_size"].as<int>();
@@ -264,8 +264,8 @@ int hpx_main(hpx::program_options::variables_map& vm)
std::uint64_t task_time_forloop = 0;
std::uint64_t seq_time_forloop = 0;

- std::uint64_t plain_time_for = averageout_plain_for(vector_size);
- std::uint64_t plain_time_for_iter =
+ std::uint64_t const plain_time_for = averageout_plain_for(vector_size);
+ std::uint64_t const plain_time_for_iter =
averageout_plain_for_iter(vector_size);

if (vm["executor"].as<std::string>() == "forkjoin")
@@ -467,11 +467,15 @@ int hpx_main(hpx::program_options::variables_map& vm)
<< std::left
<< "Parallel Scale : " << std::right
<< std::setw(8)
- << (double(seq_time_foreach) / par_time_foreach) << "\n"
+ << (static_cast<double>(seq_time_foreach) /
+ par_time_foreach)
+ << "\n"
<< std::left
<< "Task Scale : " << std::right
<< std::setw(8)
- << (double(seq_time_foreach) / task_time_foreach) << "\n"
+ << (static_cast<double>(seq_time_foreach) /
+ task_time_foreach)
+ << "\n"
<< std::flush;

std::cout << "-------------Average-(for_loop)----------------\n"
@@ -490,11 +494,15 @@ int hpx_main(hpx::program_options::variables_map& vm)
<< std::left
<< "Parallel Scale : " << std::right
<< std::setw(8)
- << (double(seq_time_forloop) / par_time_forloop) << "\n"
+ << (static_cast<double>(seq_time_forloop) /
+ par_time_forloop)
+ << "\n"
<< std::left
<< "Task Scale : " << std::right
<< std::setw(8)
- << (double(seq_time_forloop) / task_time_forloop) << "\n";
+ << (static_cast<double>(seq_time_forloop) /
+ task_time_forloop)
+ << "\n";
}
}
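The two hunks above appear to be the "integral conversion warnings" flyby mentioned in the commit message: the functional-style double(...) casts in the scaling output are replaced by explicit static_cast<double>(...) casts. A tiny before/after illustration with made-up timings:

// Hypothetical values, for illustration only.
std::uint64_t const seq_time_foreach = 1000;
std::uint64_t const par_time_foreach = 125;

// Before: functional-style cast, easy to overlook and flagged by some
// conversion-warning settings.
// double const scale = double(seq_time_foreach) / par_time_foreach;

// After: the intended widening conversion is spelled out explicitly.
double const scale =
    static_cast<double>(seq_time_foreach) / par_time_foreach;    // 8.0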

Changes to the hpx::compute::host target header (per-file summary line not captured):
@@ -33,7 +33,7 @@ namespace hpx::compute::host {
{
}

- explicit native_handle_type(hpx::threads::mask_type mask)
+ explicit native_handle_type(hpx::threads::mask_type const& mask)
: mask_(mask)
{
}
@@ -58,7 +58,7 @@ namespace hpx::compute::host {
target() = default;

// Constructs target from a given mask of processing units
- explicit target(hpx::threads::mask_type mask)
+ explicit target(hpx::threads::mask_type const& mask)
: handle_(mask)
{
}
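Flyby in this header: both the native_handle_type and the target constructors now take the mask by const reference. Depending on how HPX was configured (HPX_HAVE_MAX_CPU_COUNT), hpx::threads::mask_type can be a plain integer or a dynamically sized bitset, so passing it by value may copy a non-trivial object. Illustrative signatures only; neither function exists in the header:

// Pass-by-value copies the mask; that is cheap only when mask_type is integral.
void use_mask_by_value(hpx::threads::mask_type mask);

// Pass-by-const-reference avoids the copy regardless of the representation.
void use_mask_by_cref(hpx::threads::mask_type const& mask);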
@@ -74,12 +74,12 @@ namespace hpx::compute::host {

std::pair<std::size_t, std::size_t> num_pus() const;

- constexpr void synchronize() const noexcept
+ static constexpr void synchronize() noexcept
{
// nothing to do here...
}

- hpx::future<void> get_future() const
+ static hpx::future<void> get_future()
{
return hpx::make_ready_future();
}
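synchronize() and get_future() touch no instance state on a host target (there is nothing to wait for), so they become static member functions; existing call sites on an object keep compiling. A small usage sketch, assuming the hpx::compute::host::target type declared here:

hpx::compute::host::target t;

t.synchronize();                               // still valid on an instance
hpx::compute::host::target::synchronize();     // now also valid without one

hpx::future<void> f = hpx::compute::host::target::get_future();
f.get();    // ready immediately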
@@ -98,7 +98,7 @@ namespace hpx::compute::host {
friend class hpx::serialization::access;

void serialize(serialization::input_archive& ar, unsigned int);
- void serialize(serialization::output_archive& ar, unsigned int);
+ void serialize(serialization::output_archive& ar, unsigned int) const;

native_handle_type handle_;
};
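target keeps one serialize overload per archive direction, and this change const-qualifies the declaration of the output (save) overload, matching the definition below in host_target.cpp. A minimal sketch of the same pattern on a made-up type (Widget and value_ are illustrative, not from HPX):

#include <hpx/serialization.hpp>

class Widget
{
public:
    friend class hpx::serialization::access;

private:
    // Loading assigns to the object, so this overload stays non-const.
    void serialize(hpx::serialization::input_archive& ar, unsigned int)
    {
        ar >> value_;
    }

    // Saving only reads the object, so it can be const-qualified.
    void serialize(hpx::serialization::output_archive& ar, unsigned int) const
    {
        ar << value_;
    }

    int value_ = 0;
};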
23 changes: 17 additions & 6 deletions libs/core/compute_local/src/host_target.cpp
@@ -21,30 +21,41 @@ namespace hpx::compute::host {

std::pair<std::size_t, std::size_t> target::num_pus() const
{
- auto& rp = hpx::resource::get_partitioner();
- std::size_t num_os_threads = hpx::get_os_thread_count();
+ auto const& rp = hpx::resource::get_partitioner();
+ std::size_t const num_os_threads = hpx::get_os_thread_count();

- hpx::threads::mask_type mask = native_handle().get_device();
- std::size_t mask_size = hpx::threads::mask_size(mask);
+ hpx::threads::mask_type const mask = native_handle().get_device();
+ std::size_t const mask_size = hpx::threads::mask_size(mask);

+ bool found_one = false;
+
std::size_t num_thread = 0;
for (/**/; num_thread != num_os_threads; ++num_thread)
{
if (hpx::threads::bit_and(
mask, rp.get_pu_mask(num_thread), mask_size))
{
+ found_one = true;
break;
}
}
- return std::make_pair(num_thread, hpx::threads::count(mask));
+
+ if (!found_one)
+ {
+ return std::make_pair(static_cast<std::size_t>(-1), 0);
+ }
+
+ return std::make_pair(
+ num_thread, (std::min)(num_os_threads, hpx::threads::count(mask)));
}

void target::serialize(serialization::input_archive& ar, unsigned int)
{
ar >> handle_.mask_;
}

- void target::serialize(serialization::output_archive& ar, unsigned int)
+ void target::serialize(
+ serialization::output_archive& ar, unsigned int) const
{
ar << handle_.mask_;
}
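This rewrite is the "fixing target.num_pus" flyby from the commit message: num_pus() now tracks whether any running worker thread actually matches the target's mask, returns (std::size_t(-1), 0) when none does, and caps the reported PU count at the number of OS threads instead of using the raw mask population count. A hedged usage sketch (assumes an active HPX runtime; using the mask of worker thread 0 is purely illustrative):

auto const& rp = hpx::resource::get_partitioner();

// Build a host target from the PU mask of worker thread 0.
hpx::compute::host::target const t(rp.get_pu_mask(0));

// first_thread: index of the first OS thread whose PU mask overlaps the
//               target's mask, or std::size_t(-1) if no worker thread matches.
// pu_count:     number of PUs in the mask, now capped at the OS-thread count.
auto const [first_thread, pu_count] = t.num_pus();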