Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Dec 6, 2023
1 parent f15ff74 commit 9bc5a5c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 28 deletions.
7 changes: 4 additions & 3 deletions velox/exec/AggregateUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ std::vector<core::LambdaTypedExprPtr> extractLambdaInputs(

std::vector<AggregateInfo> AggregateUtil::toAggregateInfo(
const core::AggregationNode& aggregationNode,
const RowTypePtr& inputType,
const RowTypePtr& outputType,
core::AggregationNode::Step step,
const OperatorCtx& operatorCtx,
uint32_t numKeys,
std::shared_ptr<core::ExpressionEvaluator>& expressionEvaluator,
Expand All @@ -48,6 +45,10 @@ std::vector<AggregateInfo> AggregateUtil::toAggregateInfo(
std::vector<AggregateInfo> aggregates;
aggregates.reserve(numAggregates);

const auto& inputType = aggregationNode.sources()[0]->outputType();
const auto& outputType = aggregationNode.outputType();
const auto step = aggregationNode.step();

for (auto i = 0; i < numAggregates; i++) {
const auto& aggregate = aggregationNode.aggregates()[i];

Expand Down
18 changes: 9 additions & 9 deletions velox/exec/AggregateUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ class AggregateUtil {
/// a hash aggregation plan node or a streaming aggregation plan node.
///
/// @param aggregationNode Plan node of this aggregation.
/// @param inputType Input type of the aggregationNode.
/// @param outputType Output type of the aggregationNode.
/// @param step Aggregation step of the aggregationNode.
/// @param operatorCtx Operator context.
/// @param numKeys Number of group keys.
/// @param expressionEvaluator Expression evaluation.
/// @param isStreaming Indicate this aggregation if streaming or not.
/// @param numKeys Number of grouping keys.
/// @param expressionEvaluator An Expression evaluator. It is used by an
/// aggregate operator to compile and eval lambda expression. It should be
/// initiated/assigned for at most one time.
/// @param isStreaming Indicate whether this aggregation is streaming or not.
/// Pass true if the aggregate operator is a StreamingAggregation and false if
/// the aggregate operator is a HashAggregation. This parameter will be
/// removed after sorted, distinct aggregation, and lambda functions support
/// are added to StreamingAggregation.
/// @return List of AggregationInfo.
static std::vector<AggregateInfo> toAggregateInfo(
const core::AggregationNode& aggregationNode,
const RowTypePtr& inputType,
const RowTypePtr& outputType,
core::AggregationNode::Step step,
const OperatorCtx& operatorCtx,
uint32_t numKeys,
std::shared_ptr<core::ExpressionEvaluator>& expressionEvaluator,
Expand Down
9 changes: 1 addition & 8 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void HashAggregation::initialize() {
VELOX_CHECK(pool()->trackUsage());

auto inputType = aggregationNode_->sources()[0]->outputType();

auto hashers =
createVectorHashers(inputType, aggregationNode_->groupingKeys());
auto numHashers = hashers.size();
Expand All @@ -68,13 +67,7 @@ void HashAggregation::initialize() {

std::shared_ptr<core::ExpressionEvaluator> expressionEvaluator;
std::vector<AggregateInfo> aggregateInfos = AggregateUtil::toAggregateInfo(
*aggregationNode_,
inputType,
outputType_,
aggregationNode_->step(),
*operatorCtx_,
numHashers,
expressionEvaluator);
*aggregationNode_, *operatorCtx_, numHashers, expressionEvaluator);

// Check that aggregate result type match the output type.
for (auto i = 0; i < aggregateInfos.size(); i++) {
Expand Down
9 changes: 1 addition & 8 deletions velox/exec/StreamingAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,7 @@ void StreamingAggregation::initialize() {

std::shared_ptr<core::ExpressionEvaluator> expressionEvaluator;
aggregates_ = AggregateUtil::toAggregateInfo(
*aggregationNode_,
inputType,
outputType_,
aggregationNode_->step(),
*operatorCtx_,
numKeys,
expressionEvaluator,
true);
*aggregationNode_, *operatorCtx_, numKeys, expressionEvaluator, true);

// Setup masks and accumulators
const auto numAggregates = aggregationNode_->aggregates().size();
Expand Down

0 comments on commit 9bc5a5c

Please sign in to comment.