Skip to content

Commit

Permalink
apacheGH-43986: [C++][Acero] Some code cleanup to Grouper (apache#4…
Browse files Browse the repository at this point in the history
…3988)

### Rationale for this change

See apache#43986.

### What changes are included in this PR?

Mostly trivial changes, plus removing one `Grouper` implementation that's not wired.

### Are these changes tested?

No new tests needed.

### Are there any user-facing changes?

None.

* GitHub Issue: apache#43986

Authored-by: Ruoxi Sun <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
zanmato1984 authored Sep 9, 2024
1 parent d0f9f3e commit 27b43f4
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 46 deletions.
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/groupby_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,14 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) {
DCHECK_EQ(input, inputs_[0]);

auto handler = [this](const ExecBatch& full_batch, const Segment& segment) {
if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
if (!segment.extends && segment.offset == 0)
RETURN_NOT_OK(OutputResult(/*is_last=*/false));
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
auto batch = ExecSpan(exec_batch);
RETURN_NOT_OK(Consume(batch));
RETURN_NOT_OK(
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false));
return Status::OK();
};
ARROW_RETURN_NOT_OK(
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/scalar_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {
// (1) The segment is starting of a new segment group and points to
// the beginning of the batch, then it means no data in the batch belongs
// to the current segment group. We can output and reset kernel states.
if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
if (!segment.extends && segment.offset == 0)
RETURN_NOT_OK(OutputResult(/*is_last=*/false));

// We add segment to the current segment group aggregation
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
Expand All @@ -244,7 +245,7 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {

// If the segment closes the current segment group, we can output segment group
// aggregation.
if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false));

return Status::OK();
};
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2533,11 +2533,11 @@ struct GroupedCountDistinctImpl : public GroupedAggregator {
struct GroupedDistinctImpl : public GroupedCountDistinctImpl {
Result<Datum> Finalize() override {
ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
ARROW_ASSIGN_OR_RAISE(auto groupings, grouper_->MakeGroupings(
*uniques[1].array_as<UInt32Array>(),
static_cast<uint32_t>(num_groups_), ctx_));
ARROW_ASSIGN_OR_RAISE(
auto list, grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_));
auto groupings, Grouper::MakeGroupings(*uniques[1].array_as<UInt32Array>(),
static_cast<uint32_t>(num_groups_), ctx_));
ARROW_ASSIGN_OR_RAISE(
auto list, Grouper::ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_));
const auto& values = list->values();
DCHECK_EQ(values->offset(), 0);
auto* offsets = list->value_offsets()->mutable_data_as<int32_t>();
Expand Down
32 changes: 0 additions & 32 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,38 +332,6 @@ Result<std::unique_ptr<RowSegmenter>> RowSegmenter::Make(

namespace {

struct GrouperNoKeysImpl : Grouper {
Result<std::shared_ptr<Array>> MakeConstantGroupIdArray(int64_t length,
group_id_t value) {
std::unique_ptr<ArrayBuilder> a_builder;
RETURN_NOT_OK(MakeBuilder(default_memory_pool(), g_group_id_type, &a_builder));
using GroupIdBuilder = typename TypeTraits<GroupIdType>::BuilderType;
auto builder = checked_cast<GroupIdBuilder*>(a_builder.get());
if (length != 0) {
RETURN_NOT_OK(builder->Resize(length));
}
for (int64_t i = 0; i < length; i++) {
builder->UnsafeAppend(value);
}
std::shared_ptr<Array> array;
RETURN_NOT_OK(builder->Finish(&array));
return array;
}
Status Reset() override { return Status::OK(); }
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
return Datum(array);
}
Result<ExecBatch> GetUniques() override {
auto data = ArrayData::Make(uint32(), 1, 0);
auto values = data->GetMutableValues<uint32_t>(0);
values[0] = 0;
ExecBatch out({Datum(data)}, 1);
return out;
}
uint32_t num_groups() const override { return 1; }
};

struct GrouperImpl : public Grouper {
static Result<std::unique_ptr<GrouperImpl>> Make(
const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/row/grouper.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ inline bool operator!=(const Segment& segment1, const Segment& segment2) {

/// \brief a helper class to divide a batch into segments of equal values
///
/// For example, given a batch with two rows:
/// For example, given a batch with two columns specifed as segment keys:
///
/// A A
/// A A
/// A B
/// A B
/// A A
/// A A [other columns]...
/// A A ...
/// A B ...
/// A B ...
/// A A ...
///
/// Then the batch could be divided into 3 segments. The first would be rows 0 & 1,
/// the second would be rows 2 & 3, and the third would be row 4.
Expand Down

0 comments on commit 27b43f4

Please sign in to comment.