
GH-44052: [C++][Compute] Reduce the complexity of row segmenter #44053

Merged (17 commits) on Sep 18, 2024
119 changes: 98 additions & 21 deletions cpp/src/arrow/acero/aggregate_benchmark.cc
@@ -24,6 +24,7 @@
#include "arrow/array/array_primitive.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
@@ -325,7 +326,8 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll<int64_t>)

std::shared_ptr<RecordBatch> RecordBatchFromArrays(
const std::vector<std::shared_ptr<Array>>& arguments,
-    const std::vector<std::shared_ptr<Array>>& keys) {
+    const std::vector<std::shared_ptr<Array>>& keys,
+    const std::vector<std::shared_ptr<Array>>& segment_keys) {
std::vector<std::shared_ptr<Field>> fields;
std::vector<std::shared_ptr<Array>> all_arrays;
int64_t length = -1;
@@ -347,37 +349,56 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
fields.push_back(field("key" + ToChars(key_idx), key->type()));
all_arrays.push_back(key);
}
+  for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
+       segment_key_idx++) {
+    const auto& segment_key = segment_keys[segment_key_idx];
+    DCHECK_EQ(segment_key->length(), length);
+    fields.push_back(
+        field("segment_key" + ToChars(segment_key_idx), segment_key->type()));
+    all_arrays.push_back(segment_key);
+  }
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
}

Result<std::shared_ptr<Table>> BatchGroupBy(
std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
-    std::vector<FieldRef> keys, bool use_threads = false,
-    MemoryPool* memory_pool = default_memory_pool()) {
+    std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys,
+    bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(batch)}));
Declaration plan = Declaration::Sequence(
{{"table_source", TableSourceNodeOptions(std::move(table))},
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
{"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys),
std::move(segment_keys))}});
return DeclarationToTable(std::move(plan), use_threads, memory_pool);
}

-static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
-                             const std::vector<std::shared_ptr<Array>>& arguments,
-                             const std::vector<std::shared_ptr<Array>>& keys) {
-  std::shared_ptr<RecordBatch> batch = RecordBatchFromArrays(arguments, keys);
+static void BenchmarkAggregate(
+    benchmark::State& state, std::vector<Aggregate> aggregates,
+    const std::vector<std::shared_ptr<Array>>& arguments,
+    const std::vector<std::shared_ptr<Array>>& keys,
+    const std::vector<std::shared_ptr<Array>>& segment_keys = {}) {
+  std::shared_ptr<RecordBatch> batch =
+      RecordBatchFromArrays(arguments, keys, segment_keys);
std::vector<FieldRef> key_refs;
for (std::size_t key_idx = 0; key_idx < keys.size(); key_idx++) {
key_refs.emplace_back(static_cast<int>(key_idx + arguments.size()));
}
+  std::vector<FieldRef> segment_key_refs;
+  for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
+       segment_key_idx++) {
+    segment_key_refs.emplace_back(
+        static_cast<int>(segment_key_idx + arguments.size() + keys.size()));
+  }
for (std::size_t arg_idx = 0; arg_idx < arguments.size(); arg_idx++) {
aggregates[arg_idx].target = {FieldRef(static_cast<int>(arg_idx))};
}
int64_t total_bytes = TotalBufferSize(*batch);
for (auto _ : state) {
-    ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs));
+    ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs));
}
state.SetBytesProcessed(total_bytes * state.iterations());
state.SetItemsProcessed(batch->num_rows() * state.iterations());
}

#define GROUP_BY_BENCHMARK(Name, Impl) \
@@ -404,7 +425,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyStringSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] {
@@ -419,7 +440,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumStringSet, [&] {
@@ -434,7 +455,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumStringSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntegerSet, [&] {
@@ -448,7 +469,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntegerSet, [&] {
/*min=*/0,
/*max=*/15);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntegerSet, [&] {
@@ -462,7 +483,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntegerSet, [&] {
/*min=*/0,
/*max=*/255);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntegerSet, [&] {
@@ -476,7 +497,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntegerSet, [&] {
/*min=*/0,
/*max=*/4095);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntStringPairSet, [&] {
@@ -494,7 +515,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntStringPairSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntStringPairSet, [&] {
@@ -512,7 +533,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntStringPairSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
});

GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] {
@@ -530,7 +551,7 @@ GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] {
/*min_length=*/3,
/*max_length=*/32);

-  BenchmarkGroupBy(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
+  BenchmarkAggregate(state, {{"hash_sum", ""}}, {summand}, {int_key, str_key});
});

// Grouped MinMax
@@ -543,7 +564,7 @@ GROUP_BY_BENCHMARK(MinMaxDoublesGroupedByMediumInt, [&] {
/*nan_probability=*/args.null_proportion / 10);
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63);

-  BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key});
+  BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key});
});

GROUP_BY_BENCHMARK(MinMaxShortStringsGroupedByMediumInt, [&] {
@@ -553,7 +574,7 @@ GROUP_BY_BENCHMARK(MinMaxShortStringsGroupedByMediumInt, [&] {
/*null_probability=*/args.null_proportion);
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63);

-  BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key});
+  BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key});
});

GROUP_BY_BENCHMARK(MinMaxLongStringsGroupedByMediumInt, [&] {
@@ -563,7 +584,7 @@ GROUP_BY_BENCHMARK(MinMaxLongStringsGroupedByMediumInt, [&] {
/*null_probability=*/args.null_proportion);
auto int_key = rng.Int64(args.size, /*min=*/0, /*max=*/63);

-  BenchmarkGroupBy(state, {{"hash_min_max", ""}}, {input}, {int_key});
+  BenchmarkAggregate(state, {{"hash_min_max", ""}}, {input}, {int_key});
});

//
@@ -866,5 +887,61 @@ BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs);

+//
+// Segmented Aggregate
+//
+
+static void BenchmarkSegmentedAggregate(
+    benchmark::State& state, int64_t num_rows, std::vector<Aggregate> aggregates,
+    const std::vector<std::shared_ptr<Array>>& arguments,
+    const std::vector<std::shared_ptr<Array>>& keys, int64_t num_segment_keys,
+    int64_t num_segments) {
+  ASSERT_GT(num_segments, 0);
+
+  auto rng = random::RandomArrayGenerator(42);
+  auto segment_key = rng.Int64(num_rows, /*min=*/0, /*max=*/num_segments - 1);
+  int64_t* values = segment_key->data()->GetMutableValues<int64_t>(1);
+  std::sort(values, values + num_rows);
+  // num_segment_keys copies of the segment key.
+  ArrayVector segment_keys(num_segment_keys, segment_key);
Comment on lines +902 to +906
Member:

This is not much better than before, is it? I would expect something like:

Suggested change:

-  auto segment_key = rng.Int64(num_rows, /*min=*/0, /*max=*/num_segments - 1);
-  int64_t* values = segment_key->data()->GetMutableValues<int64_t>(1);
-  std::sort(values, values + num_rows);
-  // num_segment_keys copies of the segment key.
-  ArrayVector segment_keys(num_segment_keys, segment_key);
+  ArrayVector segment_keys(num_segment_keys);
+  for (auto& segment_key : segment_keys) {
+    segment_key = rng.Int64(num_rows, /*min=*/0, /*max=*/num_segments - 1);
+    int64_t* values = segment_key->data()->GetMutableValues<int64_t>(1);
+    std::sort(values, values + num_rows);
+  }

Contributor Author:

Ah, I see your point. It's just that I want the number of segments to be exactly as specified. Combining independently random keys, even with the same distribution, would make the number of segments (potentially much) bigger.

Member:

But that's also much more realistic, isn't it?

Member:

That said, as you prefer.

Contributor Author:

> But that's also much more realistic, isn't it?

I think having an exact number of segments helps more with diagnosing performance issues than introducing yet another level of randomness, so I'd rather keep it as is :)


+  BenchmarkAggregate(state, std::move(aggregates), arguments, keys, segment_keys);
+}
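A back-of-the-envelope illustration of the point debated above: with `num_segment_keys` copies of one sorted key, the joint segmentation is exactly that key's segmentation, whereas independently sorted keys start a new segment wherever any one of them changes. A minimal standalone sketch (not part of the PR; the `CountSegments` helper and the plain-vector key representation are hypothetical):

```cpp
#include <cstdint>
#include <vector>

// Counts the segments formed jointly by equal-length key columns: a new
// segment starts whenever any key's value changes between adjacent rows.
int64_t CountSegments(const std::vector<std::vector<int64_t>>& keys,
                      int64_t num_rows) {
  if (num_rows == 0) return 0;
  int64_t num_segments = 1;
  for (int64_t row = 1; row < num_rows; ++row) {
    for (const auto& key : keys) {
      if (key[row] != key[row - 1]) {  // a change in any key opens a segment
        ++num_segments;
        break;
      }
    }
  }
  return num_segments;
}
```

For example, the sorted keys {0, 0, 1, 1} and {0, 1, 1, 2} have 2 and 3 segments on their own but 4 segments jointly, which is why the benchmark pins the segment count by reusing a single sorted key.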

+template <typename... Args>
+static void CountScalarSegmentedByInts(benchmark::State& state, Args&&...) {
Member:

So "CountScalar" is actually the case where Keys=0 but there are segment keys?
This is quite misleading.

Contributor Author:

Please let me explain a bit.

Though both are named "aggregation", most compute engines have two variants depending on whether there are "group by" keys (take SQL `select count(*) from t group by c` and `select count(*) from t`, for instance). Acero calls them "scalar aggregation" and "group by aggregation", and they work with the aggregation functions "count/sum/..." and "hash_count/hash_sum/..." respectively. This is understandable because without a group by key, the aggregation just needs to hold one "scalar" value (e.g., the current count/sum/...) during the whole computation, whereas a group by key immediately requires some structure like a hash table.

Segment keys, on the other hand, are orthogonal to group by keys and apply to both scalar and group by aggregations.

Back to your question: yes, "CountScalar" implies exactly that this is a "scalar aggregation" (i.e., without any group by keys) on a "count" function. "SegmentedByInts" implies that there are potentially segment keys.

Hope this clears things up a bit.

Member:

But `select count(*) from t` is not what this is doing. It's still doing a "group by", it's just not using a hash table for it.

Contributor Author:

You are right, what this benchmark does is not simply `select count(*) from t`; that was merely to explain the difference between a scalar agg and a group by agg. But that doesn't mean this benchmark is a group by, at least not in Acero.

Though I'm not entirely sure how a segmented non-group-by agg maps onto SQL, in Acero specifying segment keys doesn't actually require a group by. One just specifies segment keys and group by keys (independently) within:

class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
                                std::vector<FieldRef> keys = {},
                                std::vector<FieldRef> segment_keys = {})
      : aggregates(std::move(aggregates)),
        keys(std::move(keys)),
        segment_keys(std::move(segment_keys)) {}
  // aggregations which will be applied to the targeted fields
  std::vector<Aggregate> aggregates;
  // keys by which aggregations will be grouped (optional)
  std::vector<FieldRef> keys;
  // keys by which aggregations will be segmented (optional)
  std::vector<FieldRef> segment_keys;
};
and call an "aggregate" node. The plan parsing will generate scalar agg or group by agg depending on if there are group by keys and assign segment keys to whichever of them.

So I would let the naming reflect the underlying implementation (scalar agg vs. group by agg).

(I understand the argument could be: a segment key still implies group by semantics. I agree. But that's a more end-to-end perspective and doesn't reflect how we handle the segment key.)

+  constexpr int64_t num_rows = 32 * 1024;
+
+  // A trivial column to count from.
+  auto arg = ConstantArrayGenerator::Zeroes(num_rows, int32());
+
+  BenchmarkSegmentedAggregate(state, num_rows, {{"count", ""}}, {arg}, /*keys=*/{},
+                              state.range(0), state.range(1));
+}
+BENCHMARK(CountScalarSegmentedByInts)
+    ->ArgNames({"SegmentKeys", "Segments"})
+    ->ArgsProduct({{0, 1, 2}, benchmark::CreateRange(1, 256, 8)});
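(For reference, `benchmark::CreateRange(1, 256, /*multi=*/8)` expands the Segments axis to 1, 8, 64, and 256, so the registration above runs every combination of segment-key count and segment count.)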

+template <typename... Args>
+static void CountGroupByIntsSegmentedByInts(benchmark::State& state, Args&&...) {
+  constexpr int64_t num_rows = 32 * 1024;
+
+  // A trivial column to count from.
+  auto arg = ConstantArrayGenerator::Zeroes(num_rows, int32());
+
+  auto rng = random::RandomArrayGenerator(42);
+  int64_t num_keys = state.range(0);
+  ArrayVector keys(num_keys);
+  for (auto& key : keys) {
+    key = rng.Int64(num_rows, /*min=*/0, /*max=*/64);
Member:

It's a bit weird that this doesn't use num_segments. For comparison purposes, I would expect a similar cardinality in segmented and non-segmented keys.

Contributor Author:

Generally the cardinalities of a segment key and a group by key are relatively independent, so I chose not to reuse the segment key's cardinality for the group by keys. If we want to make the group by cardinality variable, we can use another independent parameter.

But my concern is that the performance effects of segment keys and group by keys are also independent; that is why this benchmark uses a fixed group by key cardinality. What do you think?

Member:

The point here is to compare performance of segmented vs. non-segmented keys, right?

Contributor Author:

In these two particular benchmarks, the goal is to show how the number of segment keys and the number of segments affect performance under a predefined group by setup. In other words, the group by portion is merely there to make the benchmark more realistic. They are not designed for the comparison you suggested.

IIUC, an apples-to-apples comparison might be: a segmented scalar agg (N segment keys and 0 group by keys) vs. a non-segmented group by agg (0 segment keys and N group by keys), with the same key distribution. That comparison would answer how a group by can be done more efficiently by taking advantage of the segmented nature of the key(s) (i.e., by using segmented agg). If you are suggesting such a comparison, then yes, that makes sense, and I can add a new benchmark for it.

What do you think? Thanks.

Member:

Ok, thanks for the explanation. A new benchmark can be added in a later PR if we want; this one is fine now that I understand the motivation.

+  }
+
+  BenchmarkSegmentedAggregate(state, num_rows, {{"hash_count", ""}}, {arg}, keys,
+                              state.range(1), state.range(2));
+}
+BENCHMARK(CountGroupByIntsSegmentedByInts)
+    ->ArgNames({"Keys", "SegmentKeys", "Segments"})
+    ->ArgsProduct({{1, 2}, {0, 1, 2}, benchmark::CreateRange(1, 256, 8)});
Member:

I've tried to use 1+ segment keys and 0 non-segment keys to get an idea of the purely-segmented performance, but I get:

    Invalid: The provided function (hash_count) is a hash aggregate function.  Since there are no keys to group by, a scalar aggregate function was expected (normally these do not start with hash_)

It's a bit of a bummer, isn't it?

Contributor Author (@zanmato1984), Sep 17, 2024:

As my other comment explained, an aggregation without any group by keys (aka non-segment keys here) is a scalar aggregation and requires "count" rather than "hash_count".

This is a "group by aggregation" benchmark (the counterpart of the previous "CountScalar" one), so there has to be at least one group by key.
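To make the distinction concrete, here is a minimal sketch of the two flavors, modeled on the `BatchGroupBy` helper in this diff; the column names "x", "k", and "seg" and the surrounding `table` are assumed for illustration:

```cpp
// Purely-segmented scalar aggregation: no group-by keys, so the non-hash
// "count" must be used; "hash_count" here fails with the Invalid status
// quoted in the thread above.
Declaration scalar_plan = Declaration::Sequence(
    {{"table_source", TableSourceNodeOptions(table)},
     {"aggregate", AggregateNodeOptions({{"count", nullptr, "x", "count(x)"}},
                                        /*keys=*/{},
                                        /*segment_keys=*/{"seg"})}});

// Group-by aggregation: with at least one key, the "hash_" variant is required.
Declaration grouped_plan = Declaration::Sequence(
    {{"table_source", TableSourceNodeOptions(table)},
     {"aggregate", AggregateNodeOptions({{"hash_count", nullptr, "x", "count(x)"}},
                                        /*keys=*/{"k"},
                                        /*segment_keys=*/{"seg"})}});
```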


} // namespace acero
} // namespace arrow
9 changes: 3 additions & 6 deletions cpp/src/arrow/acero/aggregate_internal.h
@@ -131,17 +131,14 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
 template <typename BatchHandler>
 Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
                       const std::vector<int>& ids, const BatchHandler& handle_batch) {
-  int64_t offset = 0;
   ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
   ExecSpan segment_batch(segment_exec_batch);

-  while (true) {
Contributor Author:

This is the only call site in non-testing code.

-    ARROW_ASSIGN_OR_RAISE(compute::Segment segment,
-                          segmenter->GetNextSegment(segment_batch, offset));
-    if (segment.offset >= segment_batch.length) break;  // condition of no-next-segment
+  ARROW_ASSIGN_OR_RAISE(auto segments, segmenter->GetSegments(segment_batch));
+  for (const auto& segment : segments) {
     ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
-    offset = segment.offset + segment.length;
   }

   return Status::OK();
 }
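For context on the simplified interface: after this change a caller obtains all segments of a span in one `GetSegments` call instead of driving an offset-based `GetNextSegment` loop. A rough sketch of standalone usage follows; the `RowSegmenter::Make` signature is an assumption based on `arrow/compute/row/grouper.h`, so treat it as illustrative rather than authoritative:

```cpp
// Segment a batch by a single int64 key column and visit each contiguous
// run of equal key values.
ARROW_ASSIGN_OR_RAISE(
    std::unique_ptr<compute::RowSegmenter> segmenter,
    compute::RowSegmenter::Make({int64()}, /*nullable_keys=*/false,
                                compute::default_exec_context()));
ExecSpan span(segment_exec_batch);
ARROW_ASSIGN_OR_RAISE(std::vector<compute::Segment> segments,
                      segmenter->GetSegments(span));
for (const auto& segment : segments) {
  // segment.offset and segment.length delimit one run of identical keys.
}
```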
