-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-44052: [C++][Compute] Reduce the complexity of row segmenter #44053
Changes from all commits
412a0d7
c83c1ed
05b0bf0
05c14df
65eee57
4f66bfb
f2f26d1
2044125
9cb7b91
9c7dcdb
9281e6a
2233274
ffc2e85
374ab6a
c4659e2
2c16f32
1428999
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -24,6 +24,7 @@ | |||||||||||||||||||||||||||||||||||
#include "arrow/array/array_primitive.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/compute/api.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/table.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/testing/generator.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/testing/gtest_util.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/testing/random.h" | ||||||||||||||||||||||||||||||||||||
#include "arrow/util/benchmark_util.h" | ||||||||||||||||||||||||||||||||||||
|
@@ -325,7 +326,8 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll<int64_t>) | |||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
std::shared_ptr<RecordBatch> RecordBatchFromArrays( | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& arguments, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& keys) { | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& keys, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& segment_keys) { | ||||||||||||||||||||||||||||||||||||
std::vector<std::shared_ptr<Field>> fields; | ||||||||||||||||||||||||||||||||||||
std::vector<std::shared_ptr<Array>> all_arrays; | ||||||||||||||||||||||||||||||||||||
int64_t length = -1; | ||||||||||||||||||||||||||||||||||||
|
@@ -347,37 +349,56 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays( | |||||||||||||||||||||||||||||||||||
fields.push_back(field("key" + ToChars(key_idx), key->type())); | ||||||||||||||||||||||||||||||||||||
all_arrays.push_back(key); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size(); | ||||||||||||||||||||||||||||||||||||
segment_key_idx++) { | ||||||||||||||||||||||||||||||||||||
const auto& segment_key = segment_keys[segment_key_idx]; | ||||||||||||||||||||||||||||||||||||
DCHECK_EQ(segment_key->length(), length); | ||||||||||||||||||||||||||||||||||||
fields.push_back( | ||||||||||||||||||||||||||||||||||||
field("segment_key" + ToChars(segment_key_idx), segment_key->type())); | ||||||||||||||||||||||||||||||||||||
all_arrays.push_back(segment_key); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays)); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
Result<std::shared_ptr<Table>> BatchGroupBy( | ||||||||||||||||||||||||||||||||||||
std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates, | ||||||||||||||||||||||||||||||||||||
std::vector<FieldRef> keys, bool use_threads = false, | ||||||||||||||||||||||||||||||||||||
MemoryPool* memory_pool = default_memory_pool()) { | ||||||||||||||||||||||||||||||||||||
std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys, | ||||||||||||||||||||||||||||||||||||
bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) { | ||||||||||||||||||||||||||||||||||||
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table, | ||||||||||||||||||||||||||||||||||||
Table::FromRecordBatches({std::move(batch)})); | ||||||||||||||||||||||||||||||||||||
Declaration plan = Declaration::Sequence( | ||||||||||||||||||||||||||||||||||||
{{"table_source", TableSourceNodeOptions(std::move(table))}, | ||||||||||||||||||||||||||||||||||||
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}}); | ||||||||||||||||||||||||||||||||||||
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys), | ||||||||||||||||||||||||||||||||||||
std::move(segment_keys))}}); | ||||||||||||||||||||||||||||||||||||
return DeclarationToTable(std::move(plan), use_threads, memory_pool); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& arguments, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& keys) { | ||||||||||||||||||||||||||||||||||||
std::shared_ptr<RecordBatch> batch = RecordBatchFromArrays(arguments, keys); | ||||||||||||||||||||||||||||||||||||
static void BenchmarkAggregate( | ||||||||||||||||||||||||||||||||||||
benchmark::State& state, std::vector<Aggregate> aggregates, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& arguments, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& keys, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& segment_keys = {}) { | ||||||||||||||||||||||||||||||||||||
std::shared_ptr<RecordBatch> batch = | ||||||||||||||||||||||||||||||||||||
RecordBatchFromArrays(arguments, keys, segment_keys); | ||||||||||||||||||||||||||||||||||||
std::vector<FieldRef> key_refs; | ||||||||||||||||||||||||||||||||||||
for (std::size_t key_idx = 0; key_idx < keys.size(); key_idx++) { | ||||||||||||||||||||||||||||||||||||
key_refs.emplace_back(static_cast<int>(key_idx + arguments.size())); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
std::vector<FieldRef> segment_key_refs; | ||||||||||||||||||||||||||||||||||||
for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size(); | ||||||||||||||||||||||||||||||||||||
segment_key_idx++) { | ||||||||||||||||||||||||||||||||||||
segment_key_refs.emplace_back( | ||||||||||||||||||||||||||||||||||||
static_cast<int>(segment_key_idx + arguments.size() + keys.size())); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
for (std::size_t arg_idx = 0; arg_idx < arguments.size(); arg_idx++) { | ||||||||||||||||||||||||||||||||||||
aggregates[arg_idx].target = {FieldRef(static_cast<int>(arg_idx))}; | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
int64_t total_bytes = TotalBufferSize(*batch); | ||||||||||||||||||||||||||||||||||||
for (auto _ : state) { | ||||||||||||||||||||||||||||||||||||
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs)); | ||||||||||||||||||||||||||||||||||||
ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs)); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
state.SetBytesProcessed(total_bytes * state.iterations()); | ||||||||||||||||||||||||||||||||||||
state.SetItemsProcessed(batch->num_rows() * state.iterations()); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
#define GROUP_BY_BENCHMARK(Name, Impl) \ | ||||||||||||||||||||||||||||||||||||
|
@@ -404,7 +425,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyStringSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -419,7 +440,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumStringSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -434,7 +455,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumStringSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntegerSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -448,7 +469,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntegerSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min=*/0, | ||||||||||||||||||||||||||||||||||||
/*max=*/15); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntegerSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -462,7 +483,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntegerSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min=*/0, | ||||||||||||||||||||||||||||||||||||
/*max=*/255); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntegerSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -476,7 +497,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntegerSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min=*/0, | ||||||||||||||||||||||||||||||||||||
/*max=*/4095); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntStringPairSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -494,7 +515,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntStringPairSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntStringPairSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -512,7 +533,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntStringPairSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -530,7 +551,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] { | |||||||||||||||||||||||||||||||||||
/*min_length=*/3, | ||||||||||||||||||||||||||||||||||||
/*max_length=*/32); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// Grouped MinMax | ||||||||||||||||||||||||||||||||||||
|
@@ -543,7 +564,7 @@ GROUP_BY_BENCHMARK(MinMaxDoublesGroupedByMediumInt, [&] { | |||||||||||||||||||||||||||||||||||
/*nan_probability=*/args.null_proportion / 10); | ||||||||||||||||||||||||||||||||||||
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(MinMaxShortStringsGroupedByMediumInt, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -553,7 +574,7 @@ GROUP_BY_BENCHMARK(MinMaxShortStringsGroupedByMediumInt, [&] { | |||||||||||||||||||||||||||||||||||
/*null_probability=*/args.null_proportion); | ||||||||||||||||||||||||||||||||||||
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
GROUP_BY_BENCHMARK(MinMaxLongStringsGroupedByMediumInt, [&] { | ||||||||||||||||||||||||||||||||||||
|
@@ -563,7 +584,7 @@ GROUP_BY_BENCHMARK(MinMaxLongStringsGroupedByMediumInt, [&] { | |||||||||||||||||||||||||||||||||||
/*null_probability=*/args.null_proportion); | ||||||||||||||||||||||||||||||||||||
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key}); | ||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||
|
@@ -866,5 +887,61 @@ BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs); | |||||||||||||||||||||||||||||||||||
BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs); | ||||||||||||||||||||||||||||||||||||
BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||
// Segmented Aggregate | ||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
static void BenchmarkSegmentedAggregate( | ||||||||||||||||||||||||||||||||||||
benchmark::State& state, int64_t num_rows, std::vector<Aggregate> aggregates, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& arguments, | ||||||||||||||||||||||||||||||||||||
const std::vector<std::shared_ptr<Array>>& keys, int64_t num_segment_keys, | ||||||||||||||||||||||||||||||||||||
int64_t num_segments) { | ||||||||||||||||||||||||||||||||||||
ASSERT_GT(num_segments, 0); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
auto rng = random::RandomArrayGenerator(42); | ||||||||||||||||||||||||||||||||||||
auto segment_key = rng.Int64(num_rows, /*min=*/0, /*max=*/num_segments - 1); | ||||||||||||||||||||||||||||||||||||
int64_t* values = segment_key->data()->GetMutableValues<int64_t>(1); | ||||||||||||||||||||||||||||||||||||
std::sort(values, values + num_rows); | ||||||||||||||||||||||||||||||||||||
// num_segment_keys copies of the segment key. | ||||||||||||||||||||||||||||||||||||
ArrayVector segment_keys(num_segment_keys, segment_key); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkAggregate(state, std::move(aggregates), arguments, keys, segment_keys); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
template <typename... Args> | ||||||||||||||||||||||||||||||||||||
static void CountScalarSegmentedByInts(benchmark::State& state, Args&&...) { | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So "CountScalar" is actually the case where Keys=0 but there are segment keys? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please let me explain a bit. Though both are named "aggregation", depending on whether there are "group by" keys or not (take SQL And segment keys, on the other hand, working orthogonally with group by keys, apply to both scalar and group by aggregations. Back to your question, yes, "CountScalar" implies exactly that this is a "scalar aggregation" (i.e., w/o any group by keys) on a "count" function. "SegmentedByInts" implies that there are potential segment keys. Hope this can clear things a bit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, what this benchmark does is not simply a Though I'm not entirely sure how the segmented non-group-by agg map to a SQL basis, in Acero, specifying segment keys doesn't actually require group-by's existence. One just specifies segment keys and group by keys (independently) within arrow/cpp/src/arrow/acero/options.h Lines 332 to 348 in 9576a41
So I would let the naming reflect the underlying implementation (scalar agg vs. group by agg). (I understand the argument could be: a segment key still implies group by semantic. I agree. But that's a more end-to-end perspective and doesn't reflect how we handle the segment key). |
||||||||||||||||||||||||||||||||||||
constexpr int64_t num_rows = 32 * 1024; | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// A trivial column to count from. | ||||||||||||||||||||||||||||||||||||
auto arg = ConstantArrayGenerator::Zeroes(num_rows, int32()); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkSegmentedAggregate(state, num_rows, {{"count", ""}}, {arg}, /*keys=*/{}, | ||||||||||||||||||||||||||||||||||||
state.range(0), state.range(1)); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
BENCHMARK(CountScalarSegmentedByInts) | ||||||||||||||||||||||||||||||||||||
->ArgNames({"SegmentKeys", "Segments"}) | ||||||||||||||||||||||||||||||||||||
->ArgsProduct({{0, 1, 2}, benchmark::CreateRange(1, 256, 8)}); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
template <typename... Args> | ||||||||||||||||||||||||||||||||||||
static void CountGroupByIntsSegmentedByInts(benchmark::State& state, Args&&...) { | ||||||||||||||||||||||||||||||||||||
constexpr int64_t num_rows = 32 * 1024; | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
// A trivial column to count from. | ||||||||||||||||||||||||||||||||||||
auto arg = ConstantArrayGenerator::Zeroes(num_rows, int32()); | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
auto rng = random::RandomArrayGenerator(42); | ||||||||||||||||||||||||||||||||||||
int64_t num_keys = state.range(0); | ||||||||||||||||||||||||||||||||||||
ArrayVector keys(num_keys); | ||||||||||||||||||||||||||||||||||||
for (auto& key : keys) { | ||||||||||||||||||||||||||||||||||||
key = rng.Int64(num_rows, /*min=*/0, /*max=*/64); | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit weird that this doesn't use num_segments. For comparison purposes, I would expect a similar cardinality in segmented and non-segmented keys. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally the cardinalities of a segment key and a group by key are relatively independent, so I chose not to use the one of segment key for group by as well. If we want to make the group by cardinality variable, we can use another independent parameter. But my concerns is that the performance of segment keys and group by keys are also independent, that is why this benchmark uses a fixed group by key cardinality. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The point here is to compare performance of segmented vs. non-segmented keys, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In these two particular benchmarks, they are to show how the number of segment keys and the number of segments perform with a predefined group by setup. In other words, the group by portion of the benchmark is merely to make the benchmark more realistic. They are not designed for the comparison you suggested. IIUC, an apple to apple comparison though might be: a segmented scalar agg (N segment keys and 0 group by keys) VS. a non-segmented group by agg (0 segment keys and N group by keys), with the same key distribution. In this comparison, we can answer how a group by can be done more efficiently by taking advantage of the segmented nature of the key(s) (i.e., using segmented agg). If you are suggesting such a comparison, yes that makes sense. I think I can add a new benchmark for it. What do you think? Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, thanks for the explanation. A new benchmark can be added in a later PR if we want, this one is fine now that I understand the motivation. |
||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
BenchmarkSegmentedAggregate(state, num_rows, {{"hash_count", ""}}, {arg}, keys, | ||||||||||||||||||||||||||||||||||||
state.range(1), state.range(2)); | ||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||
BENCHMARK(CountGroupByIntsSegmentedByInts) | ||||||||||||||||||||||||||||||||||||
->ArgNames({"Keys", "SegmentKeys", "Segments"}) | ||||||||||||||||||||||||||||||||||||
->ArgsProduct({{1, 2}, {0, 1, 2}, benchmark::CreateRange(1, 256, 8)}); | ||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've tried to use 1+ segment keys and 0 non-segment keys to get an idea of the purely-segmented performance, but I get:
It's a bit of a bummer, isn't it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As my other comment explained, an aggregation w/o any group by keys (aka. non-segment keys here) is a scalar aggregation and requires "count" rather than "hash_count". This is a "group by aggregation" benchmark (the counter-part of the previous "CountScalar" one) so there has to be at least one group by key. |
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
} // namespace acero | ||||||||||||||||||||||||||||||||||||
} // namespace arrow |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,17 +131,14 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema, | |
template <typename BatchHandler> | ||
Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch, | ||
const std::vector<int>& ids, const BatchHandler& handle_batch) { | ||
int64_t offset = 0; | ||
ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids)); | ||
ExecSpan segment_batch(segment_exec_batch); | ||
|
||
while (true) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only call-site in non-testing codes. |
||
ARROW_ASSIGN_OR_RAISE(compute::Segment segment, | ||
segmenter->GetNextSegment(segment_batch, offset)); | ||
if (segment.offset >= segment_batch.length) break; // condition of no-next-segment | ||
ARROW_ASSIGN_OR_RAISE(auto segments, segmenter->GetSegments(segment_batch)); | ||
for (const auto& segment : segments) { | ||
ARROW_RETURN_NOT_OK(handle_batch(batch, segment)); | ||
offset = segment.offset + segment.length; | ||
} | ||
|
||
return Status::OK(); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not much better than before, is it? I would expect something like:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see your point. It is only that I want to make the number of segments to be exactly as specified. Combining independently-random keys, even with the same distribution, will make the number of segments (potentially, much) bigger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But that's also much more realistic, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, as you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think having exact number of segments would help on diagnostics of performance issues than introducing yet another level of randomness, so I'd rather keep it as is :)