Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve isin performance #1203

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ def project_window_op(
output_name,
)

def isin(
    self, other: ArrayValue, lcol: str, rcol: str
) -> typing.Tuple[ArrayValue, str]:
    """Build an ``InNode`` testing ``lcol`` of this value against ``rcol`` of ``other``.

    Args:
        other: Array value providing the right-hand child node.
        lcol: Id of the column in this array value to test.
        rcol: Id of the column in ``other`` to test against.

    Returns:
        A pair of the new array value wrapping the ``InNode`` and the name
        of the freshly generated indicator column.
    """
    in_node = nodes.InNode(
        left_child=self.node,
        right_child=other.node,
        left_col=ex.deref(lcol),
        right_col=ex.deref(rcol),
        # A unique id is generated for the indicator column on every call.
        indicator_col=ids.ColumnId.unique(),
    )
    indicator_name = in_node.indicator_col.name
    return ArrayValue(in_node), indicator_name

def relational_join(
self,
other: ArrayValue,
Expand Down
16 changes: 4 additions & 12 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,23 +2033,15 @@ def isin(self, other: Block):
return block

def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block:
# NOTE(review): this span comes from a rendered diff without +/- markers, so
# superseded and replacement lines sit side by side and the text is not
# runnable as-is. The comments below describe the apparent intent of each
# group of lines; confirm against the real file before relying on them.
#
# Apparent purpose: return a Block in which column `col` is replaced by a
# boolean column telling whether each value occurs in `unique_values`.

# Apparently the superseded approach: add a constant True column to
# `unique_values`, left-join it onto this block's expression on `col`, and
# treat a non-null joined constant as "match found".
unique_values, const = unique_values.create_constant(
True, dtype=bigframes.dtypes.BOOL_DTYPE
)
expr, (l_map, r_map) = self._expr.relational_join(
unique_values, ((col, unique_values.column_ids[0]),), type="left"
)
expr, matches = expr.project_to_id(ops.notnull_op.as_expr(r_map[const]))
# Apparently the replacement: one isin call on the expression returns both
# the new expression and the id of the match column.
expr, matches = self._expr.isin(unique_values, col, unique_values.column_ids[0])

new_index_cols = tuple(l_map[idx_col] for idx_col in self.index_columns)
# Value columns keep their positions; only `col` is swapped for `matches`.
new_value_cols = tuple(
l_map[val_col] if val_col != col else matches
for val_col in self.value_columns
val_col if val_col != col else matches for val_col in self.value_columns
)
expr = expr.select_columns((*new_index_cols, *new_value_cols))
expr = expr.select_columns((*self.index_columns, *new_value_cols))
# Column labels and index labels are carried over from this block unchanged.
return Block(
expr,
index_columns=new_index_cols,
index_columns=self.index_columns,
column_labels=self.column_labels,
index_labels=self._index_labels,
)
Expand Down
22 changes: 22 additions & 0 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.default_ordering as default_ordering
import bigframes.core.compile.ibis_types
import bigframes.core.compile.isin
import bigframes.core.compile.scalar_op_compiler
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
Expand Down Expand Up @@ -164,6 +165,27 @@ def compile_join(self, node: nodes.JoinNode, ordered: bool = True):
conditions=condition_pairs,
)

@_compile_node.register
def compile_isin(self, node: nodes.InNode, ordered: bool = True):
    """Compile an ``InNode`` into an ordered or unordered IR.

    The right child is always compiled unordered; only the left child's
    compilation mode follows ``ordered``.

    Args:
        node: The membership-test node to compile.
        ordered: When true, compile the left child as an ordered IR and
            use the ordered isin helper; otherwise use the unordered one.

    Returns:
        Whatever the selected helper in ``bigframes.core.compile.isin``
        returns for the compiled children.
    """
    isin_impl = bigframes.core.compile.isin

    if not ordered:
        return isin_impl.isin_unordered(
            left=self.compile_unordered_ir(node.left_child),
            right=self.compile_unordered_ir(node.right_child),
            indicator_col=node.indicator_col.sql,
            conditions=(node.left_col.id.sql, node.right_col.id.sql),
        )

    return isin_impl.isin_ordered(
        left=self.compile_ordered_ir(node.left_child),
        right=self.compile_unordered_ir(node.right_child),
        indicator_col=node.indicator_col.sql,
        conditions=(node.left_col.id.sql, node.right_col.id.sql),
    )

@_compile_node.register
def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True):
# Both start and end are single elements and do not inherently have an order
Expand Down
114 changes: 114 additions & 0 deletions bigframes/core/compile/isin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers to compile isin (membership test) operations on ArrayValue objects."""

from __future__ import annotations

import itertools
from typing import Tuple

import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.types as ibis_types

import bigframes.core.compile.compiled as compiled


def isin_ordered(
    left: compiled.OrderedIR,
    right: compiled.UnorderedIR,
    indicator_col: str,
    conditions: Tuple[str, str],
) -> compiled.OrderedIR:
    """Append a membership indicator column to ``left``, keeping its ordering.

    Arguments:
        left: Ordered expression whose rows are tested.
        right: Unordered expression providing the values to test against.
        indicator_col: Name of the output column indicating whether the left
            element is found among the right elements.
        conditions: ``(left column id, right column id)`` pair to compare.
    Returns:
        An ordered expression over the left table with the left columns
        followed by the indicator column, reusing the left side's hidden
        ordering columns and ordering.
    """
    # Hidden columns are exposed so they can be passed on as ordering columns.
    left_table = left._to_ibis_expr(
        ordering_mode="unordered",
        expose_hidden_cols=True,
    )
    right_table = right._to_ibis_expr()

    left_key = value_to_join_key(left_table[conditions[0]])
    right_key = value_to_join_key(right_table[conditions[1]])
    membership = left_key.isin(right_key).name(indicator_col)

    value_columns = [left_table[col.get_name()] for col in left.columns]
    hidden_columns = [left_table[col_id] for col_id in left._hidden_column_ids]

    return compiled.OrderedIR(
        left_table,
        columns=(*value_columns, membership),
        hidden_ordering_columns=tuple(hidden_columns),
        ordering=left._ordering,
    )


def isin_unordered(
    left: compiled.UnorderedIR,
    right: compiled.UnorderedIR,
    indicator_col: str,
    conditions: Tuple[str, str],
) -> compiled.UnorderedIR:
    """Append a membership indicator column to an unordered ``left`` expression.

    Arguments:
        left: Expression whose rows are tested.
        right: Expression providing the values to test against.
        indicator_col: Name of the output column indicating whether the left
            element is found among the right elements.
        conditions: ``(left column id, right column id)`` pair to compare.
    Returns:
        An unordered expression over the left table with the left columns
        followed by the indicator column.
    """
    left_table = left._to_ibis_expr()
    right_table = right._to_ibis_expr()

    left_key = value_to_join_key(left_table[conditions[0]])
    right_key = value_to_join_key(right_table[conditions[1]])
    membership = left_key.isin(right_key).name(indicator_col)

    value_columns = [left_table[col.get_name()] for col in left.columns]

    return compiled.UnorderedIR(
        left_table,
        columns=(*value_columns, membership),
    )


def value_to_join_key(value: ibis_types.Value):
    """Convert a nullable value into a non-null string comparison key.

    SQL will not match null keys together, but pandas does, so nulls are
    replaced with a fixed sentinel string after casting the value to string.
    """
    key = value if value.type().is_string() else value.cast(ibis_dtypes.str)
    sentinel = ibis_types.literal("$NULL_SENTINEL$")
    # Presumably fill_null is the newer ibis spelling and fillna the older
    # one; whichever the value object provides is used.
    if hasattr(key, "fill_null"):
        return key.fill_null(sentinel)
    return key.fillna(sentinel)
93 changes: 93 additions & 0 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,99 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
return self


@dataclasses.dataclass(frozen=True, eq=False)
class InNode(BigFrameNode):
    """
    Special join type that returns only the rows of the left side, plus a bool
    column indicating whether a match exists on the right side.

    Modelled separately from the join node, as this operation preserves row identity.
    """

    left_child: BigFrameNode
    right_child: BigFrameNode
    # Reference to the left-side column whose values are tested.
    left_col: ex.DerefOp
    # Reference to the right-side column that is searched for matches.
    right_col: ex.DerefOp
    # Id of the boolean indicator column this node adds.
    indicator_col: bfet_ids.ColumnId

    def _validate(self):
        """Assert that the two children do not share any column ids."""
        shared_ids = set(self.left_child.ids).intersection(self.right_child.ids)
        assert not shared_ids, "Join ids collide"

    @property
    def row_preserving(self) -> bool:
        """Always False for this node.

        NOTE(review): reported as False although ``row_count`` mirrors the
        left child -- confirm this is intended.
        """
        return False

    @property
    def non_local(self) -> bool:
        """Always True for this node."""
        return True

    @property
    def child_nodes(self) -> typing.Sequence[BigFrameNode]:
        """Both children, left first."""
        return (self.left_child, self.right_child)

    @property
    def order_ambiguous(self) -> bool:
        """Always False for this node."""
        return False

    @property
    def explicitly_ordered(self) -> bool:
        """Always True: the left ordering is always preserved."""
        return True

    @property
    def projection_base(self) -> BigFrameNode:
        """The projection base of the left child."""
        return self.left_child.projection_base

    @property
    def fields(self) -> Iterable[Field]:
        """The left child's fields followed by the boolean indicator field."""
        indicator_field = Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE)
        return itertools.chain(self.left_child.fields, (indicator_field,))

    @functools.cached_property
    def variables_introduced(self) -> int:
        """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
        # Only the indicator column is new.
        return 1

    @property
    def joins(self) -> bool:
        """Always True for this node."""
        return True

    @property
    def row_count(self) -> Optional[int]:
        """The row count of the left child."""
        return self.left_child.row_count

    @property
    def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
        """The single id defined by this node: the indicator column."""
        return (self.indicator_col,)

    def transform_children(
        self, t: Callable[[BigFrameNode], BigFrameNode]
    ) -> BigFrameNode:
        """Apply ``t`` to both children and return the resulting node."""
        new_left = t(self.left_child)
        new_right = t(self.right_child)
        candidate = dataclasses.replace(
            self, left_child=new_left, right_child=new_right
        )
        # reusing existing object speeds up eq, and saves a small amount of memory
        return self if self == candidate else candidate

    def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
        """Return this node unchanged; ``used_cols`` is not consulted."""
        return self

    def remap_vars(
        self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]
    ) -> BigFrameNode:
        """Rename the indicator column if ``mappings`` has an entry for it."""
        new_indicator = mappings.get(self.indicator_col, self.indicator_col)
        return dataclasses.replace(self, indicator_col=new_indicator)

    def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]):
        """Rewrite the left and right column references using ``mappings``."""
        left_ref = self.left_col.remap_column_refs(
            mappings, allow_partial_bindings=True
        )
        right_ref = self.right_col.remap_column_refs(
            mappings, allow_partial_bindings=True
        )
        return dataclasses.replace(self, left_col=left_ref, right_col=right_ref)  # type: ignore


@dataclasses.dataclass(frozen=True, eq=False)
class JoinNode(BigFrameNode):
left_child: BigFrameNode
Expand Down
34 changes: 32 additions & 2 deletions bigframes/core/rewrite/implicit_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

import dataclasses
from typing import Optional, Tuple
from typing import cast, Optional, Tuple

import bigframes.core.expression
import bigframes.core.guid
Expand Down Expand Up @@ -68,14 +68,15 @@ def get_expression_spec(
(
bigframes.core.nodes.WindowOpNode,
bigframes.core.nodes.PromoteOffsetsNode,
bigframes.core.nodes.InNode,
),
):
# we don't yet have a way of normalizing window ops into an ExpressionSpec, which only
# handles normalizing scalar expressions at the moment.
pass
else:
return ExpressionSpec(expression, curr_node)
curr_node = curr_node.child
curr_node = curr_node.child_nodes[0]


def _linearize_trees(
Expand All @@ -87,6 +88,10 @@ def _linearize_trees(
# base case: append tree does not have any additive nodes to linearize
if append_tree == append_tree.projection_base:
return base_tree
if isinstance(append_tree, bigframes.core.nodes.InNode):
return dataclasses.replace(
append_tree, left_child=_linearize_trees(base_tree, append_tree.left_child)
)
else:
assert isinstance(append_tree, ADDITIVE_NODES)
return append_tree.replace_child(_linearize_trees(base_tree, append_tree.child))
Expand Down Expand Up @@ -152,6 +157,31 @@ def pull_up_selection(
(bigframes.core.expression.DerefOp(field.id), field.id)
for field in node.fields
)
# InNode needs special handling, as it's a binary node, but row identity is from left side only.
# TODO: Merge code with unary op paths
if isinstance(node, bigframes.core.nodes.InNode):
child_node, child_selections = pull_up_selection(
node.left_child, rename_vars=rename_vars
)
mapping = {out: ref.id for ref, out in child_selections}

new_in_node: bigframes.core.nodes.InNode = dataclasses.replace(
node, left_child=child_node
)
new_in_node = new_in_node.remap_refs(mapping)
if rename_vars:
new_in_node = cast(
bigframes.core.nodes.InNode,
new_in_node.remap_vars(
{node.indicator_col: bigframes.core.identifiers.ColumnId.unique()}
),
)
added_selection = (
bigframes.core.expression.DerefOp(new_in_node.indicator_col),
node.indicator_col,
)
new_selection = (*child_selections, added_selection)
return new_in_node, new_selection
assert isinstance(node, (bigframes.core.nodes.SelectionNode, *ADDITIVE_NODES))
child_node, child_selections = pull_up_selection(
node.child, rename_vars=rename_vars
Expand Down
Loading
Loading