Skip to content

Commit

Permalink
max-by
Browse files Browse the repository at this point in the history
  • Loading branch information
yma11 committed Apr 21, 2024
1 parent 09efd20 commit 73e2885
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 305 deletions.
2 changes: 2 additions & 0 deletions velox/exec/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <iostream>
#include <folly/Synchronized.h>

#include "velox/common/memory/HashStringAllocator.h"
Expand Down Expand Up @@ -384,6 +385,7 @@ class Aggregate {
if (mask & nullMask_) {
group[nullByte_] = mask & ~nullMask_;
--numNulls_;
std::cout << "null in group is cleared." << std::endl;
return true;
}
}
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ target_link_libraries(
velox_arrow_bridge
velox_common_compression)

if(${VELOX_BUILD_TESTING})
#if(${VELOX_BUILD_TESTING})
add_subdirectory(fuzzer)
add_subdirectory(tests)
elseif(${VELOX_BUILD_TEST_UTILS})
add_subdirectory(tests/utils)
endif()
#if(${VELOX_BUILD_TEST_UTILS})
#add_subdirectory(tests/utils)
#endif()

if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmarks)
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include <iostream>
#include "Driver.h"
#include <folly/ScopeGuard.h>
#include <folly/executors/QueuedImmediateExecutor.h>
Expand Down Expand Up @@ -596,6 +597,7 @@ StopReason Driver::runInternal(
curOperatorId_,
kOpMethodGetOutput);
if (intermediateResult) {
std::cout << "driver" <<intermediateResult->toString(0) << std::endl;
VELOX_CHECK(
intermediateResult->size() > 0,
"Operator::getOutput() must return nullptr or "
Expand Down Expand Up @@ -703,6 +705,7 @@ StopReason Driver::runInternal(
curOperatorId_,
kOpMethodGetOutput);
if (result) {
std::cout << "final result " << result->toString(0) << std::endl;
VELOX_CHECK(
result->size() > 0,
"Operator::getOutput() must return nullptr or "
Expand Down
19 changes: 17 additions & 2 deletions velox/exec/FilterProject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/FilterProject.h"
#include <iostream>
#include "velox/core/Expressions.h"
#include "velox/expression/Expr.h"
#include "velox/expression/FieldReference.h"
Expand Down Expand Up @@ -122,10 +123,16 @@ bool FilterProject::isFinished() {
}

RowVectorPtr FilterProject::getOutput() {
// for (vector_size_t i = 0; i < input_->size(); i++) {
// std::cout << "FilterProject input:" << input_->toString(i) << std::endl;
// }

if (allInputProcessed()) {
return nullptr;
}

for (vector_size_t i = 0; i < input_->size(); i++) {
std::cout << "FilterProject input:" << input_->toString(i) << std::endl;
}
vector_size_t size = input_->size();
LocalSelectivityVector localRows(*operatorCtx_->execCtx(), size);
auto* rows = localRows.get();
Expand All @@ -140,10 +147,14 @@ RowVectorPtr FilterProject::getOutput() {
}

if (!hasFilter_) {

numProcessedInputRows_ = size;
VELOX_CHECK(!isIdentityProjection_);
auto results = project(*rows, evalCtx);

for (vector_size_t i = 0; i < results.size(); i++) {
std::cout << "FilterProject fillOutput:" << results.at(i)->toString()
<< std::endl;
}
return fillOutput(size, nullptr, results);
}

Expand All @@ -165,6 +176,10 @@ RowVectorPtr FilterProject::getOutput() {
}
results = project(*rows, evalCtx);
}
for (vector_size_t i = 0; i < results.size(); i++) {
std::cout << "FilterProject input:" << results.at(i)->toString()
<< std::endl;
}

return fillOutput(
numOut,
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ void GroupingSet::initializeGlobalAggregation() {
void GroupingSet::addGlobalAggregationInput(
const RowVectorPtr& input,
bool mayPushdown) {
std::cout << "GlobalAggregationInput " << input->toString(0) << std::endl;
initializeGlobalAggregation();

auto numRows = input->size();
Expand Down Expand Up @@ -584,8 +585,13 @@ bool GroupingSet::getGlobalAggregationOutput(
auto& resultVector = result->childAt(aggregates_[i].output);
if (isPartial_) {
function->extractAccumulators(groups, 1, &resultVector);
std::cout << "extractAccumulator result" << resultVector->toString(0)
<< std::endl;

} else {
function->extractValues(groups, 1, &resultVector);
std::cout << "extractValue result" << resultVector->toString(0)
<< std::endl;
}
}

Expand Down Expand Up @@ -763,8 +769,13 @@ void GroupingSet::extractGroups(
if (isPartial_) {
function->extractAccumulators(
groups.data(), groups.size(), &aggregateVector);
std::cout << "extractAccumulator result" << aggregateVector->toString()
<< std::endl;

} else {
function->extractValues(groups.data(), groups.size(), &aggregateVector);
std::cout << "extractValue result" << aggregateVector->toString()
<< std::endl;
}
}

Expand Down
2 changes: 2 additions & 0 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iostream>
#include "velox/exec/HashAggregation.h"
#include <optional>
#include "velox/exec/Task.h"
Expand Down Expand Up @@ -317,6 +318,7 @@ RowVectorPtr HashAggregation::getOutput() {
return nullptr;
}
numOutputRows_ += output_->size();
std::cout << "hashAgg " << output_->toString(0) << std::endl;
return output_;
}

Expand Down
11 changes: 10 additions & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/Operator.h"
#include <iostream>
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/base/SuccinctPrinter.h"
Expand Down Expand Up @@ -207,6 +208,9 @@ RowVectorPtr Operator::fillOutput(
if (size == input_->size() &&
(!mapping || isSequence(mapping->as<vector_size_t>(), 0, size))) {
if (isIdentityProjection_) {
for (vector_size_t i = 0; i < input_->size(); i++) {
std::cout << "Identity projection:" << input_->toString(i) << std::endl;
}
return std::move(input_);
}
wrapResults = false;
Expand All @@ -226,12 +230,17 @@ RowVectorPtr Operator::fillOutput(
size,
wrapResults ? mapping : nullptr);

return std::make_shared<RowVector>(
auto res = std::make_shared<RowVector>(
operatorCtx_->pool(),
outputType_,
nullptr,
size,
std::move(projectedChildren));
for (vector_size_t i = 0; i < res->size(); i++) {
std::cout << "fill Output end: " << res->toString(i) << std::endl;
}

return res;
}

RowVectorPtr Operator::fillOutput(
Expand Down
55 changes: 1 addition & 54 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,62 +28,9 @@ target_link_libraries(aggregate_companion_functions_test velox_exec

add_executable(
velox_exec_test
AddressableNonNullValueListTest.cpp
AggregationTest.cpp
AggregateFunctionRegistryTest.cpp
ArrowStreamTest.cpp
AssignUniqueIdTest.cpp
AsyncConnectorTest.cpp
ContainerRowSerdeTest.cpp
CustomJoinTest.cpp
EnforceSingleRowTest.cpp
ExchangeClientTest.cpp
ExpandTest.cpp
FilterProjectTest.cpp
FunctionResolutionTest.cpp
HashBitRangeTest.cpp
HashJoinBridgeTest.cpp
HashJoinTest.cpp
HashPartitionFunctionTest.cpp
HashTableTest.cpp
LimitTest.cpp
LocalPartitionTest.cpp
Main.cpp
MarkDistinctTest.cpp
MemoryReclaimerTest.cpp
MergeJoinTest.cpp
MergeTest.cpp
MultiFragmentTest.cpp
NestedLoopJoinTest.cpp
OrderByTest.cpp
OutputBufferManagerTest.cpp
PartitionedOutputTest.cpp
PlanNodeSerdeTest.cpp
PlanNodeToStringTest.cpp
PrefixSortTest.cpp
PrintPlanWithStatsTest.cpp
ProbeOperatorStateTest.cpp
RoundRobinPartitionFunctionTest.cpp
RowContainerTest.cpp
RowNumberTest.cpp
SortBufferTest.cpp
SpillerTest.cpp
SpillTest.cpp
SplitToStringTest.cpp
SqlTest.cpp
StreamingAggregationTest.cpp
TableScanTest.cpp
TableWriteTest.cpp
TaskListenerTest.cpp
ThreadDebugInfoTest.cpp
TopNRowNumberTest.cpp
TopNTest.cpp
UnnestTest.cpp
UnorderedStreamReaderTest.cpp
ValuesTest.cpp
VectorHasherTest.cpp
WindowFunctionRegistryTest.cpp
WindowTest.cpp)
)

add_executable(
velox_exec_infra_test
Expand Down
Loading

0 comments on commit 73e2885

Please sign in to comment.