Skip to content

Commit

Permalink
feat: Add index lookup join plan node (facebookincubator#12163)
Browse files Browse the repository at this point in the history
Summary:

Add index lookup join plan node and add a method in table handle to indicate if the table support index lookup
or not.

Reviewed By: mbasmanova

Differential Revision: D68429744
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jan 24, 2025
1 parent 276c1db commit 83e865f
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 2 deletions.
5 changes: 5 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class ConnectorTableHandle : public ISerializable {
VELOX_UNSUPPORTED();
}

/// Returns true if the connector table handle supports index lookup.
virtual bool supportsIndexLookup() const {
return false;
}

virtual folly::dynamic serialize() const override;

protected:
Expand Down
81 changes: 81 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ std::vector<PlanNodePtr> deserializeSources(
return {};
}

std::vector<TypedExprPtr> deserializeJoinConditions(
const folly::dynamic& obj,
void* context) {
if (obj.count("joinConditions") == 0) {
return {};
}

return ISerializable::deserialize<std::vector<ITypedExpr>>(
obj["joinConditions"], context);
}

PlanNodePtr deserializeSingleSource(const folly::dynamic& obj, void* context) {
auto sources = deserializeSources(obj, context);
VELOX_CHECK_EQ(1, sources.size());
Expand Down Expand Up @@ -1405,6 +1416,75 @@ PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) {
outputType);
}

PlanNodePtr IndexLookupJoinNode::create(
const folly::dynamic& obj,
void* context) {
auto sources = deserializeSources(obj, context);
VELOX_CHECK_EQ(2, sources.size());
TableScanNodePtr lookupSource =
std::dynamic_pointer_cast<const TableScanNode>(sources[1]);
VELOX_CHECK_NOT_NULL(lookupSource);

auto leftKeys = deserializeFields(obj["leftKeys"], context);
auto rightKeys = deserializeFields(obj["rightKeys"], context);

VELOX_CHECK_EQ(obj.count("filter"), 0);

auto joinConditions = deserializeJoinConditions(obj, context);

auto outputType = deserializeRowType(obj["outputType"]);

return std::make_shared<IndexLookupJoinNode>(
deserializePlanNodeId(obj),
joinTypeFromName(obj["joinType"].asString()),
std::move(leftKeys),
std::move(rightKeys),
std::move(joinConditions),
sources[0],
std::move(lookupSource),
std::move(outputType));
}

folly::dynamic IndexLookupJoinNode::serialize() const {
auto obj = serializeBase();
if (!joinConditions_.empty()) {
folly::dynamic serializedJoins = folly::dynamic::array;
serializedJoins.reserve(joinConditions_.size());
for (const auto& joinCondition : joinConditions_) {
serializedJoins.push_back(joinCondition->serialize());
}
obj["joinConditions"] = std::move(serializedJoins);
}
return obj;
}

void IndexLookupJoinNode::addDetails(std::stringstream& stream) const {
AbstractJoinNode::addDetails(stream);
if (joinConditions_.empty()) {
return;
}

std::vector<std::string> joinConditionStrs;
joinConditionStrs.reserve(joinConditions_.size());
for (const auto& joinCondition : joinConditions_) {
joinConditionStrs.push_back(joinCondition->toString());
}
stream << ", joinConditions: [" << folly::join(", ", joinConditionStrs)
<< " ]";
}

// static
bool IndexLookupJoinNode::isSupported(core::JoinType joinType) {
switch (joinType) {
case core::JoinType::kInner:
[[fallthrough]];
case core::JoinType::kLeft:
return true;
default:
return false;
}
}

NestedLoopJoinNode::NestedLoopJoinNode(
const PlanNodeId& id,
JoinType joinType,
Expand Down Expand Up @@ -2726,6 +2806,7 @@ void PlanNode::registerSerDe() {
registry.Register("HashJoinNode", HashJoinNode::create);
registry.Register("MergeExchangeNode", MergeExchangeNode::create);
registry.Register("MergeJoinNode", MergeJoinNode::create);
registry.Register("IndexLookupJoinNode", IndexLookupJoinNode::create);
registry.Register("NestedLoopJoinNode", NestedLoopJoinNode::create);
registry.Register("LimitNode", LimitNode::create);
registry.Register("LocalMergeNode", LocalMergeNode::create);
Expand Down
108 changes: 107 additions & 1 deletion velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1762,12 +1762,118 @@ class MergeJoinNode : public AbstractJoinNode {

folly::dynamic serialize() const override;

/// If merge join supports this join type.
/// Returns true if the merge join supports this join type, otherwise false.
static bool isSupported(JoinType joinType);

static PlanNodePtr create(const folly::dynamic& obj, void* context);
};

/// Represents index lookup join. Translates to an exec::IndexLookupJoin
/// operator. Assumes the right input is a table scan source node that provides
/// indexed table lookup for the left input with the specified join keys and
/// conditions. The join keys must be a prefix of the index columns of the
/// lookup table. Each join condition must use columns from both sides. For the
/// right side, it can only use one index column. Each index column can either
/// be a join key or a join condition once. The table scan node of the right
/// input is translated to a connector::IndexSource within
/// exec::IndexLookupJoin.
///
/// Only INNER and LEFT joins are supported.
///
/// Take the following query for example, t is left table, r is the right table
/// with indexed columns. 'sid' is the join keys. 'u.event_type in t.event_list'
/// is the join condition.
///
/// SELECT t.sid, t.day_ts, u.event_type
/// FROM t LEFT JOIN u
/// ON t.sid = u.sid
/// AND contains(t.event_list, u.event_type)
/// AND t.ds BETWEEN '2024-01-01' AND '2024-01-07'
///
/// Here,
/// - 'joinType' is JoinType::kLeft
/// - 'left' describes scan of t with a filter on 'ds':t.ds BETWEEN '2024-01-01'
/// AND '2024-01-07'
/// - 'right' describes indexed table 'u' with ndex keys sid, event_type(and
/// maybe some more)
/// - 'leftKeys' is a list of one key 't.sid'
/// - 'rightKeys' is a list of one key 'u.sid'
/// - 'joinConditions' is a list of one expression: contains(t.event_list,
/// u.event_type)
/// - 'outputType' contains 3 columns : t.sid, t.day_ts, u.event_type
///
class IndexLookupJoinNode : public AbstractJoinNode {
public:
/// @param joinType Specifies the lookup join type. Only INNER and LEFT joins
/// are supported.
IndexLookupJoinNode(
const PlanNodeId& id,
JoinType joinType,
const std::vector<FieldAccessTypedExprPtr>& leftKeys,
const std::vector<FieldAccessTypedExprPtr>& rightKeys,
const std::vector<TypedExprPtr>& joinConditions,
PlanNodePtr left,
TableScanNodePtr right,
RowTypePtr outputType)
: AbstractJoinNode(
id,
joinType,
leftKeys,
rightKeys,
/*filter=*/nullptr,
std::move(left),
right,
outputType),
lookupSourceNode_(std::move(right)),
joinConditions_(joinConditions) {
VELOX_USER_CHECK(
!leftKeys.empty(),
"The lookup join node requires at least one join key");
VELOX_USER_CHECK_EQ(
leftKeys_.size(),
rightKeys_.size(),
"The lookup join node requires same number of join keys on left and right sides");
// TODO: add check that (1) 'rightKeys_' form an index prefix. each of
// 'joinConditions_' uses columns from both sides and uses exactly one index
// column from the right side.
VELOX_USER_CHECK(
lookupSourceNode_->tableHandle()->supportsIndexLookup(),
"The lookup table handle {} from connector {} doesn't support index lookup",
lookupSourceNode_->tableHandle()->name(),
lookupSourceNode_->tableHandle()->connectorId());
VELOX_USER_CHECK(
isSupported(joinType_),
"Unsupported index lookup join type {}",
joinTypeName(joinType_));
}

const TableScanNodePtr& lookupSource() const {
return lookupSourceNode_;
}

const std::vector<TypedExprPtr>& joinConditions() const {
return joinConditions_;
}

std::string_view name() const override {
return "IndexLookupJoin";
}

folly::dynamic serialize() const override;

static PlanNodePtr create(const folly::dynamic& obj, void* context);

/// Returns true if the lookup join supports this join type, otherwise false.
static bool isSupported(JoinType joinType);

private:
void addDetails(std::stringstream& stream) const override;

const TableScanNodePtr lookupSourceNode_;

const std::vector<TypedExprPtr> joinConditions_;
};

/// Represents inner/outer nested loop joins. Translates to an
/// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate
/// pipeline is produced for the build side when generating exec::Operators.
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ add_executable(
FilterProjectTest.cpp
FunctionResolutionTest.cpp
HashBitRangeTest.cpp
HashJoinBridgeTest.cpp
HashJoinBridgeTesht.cpp
HashJoinTest.cpp
HashPartitionFunctionTest.cpp
HashTableTest.cpp
IndexLookupJoinTest.cpp
LimitTest.cpp
LocalPartitionTest.cpp
Main.cpp
Expand Down
Loading

0 comments on commit 83e865f

Please sign in to comment.