Faster CPU (Arg-)Reductions #6989

Merged 2 commits on Oct 3, 2024
66 changes: 62 additions & 4 deletions cpp/open3d/core/kernel/ReductionCPU.cpp
@@ -122,13 +122,14 @@ class CPUReductionEngine {
         for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
             int64_t start = thread_idx * workload_per_thread;
             int64_t end = std::min(start + workload_per_thread, num_workloads);
+            scalar_t local_result = identity;
             for (int64_t workload_idx = start; workload_idx < end;
                  ++workload_idx) {
                 scalar_t* src = reinterpret_cast<scalar_t*>(
                         indexer.GetInputPtr(0, workload_idx));
-                thread_results[thread_idx] =
-                        element_kernel(*src, thread_results[thread_idx]);
+                local_result = element_kernel(*src, local_result);
             }
+            thread_results[thread_idx] = local_result;
         }
         scalar_t* dst = reinterpret_cast<scalar_t*>(indexer.GetOutputPtr(0));
         for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
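The key change in this hunk: the inner loop accumulates into a stack-local variable instead of reading and writing thread_results[thread_idx] on every element. A minimal standalone sketch of the same pattern, using a plain sum instead of the templated kernel (illustrative only, not Open3D code):

#include <algorithm>
#include <cstdint>
#include <numeric>
#include <vector>

double ParallelSum(const std::vector<double>& data, int64_t num_threads) {
    const int64_t n = static_cast<int64_t>(data.size());
    const int64_t per_thread = (n + num_threads - 1) / num_threads;
    std::vector<double> thread_results(num_threads, 0.0);
#pragma omp parallel for schedule(static) num_threads(num_threads)
    for (int64_t t = 0; t < num_threads; ++t) {
        const int64_t start = t * per_thread;
        const int64_t end = std::min(start + per_thread, n);
        double local = 0.0;  // stays in a register in the hot loop
        for (int64_t i = start; i < end; ++i) {
            local += data[i];
        }
        // One shared write per thread instead of one per element:
        // avoids false sharing on the thread_results cache lines.
        thread_results[t] = local;
    }
    return std::accumulate(thread_results.begin(), thread_results.end(), 0.0);
}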
@@ -190,14 +191,25 @@ class CPUArgReductionEngine {
         // elements. We need to keep track of the indices within each
         // sub-iteration.
         int64_t num_output_elements = indexer_.NumOutputElements();
+        if (num_output_elements <= 1) {
+            LaunchArgReductionKernelTwoPass(indexer_, reduce_func, identity);
+        } else {
+            LaunchArgReductionParallelDim(indexer_, reduce_func, identity);
+        }
+    }
+
+    template <typename scalar_t, typename func_t>
+    static void LaunchArgReductionParallelDim(const Indexer& indexer,
+                                              func_t reduce_func,
+                                              scalar_t identity) {
+        int64_t num_output_elements = indexer.NumOutputElements();
 #pragma omp parallel for schedule(static) \
         num_threads(utility::EstimateMaxThreads())
         for (int64_t output_idx = 0; output_idx < num_output_elements;
Member:
Since usually num_output_elements << num_threads, we should collapse the two for loops to improve thread utilization with #pragma omp for collapse(2). I believe Visual Studio now supports this (OpenMP v3), but good to check Windows CI.

Contributor:
I just want to remind about #6626, as this will add new OpenMP code.

Contributor Author:
I don't think it's possible to collapse the two loops, since the inner loop has a loop-carried dependency. What could be done is using LaunchArgReductionKernelTwoPass instead of the inner loop. Another option would be to use OpenMP's reduction clause directly.
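For reference, a hypothetical sketch of that last option: OpenMP user-defined reductions can express an argmin over (index, value) pairs directly. Names and types here are illustrative, not from the PR:

#include <cfloat>
#include <cstdint>
#include <vector>

struct IdxVal {
    int64_t idx;
    double val;
};

// Combiner keeps the smaller value; ties resolve arbitrarily since OpenMP
// does not specify the order in which partial results are combined.
#pragma omp declare reduction(argmin : IdxVal :                         \
        omp_out = (omp_in.val < omp_out.val ? omp_in : omp_out))        \
        initializer(omp_priv = IdxVal{0, DBL_MAX})

IdxVal ArgMin(const std::vector<double>& data) {
    IdxVal best{0, DBL_MAX};
#pragma omp parallel for reduction(argmin : best)
    for (int64_t i = 0; i < static_cast<int64_t>(data.size()); ++i) {
        if (data[i] < best.val) {
            best = {i, data[i]};
        }
    }
    return best;
}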

Contributor Author (@manuelvogel12, Sep 30, 2024):
When simply launching LaunchArgReductionKernelTwoPass inside the outer for loop, it improves the performance when reducing over the large dim, but worsens the performance when reducing over the small dim.

The image shows the time, so +4.5 means this approach would be 4.5 times slower in that benchmark, and -0.78 means roughly 4.5 times faster.
[benchmark results image]

Therefore, I think that when the number of output elements is smaller than the number of threads, it is best either not to optimize the outer loop at all, or to find a good way to calculate the number of threads for the inner loop.

Member:
Thanks @manuelvogel12 for the detailed benchmarking. This looks good to me.

To address @benjaminum's point, we will be switching from OpenMP to TBB soon (PR #6626). This helps quite a bit with the lack of good OpenMP support on Windows and Apple, simplifies the code, and also comes with the benefit of composable parallelism. We also expect (some) speedup from TBB's work-stealing scheduler model.

I think we can merge this PR due to its speed boost for now and then switch this code to TBB as well along with the rest of the code. @manuelvogel12 I would encourage you to check out PR #6626 and help us test that.
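For comparison, a rough sketch of the same single-output argmin expressed with TBB's parallel_reduce, as the migration in #6626 would allow (assumed API usage, not code from either PR):

#include <cstdint>
#include <limits>
#include <utility>
#include <vector>

#include <tbb/blocked_range.h>
#include <tbb/parallel_reduce.h>

std::pair<int64_t, double> ArgMinTBB(const std::vector<double>& data) {
    using IdxVal = std::pair<int64_t, double>;
    const IdxVal identity{0, std::numeric_limits<double>::max()};
    return tbb::parallel_reduce(
            tbb::blocked_range<int64_t>(0, static_cast<int64_t>(data.size())),
            identity,
            // Reduce one chunk, starting from this task's running value.
            [&data](const tbb::blocked_range<int64_t>& r, IdxVal acc) {
                for (int64_t i = r.begin(); i < r.end(); ++i) {
                    if (data[i] < acc.second) acc = {i, data[i]};
                }
                return acc;
            },
            // Combine partial results from two tasks.
            [](const IdxVal& a, const IdxVal& b) {
                return (a.second <= b.second) ? a : b;
            });
}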

              output_idx++) {
             // sub_indexer.NumWorkloads() == ipo.
-            // sub_indexer's workload_idx is indexer_'s ipo_idx.
-            Indexer sub_indexer = indexer_.GetPerOutputIndexer(output_idx);
+            // sub_indexer's workload_idx is indexer's ipo_idx.
+            Indexer sub_indexer = indexer.GetPerOutputIndexer(output_idx);
             scalar_t dst_val = identity;
             for (int64_t workload_idx = 0;
                  workload_idx < sub_indexer.NumWorkloads(); workload_idx++) {
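The net effect of the dispatch introduced above: reductions with several output elements keep the per-output parallel loop, while full reductions down to a single element take the new two-pass kernel. A hypothetical usage showing which path each call takes (Tensor API calls assumed for illustration only):

core::Tensor t = core::Tensor::Ones({1000, 8}, core::Float32);
core::Tensor a = t.ArgMin({0});     // 8 outputs -> LaunchArgReductionParallelDim
core::Tensor b = t.ArgMin({0, 1});  // 1 output  -> LaunchArgReductionKernelTwoPass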
@@ -212,6 +224,52 @@ class CPUArgReductionEngine {
}
}

+    /// Create num_threads workers to compute partial arg reductions
+    /// and then reduce to the final results.
+    /// This only applies to arg reduction op with one output.
+    template <typename scalar_t, typename func_t>
+    static void LaunchArgReductionKernelTwoPass(const Indexer& indexer,
+                                                func_t reduce_func,
+                                                scalar_t identity) {
+        if (indexer.NumOutputElements() > 1) {
+            utility::LogError(
+                    "Internal error: two-pass arg reduction only works for "
+                    "single-output arg reduction ops.");
+        }
+        int64_t num_workloads = indexer.NumWorkloads();
+        int64_t num_threads = utility::EstimateMaxThreads();
+        int64_t workload_per_thread =
+                (num_workloads + num_threads - 1) / num_threads;
+        std::vector<int64_t> thread_results_idx(num_threads, 0);
+        std::vector<scalar_t> thread_results_val(num_threads, identity);
+
+#pragma omp parallel for schedule(static) \
Member:
Can we simplify this with for loop collapse in OpenMP?

+        num_threads(utility::EstimateMaxThreads())
+        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
+            int64_t start = thread_idx * workload_per_thread;
+            int64_t end = std::min(start + workload_per_thread, num_workloads);
+            scalar_t local_result_val = identity;
+            int64_t local_result_idx = 0;
+            for (int64_t workload_idx = start; workload_idx < end;
+                 ++workload_idx) {
+                int64_t src_idx = workload_idx;
+                scalar_t* src_val = reinterpret_cast<scalar_t*>(
+                        indexer.GetInputPtr(0, workload_idx));
+                std::tie(local_result_idx, local_result_val) = reduce_func(
+                        src_idx, *src_val, local_result_idx, local_result_val);
+            }
+            thread_results_val[thread_idx] = local_result_val;
+            thread_results_idx[thread_idx] = local_result_idx;
+        }
+        scalar_t dst_val = identity;
+        int64_t* dst_idx = reinterpret_cast<int64_t*>(indexer.GetOutputPtr(0));
+        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
+            std::tie(*dst_idx, dst_val) = reduce_func(
+                    thread_results_idx[thread_idx],
+                    thread_results_val[thread_idx], *dst_idx, dst_val);
+        }
+    }

private:
Indexer indexer_;
};
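The two-pass kernel above relies on reduce_func acting as a pure (index, value) comparator: given a candidate pair and the incumbent pair, it returns the winning pair. A sketch of an argmin functor satisfying that contract (illustrative only; the real functors are defined elsewhere in ReductionCPU.cpp):

#include <cstdint>
#include <utility>

template <typename scalar_t>
std::pair<int64_t, scalar_t> ArgMinReduce(int64_t src_idx,
                                          scalar_t src_val,
                                          int64_t dst_idx,
                                          scalar_t dst_val) {
    // Keep the smaller value; on a tie, keep the incumbent pair.
    return src_val < dst_val ? std::make_pair(src_idx, src_val)
                             : std::make_pair(dst_idx, dst_val);
}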