From c5d5cca454218321747d7cae37056083e1ddc864 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 11 Dec 2024 02:44:53 +0000 Subject: [PATCH 1/2] perf: Improve isin performance --- bigframes/core/__init__.py | 12 ++++ bigframes/core/blocks.py | 16 ++--- bigframes/core/compile/compiler.py | 22 ++++++ bigframes/core/compile/isin.py | 111 +++++++++++++++++++++++++++++ bigframes/core/nodes.py | 89 +++++++++++++++++++++++ 5 files changed, 238 insertions(+), 12 deletions(-) create mode 100644 bigframes/core/compile/isin.py diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 3b1bf48558..a63999d5d2 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -412,6 +412,18 @@ def project_window_op( output_name, ) + def isin( + self, other: ArrayValue, lcol: str, rcol: str + ) -> typing.Tuple[ArrayValue, str]: + node = nodes.InNode( + self.node, + other.node, + ex.deref(lcol), + ex.deref(rcol), + indicator_col=ids.ColumnId.unique(), + ) + return ArrayValue(node), node.indicator_col.name + def relational_join( self, other: ArrayValue, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d1b2f91d60..a286b658a5 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2033,23 +2033,15 @@ def isin(self, other: Block): return block def _isin_inner(self: Block, col: str, unique_values: core.ArrayValue) -> Block: - unique_values, const = unique_values.create_constant( - True, dtype=bigframes.dtypes.BOOL_DTYPE - ) - expr, (l_map, r_map) = self._expr.relational_join( - unique_values, ((col, unique_values.column_ids[0]),), type="left" - ) - expr, matches = expr.project_to_id(ops.notnull_op.as_expr(r_map[const])) + expr, matches = self._expr.isin(unique_values, col, unique_values.column_ids[0]) - new_index_cols = tuple(l_map[idx_col] for idx_col in self.index_columns) new_value_cols = tuple( - l_map[val_col] if val_col != col else matches - for val_col in self.value_columns + val_col if val_col != col else matches for val_col in self.value_columns ) - expr = expr.select_columns((*new_index_cols, *new_value_cols)) + expr = expr.select_columns((*self.index_columns, *new_value_cols)) return Block( expr, - index_columns=new_index_cols, + index_columns=self.index_columns, column_labels=self.column_labels, index_labels=self._index_labels, ) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 295e323843..cc21306c40 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -28,6 +28,7 @@ import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.default_ordering as default_ordering import bigframes.core.compile.ibis_types +import bigframes.core.compile.isin import bigframes.core.compile.scalar_op_compiler import bigframes.core.compile.scalar_op_compiler as compile_scalar import bigframes.core.compile.schema_translator @@ -164,6 +165,27 @@ def compile_join(self, node: nodes.JoinNode, ordered: bool = True): conditions=condition_pairs, ) + @_compile_node.register + def compile_isin(self, node: nodes.InNode, ordered: bool = True): + if ordered: + left_ordered = self.compile_ordered_ir(node.left_child) + right_ordered = self.compile_unordered_ir(node.right_child) + return bigframes.core.compile.isin.isin_ordered( + left=left_ordered, + right=right_ordered, + indicator_col=node.indicator_col.sql, + conditions=(node.left_col.id.sql, node.right_col.id.sql), + ) + else: + left_unordered = self.compile_unordered_ir(node.left_child) + right_unordered = self.compile_unordered_ir(node.right_child) + return bigframes.core.compile.isin.isin_unordered( + left=left_unordered, + right=right_unordered, + indicator_col=node.indicator_col.sql, + conditions=(node.left_col.id.sql, node.right_col.id.sql), + ) + @_compile_node.register def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True): # Both start and end are single elements and do not inherently have an order diff --git a/bigframes/core/compile/isin.py b/bigframes/core/compile/isin.py new file mode 100644 index 0000000000..9c5f6c0719 --- /dev/null +++ b/bigframes/core/compile/isin.py @@ -0,0 +1,111 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers to join ArrayValue objects.""" + +from __future__ import annotations + +import itertools +from typing import Tuple + +import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes +import bigframes_vendored.ibis.expr.types as ibis_types + +import bigframes.core.compile.compiled as compiled + + +def isin_ordered( + left: compiled.OrderedIR, + right: compiled.UnorderedIR, + indicator_col: str, + conditions: Tuple[str, str], +) -> compiled.OrderedIR: + """Join two expressions by column equality. + + Arguments: + left: Expression for left table to join. + right: Expression for right table to join. + conditions: Id pairs to compare + Returns: + The joined expression. + """ + left_table = left._to_ibis_expr( + ordering_mode="unordered", + expose_hidden_cols=True, + ) + right_table = right._to_ibis_expr() + new_column = ( + value_to_join_key(left_table[conditions[0]]) + .isin(value_to_join_key(right_table[conditions[1]])) + .name(indicator_col) + ) + + columns = tuple( + itertools.chain( + (left_table[col.get_name()] for col in left.columns), (new_column,) + ) + ) + + return compiled.OrderedIR( + left_table, + columns=columns, + hidden_ordering_columns=left._hidden_column_ids, + ordering=left._ordering, + ) + + +def isin_unordered( + left: compiled.UnorderedIR, + right: compiled.UnorderedIR, + indicator_col: str, + conditions: Tuple[str, str], +) -> compiled.UnorderedIR: + """Join two expressions by column equality. + + Arguments: + left: Expression for left table to join. + right: Expression for right table to join. + conditions: Id pairs to compare + Returns: + The joined expression. + """ + left_table = left._to_ibis_expr() + right_table = right._to_ibis_expr() + new_column = ( + value_to_join_key(left_table[conditions[0]]) + .isin(value_to_join_key(right_table[conditions[1]])) + .name(indicator_col) + ) + + columns = tuple( + itertools.chain( + (left_table[col.get_name()] for col in left.columns), (new_column,) + ) + ) + + return compiled.UnorderedIR( + left_table, + columns=columns, + ) + + +def value_to_join_key(value: ibis_types.Value): + """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" + if not value.type().is_string(): + value = value.cast(ibis_dtypes.str) + return ( + value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) + if hasattr(value, "fill_null") + else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) + ) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 420348cca9..e52135325e 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -386,6 +386,95 @@ def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): return self +@dataclasses.dataclass(frozen=True, eq=False) +class InNode(BigFrameNode): + """ + Special Join Type that only returns rows from the left side, as well as adding a bool column indicating whether a match exists on the right side. + + Modelled separately from join node, as this operation preserves row identity. + """ + + left_child: BigFrameNode + right_child: BigFrameNode + left_col: ex.DerefOp + right_col: ex.DerefOp + indicator_col: bfet_ids.ColumnId + + def _validate(self): + assert not ( + set(self.left_child.ids) & set(self.right_child.ids) + ), "Join ids collide" + + @property + def row_preserving(self) -> bool: + return False + + @property + def non_local(self) -> bool: + return True + + @property + def child_nodes(self) -> typing.Sequence[BigFrameNode]: + return (self.left_child, self.right_child) + + @property + def order_ambiguous(self) -> bool: + return False + + @property + def explicitly_ordered(self) -> bool: + # Preserves left ordering always + return True + + @property + def fields(self) -> Iterable[Field]: + return itertools.chain( + self.left_child.fields, + (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE),), + ) + + @functools.cached_property + def variables_introduced(self) -> int: + """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" + return 1 + + @property + def joins(self) -> bool: + return True + + @property + def row_count(self) -> Optional[int]: + return self.left_child.row_count + + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return (self.indicator_col,) + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + transformed = dataclasses.replace( + self, left_child=t(self.left_child), right_child=t(self.right_child) + ) + if self == transformed: + # reusing existing object speeds up eq, and saves a small amount of memory + return self + return transformed + + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: + return self + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return dataclasses.replace( + self, indicator_col=mappings.get(self.indicator_col, self.indicator_col) + ) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return dataclasses.replace(self, left_col=self.left_col.remap_column_refs(mappings), right_col=self.right_col.remap_column_refs(mappings)) # type: ignore + + @dataclasses.dataclass(frozen=True, eq=False) class JoinNode(BigFrameNode): left_child: BigFrameNode From 8fd157a28590ae1661ceadf925c7bb6556b68651 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 11 Dec 2024 20:30:07 +0000 Subject: [PATCH 2/2] add implicit joiner support for isin --- bigframes/core/compile/isin.py | 5 ++- bigframes/core/nodes.py | 6 +++- bigframes/core/rewrite/implicit_align.py | 34 ++++++++++++++++-- tests/system/small/test_series.py | 45 ++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 4 deletions(-) diff --git a/bigframes/core/compile/isin.py b/bigframes/core/compile/isin.py index 9c5f6c0719..2d32b8ce1a 100644 --- a/bigframes/core/compile/isin.py +++ b/bigframes/core/compile/isin.py @@ -36,6 +36,7 @@ def isin_ordered( Arguments: left: Expression for left table to join. right: Expression for right table to join. + indicator_col: the output column, indicating if left elements are in right elements conditions: Id pairs to compare Returns: The joined expression. @@ -60,7 +61,9 @@ def isin_ordered( return compiled.OrderedIR( left_table, columns=columns, - hidden_ordering_columns=left._hidden_column_ids, + hidden_ordering_columns=tuple( + left_table[col] for col in left._hidden_column_ids + ), ordering=left._ordering, ) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index e52135325e..b2d9626875 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -426,6 +426,10 @@ def explicitly_ordered(self) -> bool: # Preserves left ordering always return True + @property + def projection_base(self) -> BigFrameNode: + return self.left_child.projection_base + @property def fields(self) -> Iterable[Field]: return itertools.chain( @@ -472,7 +476,7 @@ def remap_vars( ) def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): - return dataclasses.replace(self, left_col=self.left_col.remap_column_refs(mappings), right_col=self.right_col.remap_column_refs(mappings)) # type: ignore + return dataclasses.replace(self, left_col=self.left_col.remap_column_refs(mappings, allow_partial_bindings=True), right_col=self.right_col.remap_column_refs(mappings, allow_partial_bindings=True)) # type: ignore @dataclasses.dataclass(frozen=True, eq=False) diff --git a/bigframes/core/rewrite/implicit_align.py b/bigframes/core/rewrite/implicit_align.py index 1d7fed09d2..2785b1895a 100644 --- a/bigframes/core/rewrite/implicit_align.py +++ b/bigframes/core/rewrite/implicit_align.py @@ -14,7 +14,7 @@ from __future__ import annotations import dataclasses -from typing import Optional, Tuple +from typing import cast, Optional, Tuple import bigframes.core.expression import bigframes.core.guid @@ -68,6 +68,7 @@ def get_expression_spec( ( bigframes.core.nodes.WindowOpNode, bigframes.core.nodes.PromoteOffsetsNode, + bigframes.core.nodes.InNode, ), ): # we don't yet have a way of normalizing window ops into a ExpressionSpec, which only @@ -75,7 +76,7 @@ def get_expression_spec( pass else: return ExpressionSpec(expression, curr_node) - curr_node = curr_node.child + curr_node = curr_node.child_nodes[0] def _linearize_trees( @@ -87,6 +88,10 @@ def _linearize_trees( # base case: append tree does not have any additive nodes to linearize if append_tree == append_tree.projection_base: return base_tree + if isinstance(append_tree, bigframes.core.nodes.InNode): + return dataclasses.replace( + append_tree, left_child=_linearize_trees(base_tree, append_tree.left_child) + ) else: assert isinstance(append_tree, ADDITIVE_NODES) return append_tree.replace_child(_linearize_trees(base_tree, append_tree.child)) @@ -152,6 +157,31 @@ def pull_up_selection( (bigframes.core.expression.DerefOp(field.id), field.id) for field in node.fields ) + # InNode needs special handling, as its a binary node, but row identity is from left side only. + # TODO: Merge code with unary op paths + if isinstance(node, bigframes.core.nodes.InNode): + child_node, child_selections = pull_up_selection( + node.left_child, rename_vars=rename_vars + ) + mapping = {out: ref.id for ref, out in child_selections} + + new_in_node: bigframes.core.nodes.InNode = dataclasses.replace( + node, left_child=child_node + ) + new_in_node = new_in_node.remap_refs(mapping) + if rename_vars: + new_in_node = cast( + bigframes.core.nodes.InNode, + new_in_node.remap_vars( + {node.indicator_col: bigframes.core.identifiers.ColumnId.unique()} + ), + ) + added_selection = ( + bigframes.core.expression.DerefOp(new_in_node.indicator_col), + node.indicator_col, + ) + new_selection = (*child_selections, added_selection) + return new_in_node, new_selection assert isinstance(node, (bigframes.core.nodes.SelectionNode, *ADDITIVE_NODES)) child_node, child_selections = pull_up_selection( node.child, rename_vars=rename_vars diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 692b221a19..34539f1a95 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1240,6 +1240,51 @@ def test_isin_bigframes_values(scalars_dfs, col_name, test_set, session): ) +@pytest.mark.parametrize( + ( + "col_name", + "test_set", + ), + [ + ( + "int64_col", + [314159, 2.0, 3, pd.NA], + ), + ( + "int64_col", + [2, 55555, 4], + ), + ( + "float64_col", + [-123.456, 1.25, pd.NA], + ), + ( + "int64_too", + [1, 2, pd.NA], + ), + ( + "string_col", + ["Hello, World!", "Hi", "こんにちは"], + ), + ], +) +def test_isin_bigframes_values_as_predicate( + scalars_dfs_maybe_ordered, col_name, test_set +): + scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered + bf_predicate = scalars_df[col_name].isin( + series.Series(test_set, session=scalars_df._session) + ) + bf_result = scalars_df[bf_predicate].to_pandas() + pd_predicate = scalars_pandas_df[col_name].isin(test_set) + pd_result = scalars_pandas_df[pd_predicate] + + pd.testing.assert_frame_equal( + pd_result.reset_index(), + bf_result.reset_index(), + ) + + def test_isnull(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs col_name = "float64_col"