Skip to content

Commit

Permalink
Add groupby_max multi-threaded benchmark (#16154)
Browse the repository at this point in the history
This PR adds **groupby_max** multi-threaded benchmark. The benchmark runs multiple **max groupby aggregations** concurrently using one CUDA stream per host thread.

Closes #16134

Authors:
  - Srinivas Yadav (https://github.com/srinivasyadav18)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #16154
  • Loading branch information
srinivasyadav18 authored Jul 10, 2024
1 parent 67bd366 commit f592e9c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 7 deletions.
4 changes: 2 additions & 2 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ ConfigureBench(
)

ConfigureNVBench(
GROUPBY_NVBENCH groupby/group_max.cpp groupby/group_nunique.cpp groupby/group_rank.cpp
groupby/group_struct_keys.cpp
GROUPBY_NVBENCH groupby/group_max.cpp groupby/group_max_multithreaded.cpp
groupby/group_nunique.cpp groupby/group_rank.cpp groupby/group_struct_keys.cpp
)

# ##################################################################################################
Expand Down
16 changes: 11 additions & 5 deletions cpp/benchmarks/groupby/group_max.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,25 @@ void groupby_max_helper(nvbench::state& state,
cudf::type_to_id<Type>(), row_count{num_rows}, data_profile{builder});
}();

auto const num_aggregations = state.get_int64("num_aggregations");

auto keys_view = keys->view();
auto gb_obj = cudf::groupby::groupby(cudf::table_view({keys_view, keys_view, keys_view}));

std::vector<cudf::groupby::aggregation_request> requests;
requests.emplace_back(cudf::groupby::aggregation_request());
requests[0].values = vals->view();
requests[0].aggregations.push_back(cudf::make_max_aggregation<cudf::groupby_aggregation>());
for (int64_t i = 0; i < num_aggregations; i++) {
requests.emplace_back(cudf::groupby::aggregation_request());
requests[i].values = vals->view();
requests[i].aggregations.push_back(cudf::make_max_aggregation<cudf::groupby_aggregation>());
}

auto const mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto const result = gb_obj.aggregate(requests); });
auto const elapsed_time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_element_count(static_cast<double>(num_rows) / elapsed_time / 1'000'000., "Mrows/s");
state.add_element_count(
static_cast<double>(num_rows * num_aggregations) / elapsed_time / 1'000'000., "Mrows/s");
state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}
Expand Down Expand Up @@ -91,7 +96,8 @@ NVBENCH_BENCH_TYPES(bench_groupby_max,
.set_name("groupby_max")
.add_int64_axis("cardinality", {0})
.add_int64_power_of_two_axis("num_rows", {12, 18, 24})
.add_float64_axis("null_probability", {0, 0.1, 0.9});
.add_float64_axis("null_probability", {0, 0.1, 0.9})
.add_int64_axis("num_aggregations", {1, 2, 4, 8, 16, 32});

NVBENCH_BENCH_TYPES(bench_groupby_max_cardinality, NVBENCH_TYPE_AXES(nvbench::type_list<int32_t>))
.set_name("groupby_max_cardinality")
Expand Down
102 changes: 102 additions & 0 deletions cpp/benchmarks/groupby/group_max_multithreaded.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>

#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/groupby.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/thread_pool.hpp>

#include <nvbench/nvbench.cuh>

/**
 * @brief nvbench benchmark running groupby MAX aggregations concurrently,
 * one CUDA stream per host thread.
 *
 * Each of `num_threads` host threads submits one aggregate() call carrying
 * `num_aggregations` identical MAX requests on its own stream; all streams
 * are joined back to the default stream before the timer stops so the
 * measured interval covers every aggregation.
 */
template <typename Type>
void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list<Type>)
{
  // Benchmark axis values supplied by nvbench (see registration below).
  auto const cardinality      = static_cast<cudf::size_type>(state.get_int64("cardinality"));
  auto const num_rows         = static_cast<cudf::size_type>(state.get_int64("num_rows"));
  auto const null_probability = state.get_float64("null_probability");
  auto const num_threads      = state.get_int64("num_threads");
  auto const num_aggregations = state.get_int64("num_aggregations");

  // Random int32 key column, never null, drawn uniformly from [0, num_rows].
  // NOTE(review): assumes cardinality(0) means "unrestricted distinct keys" —
  // confirm against data_profile_builder documentation.
  auto const keys = [&] {
    data_profile const profile =
      data_profile_builder()
        .cardinality(cardinality)
        .no_validity()
        .distribution(cudf::type_to_id<int32_t>(), distribution_id::UNIFORM, 0, num_rows);
    return create_random_column(cudf::type_to_id<int32_t>(), row_count{num_rows}, profile);
  }();

  // Values column of the benchmarked Type; nulls are injected only when the
  // null_probability axis is positive.
  auto const vals = [&] {
    auto builder = data_profile_builder().cardinality(0).distribution(
      cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, num_rows);
    if (null_probability > 0) {
      builder.null_probability(null_probability);
    } else {
      builder.no_validity();
    }
    return create_random_column(
      cudf::type_to_id<Type>(), row_count{num_rows}, data_profile{builder});
  }();

  auto keys_view = keys->view();
  // Groupby keys table is three views of the same key column.
  auto gb_obj = cudf::groupby::groupby(cudf::table_view({keys_view, keys_view, keys_view}));

  // One CUDA stream per host thread, forked from (and later joined back to)
  // the default stream.
  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
  cudf::detail::thread_pool threads(num_threads);

  // requests[t] holds num_aggregations identical MAX requests for thread t;
  // every request aggregates the same values column.
  std::vector<std::vector<cudf::groupby::aggregation_request>> requests(num_threads);
  for (auto& thread_requests : requests) {
    for (int64_t j = 0; j < num_aggregations; j++) {
      thread_requests.emplace_back();
      thread_requests.back().values = vals->view();
      thread_requests.back().aggregations.push_back(
        cudf::make_max_aggregation<cudf::groupby_aggregation>());
    }
  }

  auto const mem_stats_logger = cudf::memory_stats_logger();
  state.exec(
    nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
      // One task per thread: a single aggregate() call on that thread's stream.
      auto perform_agg = [&](int64_t index) { gb_obj.aggregate(requests[index], streams[index]); };
      timer.start();
      for (int64_t i = 0; i < num_threads; ++i) {
        threads.submit(perform_agg, i);
      }
      // Wait for host-side submission, then make the default stream wait on
      // all worker streams and drain it, so GPU work is inside the timed span.
      threads.wait_for_tasks();
      cudf::detail::join_streams(streams, cudf::get_default_stream());
      cudf::get_default_stream().synchronize();
      timer.stop();
    });

  // Throughput is reported over all rows processed across threads x requests.
  auto const elapsed_time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
  state.add_element_count(
    static_cast<double>(num_rows * num_threads * num_aggregations) / elapsed_time / 1'000'000.,
    "Mrows/s");
  state.add_buffer_size(
    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}

// Register the benchmark over four value types, sweeping data shape
// (num_rows), null density, per-thread request count (num_aggregations, fixed
// at 1 here), and the number of concurrent host threads.
NVBENCH_BENCH_TYPES(bench_groupby_max_multithreaded,
                    NVBENCH_TYPE_AXES(nvbench::type_list<int32_t, int64_t, float, double>))
  .set_name("groupby_max_multithreaded")
  .add_int64_axis("cardinality", {0})
  .add_int64_power_of_two_axis("num_rows", {12, 18})
  .add_float64_axis("null_probability", {0, 0.1, 0.9})
  .add_int64_axis("num_aggregations", {1})
  .add_int64_axis("num_threads", {1, 2, 4, 8});

0 comments on commit f592e9c

Please sign in to comment.