Skip to content

Commit

Permalink
Initialize window function after driver execution started (facebookin…
Browse files Browse the repository at this point in the history
…cubator#6411)

Summary:
Move window function initialization from window operator constructor
to  operator initialize() method. The window function initialization need
allocate memory from memory pool.

Pull Request resolved: facebookincubator#6411

Reviewed By: tanjialiang

Differential Revision: D48959676

Pulled By: xiaoxmeng

fbshipit-source-id: af43a24aae021f01496a53a3b7449a3ca8d668f8
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 6, 2023
1 parent bf3a1ae commit 9b1dda8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
3 changes: 3 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ class Operator : public BaseRuntimeStatWriter {

/// Does initialization work for this operator which requires memory
/// allocation from memory pool that can't be done under operator constructor.
///
/// NOTE: the default implementation set 'initialized_' to true to ensure we
/// never call this more than once.
virtual void initialize();

/// Indicates if this operator has been initialized or not.
Expand Down
24 changes: 15 additions & 9 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ Window::Window(
"Window"),
numInputColumns_(windowNode->sources()[0]->outputType()->size()),
windowBuild_(std::make_unique<SortWindowBuild>(windowNode, pool())),
windowNode_(windowNode),
currentPartition_(nullptr),
stringAllocator_(pool()) {
auto inputType = windowNode->sources()[0]->outputType();
createWindowFunctions(windowNode, inputType, driverCtx->queryConfig());
stringAllocator_(pool()) {}

void Window::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(windowNode_);
createWindowFunctions();
createPeerAndFrameBuffers();
windowNode_.reset();
}

Window::WindowFrame Window::createWindowFrame(
Expand Down Expand Up @@ -91,11 +95,13 @@ Window::WindowFrame Window::createWindowFrame(
createFrameChannelArg(frame.endValue)});
}

void Window::createWindowFunctions(
const std::shared_ptr<const core::WindowNode>& windowNode,
const RowTypePtr& inputType,
const core::QueryConfig& config) {
for (const auto& windowNodeFunction : windowNode->windowFunctions()) {
void Window::createWindowFunctions() {
VELOX_CHECK_NOT_NULL(windowNode_);
VELOX_CHECK(windowFunctions_.empty());
VELOX_CHECK(windowFrames_.empty());

const auto& inputType = windowNode_->sources()[0]->outputType();
for (const auto& windowNodeFunction : windowNode_->windowFunctions()) {
std::vector<WindowFunctionArg> functionArgs;
functionArgs.reserve(windowNodeFunction.functionCall->inputs().size());
for (auto& arg : windowNodeFunction.functionCall->inputs()) {
Expand All @@ -117,7 +123,7 @@ void Window::createWindowFunctions(
windowNodeFunction.ignoreNulls,
operatorCtx_->pool(),
&stringAllocator_,
config));
operatorCtx_->driverCtx()->queryConfig()));

windowFrames_.push_back(
createWindowFrame(windowNodeFunction.frame, inputType));
Expand Down
15 changes: 10 additions & 5 deletions velox/exec/Window.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class Window : public Operator {
DriverCtx* driverCtx,
const std::shared_ptr<const core::WindowNode>& windowNode);

/// Initialize the window functions from 'windowNode_' once by driver operator
/// initialization. 'windowNode_' is reset after this call.
void initialize() override;

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;
Expand Down Expand Up @@ -82,10 +86,7 @@ class Window : public Operator {
};

// Creates WindowFunction and frame objects for this operator.
void createWindowFunctions(
const std::shared_ptr<const core::WindowNode>& windowNode,
const RowTypePtr& inputType,
const core::QueryConfig& config);
void createWindowFunctions();

// Creates the buffers for peer and frame row
// indices to send in window function apply invocations.
Expand Down Expand Up @@ -149,7 +150,11 @@ class Window : public Operator {

// WindowBuild is used to store input rows and return WindowPartitions
// for the processing.
std::unique_ptr<WindowBuild> windowBuild_;
const std::unique_ptr<WindowBuild> windowBuild_;

// The cached window plan node used for window function initialization. It is
// reset after the initialization.
std::shared_ptr<const core::WindowNode> windowNode_;

// Used to access window partition rows and columns by the window
// operator and functions. This structure is owned by the WindowBuild.
Expand Down

0 comments on commit 9b1dda8

Please sign in to comment.