Skip to content

Commit

Permalink
apacheGH-33484: [C++][Compute] Implement Grouper::Reset (apache#41352)
Browse files Browse the repository at this point in the history
### Rationale for this change

Recently I've been working on some improvement for `Grouper` and I found adding `Reset` function could be beneficial. Then I trace down to apache#33484 from a TODO in code. Here comes this PR.

### What changes are included in this PR?

Add `Reset` function for all the concrete `Grouper` implementations, and eliminate the recreation of `Grouper` in `AnyKeysSegmenter`.

Also add more `RowSegmenter` cases covering `AnyKeysSegmenter`.

### Are these changes tested?

Yes. Legacy UTs should cover it well. Also added some new UTs.

### Are there any user-facing changes?

None.

* GitHub Issue: apache#33484

Lead-authored-by: Ruoxi Sun <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
zanmato1984 and pitrou authored May 14, 2024
1 parent e6ab174 commit ada965f
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 51 deletions.
172 changes: 136 additions & 36 deletions cpp/src/arrow/acero/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,12 @@ void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batc
ASSERT_EQ(expected_segment, segment);
offset = segment.offset + segment.length;
}
// Assert next is the last (empty) segment.
ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, offset));
ASSERT_GE(segment.offset, batch.length);
ASSERT_EQ(segment.length, 0);
ASSERT_TRUE(segment.is_open);
ASSERT_TRUE(segment.extends);
}

Result<std::unique_ptr<Grouper>> MakeGrouper(const std::vector<TypeHolder>& key_types) {
Expand Down Expand Up @@ -682,48 +688,142 @@ TEST(RowSegmenter, Basics) {
}

TEST(RowSegmenter, NonOrdered) {
std::vector<TypeHolder> types = {int32()};
auto batch = ExecBatchFromJSON(types, "[[1], [1], [2], [1], [2]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
{
std::vector<TypeHolder> types = {int32()};
auto batch = ExecBatchFromJSON(types, "[[1], [1], [2], [1], [2]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
auto batch = ExecBatchFromJSON(types, "[[1, 1], [1, 1], [2, 2], [1, 2], [2, 2]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
}
}

TEST(RowSegmenter, EmptyBatches) {
std::vector<TypeHolder> types = {int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[2], [2]]"), ExecBatchFromJSON(types, "[]"),
};
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {});
TestSegments(segmenter, ExecSpan(batches[1]), {});
TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[3]), {});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[5]), {});
TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[7]), {});
{
std::vector<TypeHolder> types = {int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[2], [2]]"), ExecBatchFromJSON(types, "[]"),
};
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {});
TestSegments(segmenter, ExecSpan(batches[1]), {});
TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[3]), {});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[5]), {});
TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[7]), {});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1, 1]]"),
ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[1, 1]]"),
ExecBatchFromJSON(types, "[]"),
ExecBatchFromJSON(types, "[[2, 2], [2, 2]]"),
ExecBatchFromJSON(types, "[]"),
};
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {});
TestSegments(segmenter, ExecSpan(batches[1]), {});
TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[3]), {});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[5]), {});
TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[7]), {});
}
}

TEST(RowSegmenter, MultipleSegments) {
std::vector<TypeHolder> types = {int32()};
auto batch = ExecBatchFromJSON(types, "[[1], [1], [2], [5], [3], [3], [5], [5], [4]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
{
std::vector<TypeHolder> types = {int32()};
auto batch =
ExecBatchFromJSON(types, "[[1], [1], [2], [5], [3], [3], [5], [5], [4]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
auto batch = ExecBatchFromJSON(
types,
"[[1, 1], [1, 1], [2, 2], [5, 5], [3, 3], [3, 3], [5, 5], [5, 5], [4, 4]]");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batch),
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
}
}

TEST(RowSegmenter, MultipleSegmentsMultipleBatches) {
{
std::vector<TypeHolder> types = {int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[[1], [2]]"),
ExecBatchFromJSON(types, "[[5], [3]]"),
ExecBatchFromJSON(types, "[[3], [5], [5]]"), ExecBatchFromJSON(types, "[[4]]")};

ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[1]),
{{0, 1, false, true}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[2]),
{{0, 1, false, false}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[3]),
{{0, 1, false, true}, {1, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, false}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[[1, 1]]"),
ExecBatchFromJSON(types, "[[1, 1], [2, 2]]"),
ExecBatchFromJSON(types, "[[5, 5], [3, 3]]"),
ExecBatchFromJSON(types, "[[3, 3], [5, 5], [5, 5]]"),
ExecBatchFromJSON(types, "[[4, 4]]")};

ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[1]),
{{0, 1, false, true}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[2]),
{{0, 1, false, false}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[3]),
{{0, 1, false, true}, {1, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, false}});
}
}

namespace {
Expand Down
43 changes: 28 additions & 15 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,18 @@ struct SimpleKeySegmenter : public BaseRowSegmenter {
struct AnyKeysSegmenter : public BaseRowSegmenter {
static Result<std::unique_ptr<RowSegmenter>> Make(
const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx)); // check types
return std::make_unique<AnyKeysSegmenter>(key_types, ctx);
ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_types, ctx)); // check types
return std::make_unique<AnyKeysSegmenter>(key_types, ctx, std::move(grouper));
}

AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx,
std::unique_ptr<Grouper> grouper)
: BaseRowSegmenter(key_types),
ctx_(ctx),
grouper_(nullptr),
grouper_(std::move(grouper)),
save_group_id_(kNoGroupId) {}

Status Reset() override {
grouper_ = nullptr;
ARROW_RETURN_NOT_OK(grouper_->Reset());
save_group_id_ = kNoGroupId;
return Status::OK();
}
Expand All @@ -245,7 +245,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
// first row of a new segment to see if it extends the previous segment.
template <typename Batch>
Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
if (!grouper_) return kNoGroupId;
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
/*length=*/1));
if (!datum.is_array()) {
Expand All @@ -264,9 +263,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
if (offset == batch.length) {
return MakeSegment(batch.length, offset, 0, kEmptyExtends);
}
// ARROW-18311: make Grouper support Reset()
// so it can be reset instead of recreated below
//
// the group id must be computed prior to resetting the grouper, since it is compared
// to save_group_id_, and after resetting the grouper produces incomparable group ids
ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
Expand All @@ -276,7 +272,7 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
return extends;
};
// resetting drops grouper's group-ids, freeing-up memory for the next segment
ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_)); // TODO: reset it
ARROW_RETURN_NOT_OK(grouper_->Reset());
// GH-34475: cache the grouper-consume result across invocations of GetNextSegment
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
if (datum.is_array()) {
Expand All @@ -299,7 +295,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
}

private:
ExecContext* const ctx_;
std::unique_ptr<Grouper> grouper_;
group_id_t save_group_id_;
};
Expand Down Expand Up @@ -354,6 +349,7 @@ struct GrouperNoKeysImpl : Grouper {
RETURN_NOT_OK(builder->Finish(&array));
return std::move(array);
}
Status Reset() override { return Status::OK(); }
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
return Datum(array);
Expand Down Expand Up @@ -419,6 +415,14 @@ struct GrouperImpl : public Grouper {
return std::move(impl);
}

Status Reset() override {
map_.clear();
offsets_.clear();
key_bytes_.clear();
num_groups_ = 0;
return Status::OK();
}

Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length));
if (offset != 0 || length != batch.length) {
Expand Down Expand Up @@ -595,7 +599,17 @@ struct GrouperFastImpl : public Grouper {
return std::move(impl);
}

~GrouperFastImpl() { map_.cleanup(); }
Status Reset() override {
rows_.Clean();
rows_minibatch_.Clean();
map_.cleanup();
RETURN_NOT_OK(map_.init(encode_ctx_.hardware_flags, ctx_->memory_pool()));
// TODO: It is now assumed that the dictionaries_ are identical to the first batch
// throughout the grouper's lifespan so no resetting is needed. But if we want to
// support different dictionaries for different batches, we need to reset the
// dictionaries_ here.
return Status::OK();
}

Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length));
Expand Down Expand Up @@ -838,8 +852,7 @@ struct GrouperFastImpl : public Grouper {
return out;
}

static constexpr int log_minibatch_max_ = 10;
static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
static constexpr int minibatch_size_max_ = arrow::util::MiniBatch::kMiniBatchLength;
static constexpr int minibatch_size_min_ = 128;
int minibatch_size_;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/row/grouper.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class ARROW_EXPORT Grouper {
static Result<std::unique_ptr<Grouper>> Make(const std::vector<TypeHolder>& key_types,
ExecContext* ctx = default_exec_context());

/// Reset all intermediate state, make the grouper logically as just `Make`ed.
/// The underlying buffers, if any, may or may not be released though.
virtual Status Reset() = 0;

/// Consume a batch of keys, producing the corresponding group ids as an integer array,
/// over a slice defined by an offset and length, which defaults to the batch length.
/// Currently only uint32 indices will be produced, eventually the bit width will only
Expand Down

0 comments on commit ada965f

Please sign in to comment.