diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index a901f3217793..8be2572bd7a5 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -852,16 +852,27 @@ void Expr::evaluateSharedSubexpr( VectorPtr& result, TEval eval) { // Captures the inputs referenced by distinctFields_. - std::vector expressionInputFields; + InputForSharedResults expressionInputFields; for (auto* field : distinctFields_) { - expressionInputFields.push_back( - context.getField(field->index(context)).get()); + expressionInputFields.addInput(context.getField(field->index(context))); } // Find the cached results for the same inputs, or create an entry if one // doesn't exist. auto sharedSubexprResultsIter = sharedSubexprResults_.find(expressionInputFields); + + // If any of the input vector is freed/expired, remove the entry from the + // results cache. + if (sharedSubexprResultsIter != sharedSubexprResults_.end() && + sharedSubexprResultsIter->first.isExpired()) { + sharedSubexprResults_.erase(sharedSubexprResultsIter); + VELOX_DCHECK( + sharedSubexprResults_.find(expressionInputFields) == + sharedSubexprResults_.end()); + sharedSubexprResultsIter = sharedSubexprResults_.end(); + } + if (sharedSubexprResultsIter == sharedSubexprResults_.end()) { auto maxSharedSubexprResultsCached = context.execCtx() ->queryCtx() diff --git a/velox/expression/Expr.h b/velox/expression/Expr.h index 2880b99d48c6..b425fd392bc9 100644 --- a/velox/expression/Expr.h +++ b/velox/expression/Expr.h @@ -608,6 +608,37 @@ class Expr { std::vector inputValues_; + /// Represents a set of inputs referenced by 'distinctFields_' that are + /// captured when the 'evaluateSharedSubexpr()' method is called on a shared + /// sub-expression. The purpose of this class is to ensure that cached + /// results are re-used for the correct set of live input vectors. + class InputForSharedResults { + public: + void addInput(const std::shared_ptr& input) { + inputVectors_.push_back(input.get()); + inputWeakVectors_.push_back(input); + } + + bool operator<(const InputForSharedResults& other) const { + return inputVectors_ < other.inputVectors_; + } + + bool isExpired() const { + for (const auto& input : inputWeakVectors_) { + if (input.expired()) { + return true; + } + } + return false; + } + + private: + // Used as a key in a map that keeps track of cached results. + std::vector inputVectors_; + // Used to check if inputs have expired. + std::vector> inputWeakVectors_; + }; + struct SharedResults { // The rows for which 'sharedSubexprValues_' has a value. std::unique_ptr sharedSubexprRows_ = nullptr; @@ -617,7 +648,7 @@ class Expr { // Maps the inputs referenced by distinctFields_ captuered when // evaluateSharedSubexpr() is called to the cached shared results. - std::map, SharedResults> sharedSubexprResults_; + std::map sharedSubexprResults_; // Pointers to the last base vector of cachable dictionary input. Used to // check if the current input's base vector is the same as the last. If it's