diff --git a/slide/_type_utils.py b/slide/_type_utils.py
new file mode 100644
index 0000000..17ea978
--- /dev/null
+++ b/slide/_type_utils.py
@@ -0,0 +1,179 @@
+from typing import Dict, Iterable, Tuple
+
+import pyarrow as pa
+from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
+
+_ORDERED_PYARROW_TYPES = [
+    pa.bool_(),
+    pa.int8(),
+    pa.uint8(),
+    pa.int16(),
+    pa.uint16(),
+    pa.int32(),
+    pa.uint32(),
+    pa.int64(),
+    pa.uint64(),
+    pa.float16(),
+    pa.float32(),
+    pa.float64(),
+    pa.string(),
+]
+
+
+def _generate_union_inference_types() -> Iterable[  # noqa: C901
+    Tuple[pa.DataType, pa.DataType, pa.DataType]
+]:
+    a = pa.bool_()
+    for b in _ORDERED_PYARROW_TYPES[1:]:
+        yield a, b, b
+
+    a = pa.int8()
+    yield a, pa.bool_(), a
+    yield a, pa.uint8(), pa.int16()
+    yield a, pa.uint16(), pa.int32()
+    yield a, pa.uint32(), pa.int64()
+    yield a, pa.uint64(), pa.float64()
+    for b in [
+        pa.int16(),
+        pa.int32(),
+        pa.int64(),
+        pa.float16(),
+        pa.float32(),
+        pa.float64(),
+        pa.string(),
+    ]:
+        yield a, b, b
+
+    a = pa.uint8()
+    yield a, pa.bool_(), a
+    yield a, pa.int8(), pa.int16()
+    for b in _ORDERED_PYARROW_TYPES[3:]:
+        yield a, b, b
+
+    a = pa.int16()
+    for b in _ORDERED_PYARROW_TYPES[:3]:
+        yield a, b, a
+    yield a, pa.uint16(), pa.int32()
+    yield a, pa.uint32(), pa.int64()
+    yield a, pa.uint64(), pa.float64()
+    for b in [
+        pa.int32(),
+        pa.int64(),
+        pa.float16(),
+        pa.float32(),
+        pa.float64(),
+        pa.string(),
+    ]:
+        yield a, b, b
+
+    a = pa.uint16()
+    yield a, pa.bool_(), a
+    yield a, pa.int8(), pa.int32()
+    yield a, pa.uint8(), a
+    yield a, pa.int16(), pa.int32()
+    for b in _ORDERED_PYARROW_TYPES[5:]:
+        yield a, b, b
+
+    a = pa.int32()
+    for b in _ORDERED_PYARROW_TYPES[:5]:
+        yield a, b, a
+    yield a, pa.uint32(), pa.int64()
+    yield a, pa.uint64(), pa.float64()
+    for b in [
+        pa.int64(),
+        pa.float16(),
+        pa.float32(),
+        pa.float64(),
+        pa.string(),
+    ]:
+        yield a, b, b
+
+    a = pa.uint32()
+    yield a, pa.bool_(), a
+    yield a, pa.int8(), pa.int64()
+    yield a, pa.uint8(), a
+    yield a, pa.int16(), pa.int64()
+    yield a, pa.uint16(), a
+    for b in _ORDERED_PYARROW_TYPES[7:]:
+        yield a, b, b
+
+    a = pa.int64()
+    for b in _ORDERED_PYARROW_TYPES[:7]:
+        yield a, b, a
+    yield a, pa.uint64(), pa.float64()
+    for b in [
+        pa.float16(),
+        pa.float32(),
+        pa.float64(),
+        pa.string(),
+    ]:
+        yield a, b, b
+
+    a = pa.uint64()
+    yield a, pa.bool_(), a
+    yield a, pa.int8(), pa.float64()
+    yield a, pa.uint8(), a
+    yield a, pa.int16(), pa.float64()
+    yield a, pa.uint16(), a
+    yield a, pa.int32(), pa.float64()
+    yield a, pa.uint32(), a
+    for b in _ORDERED_PYARROW_TYPES[9:]:
+        yield a, b, b
+
+    a = pa.float16()
+    for b in _ORDERED_PYARROW_TYPES[:9]:
+        yield a, b, a
+    for b in _ORDERED_PYARROW_TYPES[10:]:
+        yield a, b, b
+
+    a = pa.float32()
+    for b in _ORDERED_PYARROW_TYPES[:10]:
+        yield a, b, a
+    for b in _ORDERED_PYARROW_TYPES[11:]:
+        yield a, b, b
+
+    a = pa.float64()
+    for b in _ORDERED_PYARROW_TYPES[:11]:
+        yield a, b, a
+    # only string is wider than float64; [12:] keeps the mapping symmetric
+    # with the float32 case above ([10:] would wrongly map
+    # (float64, float32) to float32)
+    for b in _ORDERED_PYARROW_TYPES[12:]:
+        yield a, b, b
+
+    a = pa.string()
+    for b in _ORDERED_PYARROW_TYPES[:12]:
+        yield a, b, a
+
+    yield pa.date32(), pa.date64(), pa.date64()
+    yield pa.date64(), pa.date32(), pa.date64()
+
+
+_UNION_INFERENCE_DICT: Dict[Tuple[pa.DataType, pa.DataType], pa.DataType] = {
+    (x[0], x[1]): x[2] for x in _generate_union_inference_types()
+}
+
+
+def infer_union_type(  # noqa: C901
+    t1: pa.DataType,
+    t2: pa.DataType,
+) -> pa.DataType:
+    if t1 == t2:
+        return t1
+    if pa.types.is_timestamp(t1):
+        if pa.types.is_timestamp(t2) or pa.types.is_date(t2):
+            return TRIAD_DEFAULT_TIMESTAMP
+        elif pa.types.is_string(t2):
+            return pa.string()
+        raise ValueError(f"can't infer unioned schema for {t1} and {t2}")
+    if pa.types.is_timestamp(t2):
+        if pa.types.is_timestamp(t1) or pa.types.is_date(t1):
+            return TRIAD_DEFAULT_TIMESTAMP
+        elif pa.types.is_string(t1):
+            return pa.string()
+        raise ValueError(f"can't infer unioned schema for {t1} and {t2}")
+    if pa.types.is_nested(t1) or pa.types.is_nested(t2):
+        raise ValueError(f"can't infer unioned schema for {t1} and {t2}")
+    if pa.types.is_binary(t1) or pa.types.is_binary(t2):
+        raise ValueError(f"can't infer unioned schema for {t1} and {t2}")
+    key = (t1, t2)
+    if key in _UNION_INFERENCE_DICT:
+        return _UNION_INFERENCE_DICT[key]
+    raise ValueError(  # pragma: no cover
+        f"can't infer unioned schema for {t1} and {t2}"
+    )
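A quick sanity check of the inference table (an illustrative sketch, not part of the change itself; it assumes the module is importable as slide._type_utils):

    import pyarrow as pa
    from slide._type_utils import infer_union_type

    infer_union_type(pa.int8(), pa.uint8())       # int16: smallest signed type covering both
    infer_union_type(pa.int64(), pa.uint64())     # float64: no integer type covers both
    infer_union_type(pa.float64(), pa.float32())  # float64
    infer_union_type(pa.date32(), pa.date64())    # date64
    infer_union_type(pa.list_(pa.int64()), pa.list_(pa.int64()))  # equal types pass through
    infer_union_type(pa.list_(pa.int64()), pa.int64())  # raises ValueError: nested types can't be unioned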
diff --git a/slide/expressions/__init__.py b/slide/expressions/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/slide/operators/__init__.py b/slide/operators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/slide/operators/execution_plan.py b/slide/operators/execution_plan.py
new file mode 100644
index 0000000..dbddeda
--- /dev/null
+++ b/slide/operators/execution_plan.py
@@ -0,0 +1,446 @@
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import pandas as pd
+import pyarrow as pa
+from slide._type_utils import infer_union_type
+from slide.operators.graph import Context, Graph, Node, Operator
+
+
+class DataFrameOperator(Operator):
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self._nodes: Dict[str, Node] = {}
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        raise NotImplementedError  # pragma: no cover
+
+    @property
+    def nodes(self) -> Dict[str, Node]:
+        return self._nodes
+
+
+class MapOperator(Operator):
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        # the lineage node of this operator depends on all operator inputs
+        nodes = {x.node for x in args if isinstance(x, MapOperator)}
+        nodes = nodes.union(
+            x.node for x in kwargs.values() if isinstance(x, MapOperator)
+        )
+        self._node = Node(nodes)
+
+    @property
+    def output_type(self) -> pa.DataType:
+        raise NotImplementedError  # pragma: no cover
+
+    @property
+    def output_name(self) -> Optional[str]:
+        raise NotImplementedError  # pragma: no cover
+
+    @property
+    def node(self) -> Node:
+        return self._node
+
+
+class GetDataFrameOperator(DataFrameOperator):
+    def __init__(self, name: str, input_schema: pa.Schema):
+        super().__init__(name, str(input_schema))
+        self._name = name
+        self._schema = input_schema
+
+        for f in input_schema:
+            self.nodes[f.name] = Node(set())
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._schema
+
+    def execute(self, context: Context) -> None:
+        context[self] = context[self._name]
+
+
+class UnionOperator(DataFrameOperator):
+    def __init__(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ):
+        super().__init__(df1, df2, distinct)
+        fields1: List[pa.Field] = []
+        fields2: List[pa.Field] = []
+        for f1, f2 in zip(df1.output_schema, df2.output_schema):
+            inf_type = infer_union_type(f1.type, f2.type)
+            fields1.append(pa.field(f1.name, inf_type))
+            fields2.append(pa.field(f2.name, inf_type))
+            self.nodes[f1.name] = Node({df1.nodes[f1.name], df2.nodes[f2.name]})
+        self._schema1 = pa.schema(fields1)
+        self._schema2 = pa.schema(fields2)
+        self._output_schema = self._schema1
+        self._df1 = df1
+        self._df2 = df2
+        self._distinct = distinct
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._output_schema
+
+    def execute(self, context: Context) -> None:
+        df1 = context[self._df1]
+        df2 = context[self._df2]
+        df1 = context.utils.cast_df(df1, self._schema1, self._df1.output_schema)
+        df2 = context.utils.cast_df(df2, self._schema2, self._df2.output_schema)
+        context[self] = context.utils.union(df1, df2, unique=self._distinct)
+
+
+class ExceptOperator(DataFrameOperator):
+    def __init__(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ):
+        super().__init__(df1, df2, distinct)
+        self._df1 = df1
+        self._df2 = df2
+        self._distinct = distinct
+        for f1, f2 in zip(df1.output_schema, df2.output_schema):
+            self.nodes[f1.name] = Node({df1.nodes[f1.name], df2.nodes[f2.name]})
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._df1.output_schema
+
+    def execute(self, context: Context) -> None:
+        df1 = context[self._df1]
+        df2 = context[self._df2]
+        context[self] = context.utils.except_df(df1, df2, unique=self._distinct)
+
+
+class IntersectOperator(DataFrameOperator):
+    def __init__(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ):
+        super().__init__(df1, df2, distinct)
+        self._df1 = df1
+        self._df2 = df2
+        self._distinct = distinct
+        for f1, f2 in zip(df1.output_schema, df2.output_schema):
+            self.nodes[f1.name] = Node({df1.nodes[f1.name], df2.nodes[f2.name]})
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._df1.output_schema
+
+    def execute(self, context: Context) -> None:
+        df1 = context[self._df1]
+        df2 = context[self._df2]
+        context[self] = context.utils.intersect(df1, df2, unique=self._distinct)
+
+
+class FilterOperator(DataFrameOperator):
+    def __init__(self, df: DataFrameOperator, filter_col: str, drop: bool = True):
+        super().__init__(df, filter_col, drop)
+        self._df = df
+        self._filter_col = filter_col
+        self._drop = drop
+        if not drop:
+            self._output_schema = df.output_schema
+        else:
+            self._output_schema = pa.schema(
+                [x for x in df.output_schema if x.name != filter_col]
+            )
+        for f in self._output_schema:
+            self.nodes[f.name] = Node({df.nodes[f.name], df.nodes[filter_col]})
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._output_schema
+
+    def execute(self, context: Context) -> None:
+        df = context[self._df]
+        res = context.utils.filter_df(df, df[self._filter_col])
+        if self._drop:
+            # drop the filter column from the filtered result, not from df
+            res = context.utils.drop_columns(res, [self._filter_col])
+        context[self] = res
+
+
+class JoinOperator(DataFrameOperator):
+    def __init__(self, df1: DataFrameOperator, df2: DataFrameOperator, how: str):
+        super().__init__(df1, df2, how)
+        self._df1 = df1
+        self._df2 = df2
+        self._how = how
+        # natural join: the keys are the columns shared by both sides
+        self._on = list(
+            set(f.name for f in df1.output_schema).intersection(  # noqa: C401
+                f.name for f in df2.output_schema
+            )
+        )
+        f1 = [f for f in df1.output_schema if f.name not in self._on]
+        f2 = [f for f in df2.output_schema if f.name not in self._on]
+        self._output_schema = pa.schema(f1 + f2)
+
+        on_nodes = [df1.nodes[n] for n in self._on] + [df2.nodes[n] for n in self._on]
+        for f in f1:
+            self.nodes[f.name] = Node({df1.nodes[f.name], *on_nodes})
+        for f in f2:
+            self.nodes[f.name] = Node({df2.nodes[f.name], *on_nodes})
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._output_schema
+
+    def execute(self, context: Context) -> None:
+        df1 = context[self._df1]
+        df2 = context[self._df2]
+        res = context.utils.join(df1, df2, join_type=self._how, on=self._on)
+        context[self] = context.utils.select_columns(
+            res, [f.name for f in self.output_schema]
+        )
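JoinOperator infers the join keys as the column names shared by both sides (a natural join) and then excludes those keys from the output schema. A minimal pyarrow-only sketch of the same bookkeeping:

    import pyarrow as pa

    s1 = pa.schema([("a", pa.int64()), ("b", pa.string())])
    s2 = pa.schema([("b", pa.string()), ("c", pa.float64())])
    on = list({f.name for f in s1}.intersection(f.name for f in s2))  # ["b"]
    out = pa.schema(
        [f for f in s1 if f.name not in on] + [f for f in s2 if f.name not in on]
    )
    # out is a:int64, c:double -- the shared key "b" is dropped from the output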
+
+
+class OutputDataFrameOperator(DataFrameOperator):
+    def __init__(self, df: DataFrameOperator):
+        super().__init__(df)
+        self._df = df
+
+        for k, v in df.nodes.items():
+            self.nodes[k] = v
+
+    def execute(self, context: Context) -> None:
+        context.set_output(context[self._df])
+
+
+class GetColumn(MapOperator):
+    def __init__(self, df: DataFrameOperator, name: str):
+        super().__init__(df, name)
+        self._name = name
+        self._df = df
+        self._node = df.nodes[name]
+
+    @property
+    def output_type(self) -> pa.DataType:
+        return self._df.output_schema.field_by_name(self._name).type
+
+    @property
+    def output_name(self) -> Optional[str]:
+        return self._name
+
+    def execute(self, context: Context) -> None:  # type: ignore
+        context[self] = context[self._df][self._name]
+
+
+class LitColumn(MapOperator):
+    def __init__(self, value: Any, input_type: Optional[pa.DataType] = None):
+        super().__init__(value, str(input_type))
+        self._value = value
+        # infer the pyarrow type of the literal from the value itself
+        self._output_type = pa.scalar(value, input_type).type
+
+    @property
+    def output_type(self) -> pa.DataType:
+        return self._output_type
+
+    def execute(self, context: Context) -> None:
+        context[self] = self._value
+
+
+class UnaryOperator(MapOperator):
+    def __init__(self, op: str, col: MapOperator):
+        super().__init__(op, col)
+        self._op = op
+        self._col = col
+        self._output_type = self._get_output_type(op, col.output_type)
+
+    @property
+    def output_type(self) -> pa.DataType:
+        return self._output_type
+
+    @property
+    def output_name(self) -> Optional[str]:
+        return self._col.output_name
+
+    def execute(self, context: Context) -> None:
+        if self._op in ["+", "-"]:
+            context[self] = context.utils.unary_arithmetic_op(
+                context[self._col], op=self._op
+            )
+        elif self._op == "~":
+            context[self] = context.utils.logical_not(context[self._col])
+        else:
+            raise NotImplementedError(self._op)  # pragma: no cover
+
+    def _get_output_type(self, op: str, input_type: pa.DataType) -> pa.DataType:
+        if op == "+":
+            if pa.types.is_integer(input_type) or pa.types.is_floating(input_type):
+                return input_type
+        elif op == "-":
+            # negation can overflow unsigned types, so widen integers to int64
+            if pa.types.is_integer(input_type):
+                return pa.int64()
+            if pa.types.is_floating(input_type):
+                return input_type
+        elif op == "~":
+            if pa.types.is_boolean(input_type):
+                return input_type
+        raise ValueError(f"'{op}' can't be applied to {input_type}")
+
+
+class BinaryOperator(MapOperator):
+    def __init__(self, op: str, col1: MapOperator, col2: MapOperator):
+        super().__init__(op, col1, col2)
+        self._op = op
+        self._col1 = col1
+        self._col2 = col2
+        self._output_type = self._get_output_type(
+            op, col1.output_type, col2.output_type
+        )
+
+    @property
+    def output_type(self) -> pa.DataType:
+        return self._output_type
+
+    def execute(self, context: Context) -> None:
+        if self._op in ["+", "-", "*", "/"]:
+            res = context.utils.binary_arithmetic_op(
+                context[self._col1], context[self._col2], op=self._op
+            )
+            if (  # int/int -> int
+                pa.types.is_integer(self._col1.output_type)
+                and pa.types.is_integer(self._col2.output_type)
+                and not pd.api.types.is_integer_dtype(res.dtype)
+            ):
+                res = context.utils.cast(res, "int64")
+            context[self] = res
+        elif self._op in ["&", "|"]:
+            context[self] = context.utils.binary_logical_op(
+                context[self._col1],
+                context[self._col2],
+                op="and" if self._op == "&" else "or",
+            )
+        else:
+            raise NotImplementedError(self._op)  # pragma: no cover
+
+    def _get_output_type(  # noqa: C901
+        self, op: str, t1: pa.DataType, t2: pa.DataType
+    ) -> pa.DataType:
+        if op == "+":
+            if pa.types.is_integer(t1):
+                if pa.types.is_integer(t2):
+                    return pa.int64()
+                if pa.types.is_floating(t2):
+                    return pa.float64()
+            elif pa.types.is_floating(t1):
+                if pa.types.is_integer(t2) or pa.types.is_floating(t2):
+                    return pa.float64()
+            # TODO: time + interval
+        elif op == "-":
+            if pa.types.is_integer(t1):
+                if pa.types.is_integer(t2):
+                    return pa.int64()
+                if pa.types.is_floating(t2):
+                    return pa.float64()
+            elif pa.types.is_floating(t1):
+                if pa.types.is_integer(t2) or pa.types.is_floating(t2):
+                    return pa.float64()
+            # TODO: time - interval
+            # TODO: time - time
+        elif op in ["*", "/"]:
+            if pa.types.is_integer(t1):
+                if pa.types.is_integer(t2):
+                    return pa.int64()
+                if pa.types.is_floating(t2):
+                    return pa.float64()
+            elif pa.types.is_floating(t1):
+                if pa.types.is_integer(t2) or pa.types.is_floating(t2):
+                    return pa.float64()
+        elif op in ["&", "|"]:
+            if (pa.types.is_boolean(t1) or pa.types.is_null(t1)) and (
+                pa.types.is_boolean(t2) or pa.types.is_null(t2)
+            ):
+                return pa.bool_()
+        raise ValueError(  # pragma: no cover
+            f"'{op}' can't be applied to {t1} and {t2}"
+        )
+
+
+class ColsToDataFrameOperator(DataFrameOperator):
+    def __init__(
+        self,
+        *args: Union[MapOperator, Tuple[MapOperator, str]],
+        reference: DataFrameOperator,
+    ):
+        self._data: List[Any] = [
+            (x, x.output_name) if isinstance(x, MapOperator) else x for x in args
+        ]
+        super().__init__(*self._data, reference)
+        self._output_schema = pa.schema(
+            [pa.field(x[1], x[0].output_type) for x in self._data]
+        )
+
+        self._ref = reference
+        self._nodes = {v: k.node for k, v in self._data}
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        return self._output_schema
+
+    @property
+    def nodes(self) -> Dict[str, Node]:
+        return self._nodes
+
+    def execute(self, context: Context) -> None:
+        cols = [context[x] for x, _ in self._data]
+        names = [y for _, y in self._data]
+        context[self] = context.utils.cols_to_df(
+            cols, names=names, reference=context[self._ref]
+        )
+
+
+class ExecutionPlan(Graph):
+    def __init__(self):
+        super().__init__()
+        self._output_schema: Optional[pa.Schema] = None
+
+    def df(self, df: Any, input_schema: pa.Schema) -> Operator:
+        return self.add(GetDataFrameOperator(df, input_schema))
+
+    def union(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ) -> Operator:
+        return self.add(UnionOperator(df1, df2, distinct=distinct))
+
+    def except_df(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ) -> Operator:
+        return self.add(ExceptOperator(df1, df2, distinct=distinct))
+
+    def intersect(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, distinct: bool = True
+    ) -> Operator:
+        return self.add(IntersectOperator(df1, df2, distinct=distinct))
+
+    def filter_df(
+        self, df: DataFrameOperator, filter_col: str, drop: bool = True
+    ) -> Operator:
+        return self.add(FilterOperator(df, filter_col, drop))
+
+    def join(
+        self, df1: DataFrameOperator, df2: DataFrameOperator, how: str
+    ) -> Operator:
+        return self.add(JoinOperator(df1, df2, how))
+
+    def output(self, df: DataFrameOperator) -> None:
+        self.add(OutputDataFrameOperator(df))
+        self.set_output_schema(df.output_schema)
+
+    def col(self, df: DataFrameOperator, name: str) -> Operator:
+        return self.add(GetColumn(df, name))
+
+    def lit(self, value: Any, input_type: Optional[pa.DataType] = None) -> Operator:
+        return self.add(LitColumn(value, input_type))
+
+    def unary(self, op: str, col: MapOperator) -> Operator:
+        return self.add(UnaryOperator(op, col))
+
+    def binary(self, op: str, col1: MapOperator, col2: MapOperator) -> Operator:
+        return self.add(BinaryOperator(op, col1, col2))
+
+    def cols_to_df(
+        self,
+        *args: Union[MapOperator, Tuple[MapOperator, str]],
+        reference: DataFrameOperator,
+    ) -> Operator:
+        return self.add(ColsToDataFrameOperator(*args, reference=reference))
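An end-to-end sketch of how these operators compose (hypothetical usage; it assumes the pandas backend, and the names "input" and "y" are arbitrary):

    import pandas as pd
    import pyarrow as pa
    from slide.operators.execution_plan import ExecutionPlan
    from slide.operators.graph import Context
    from slide_pandas import PandasUtils

    plan = ExecutionPlan()
    df = plan.df("input", pa.schema([("x", pa.int64())]))
    col = plan.col(df, "x")
    plus_one = plan.binary("+", col, plan.lit(1))
    out = plan.cols_to_df((plus_one, "y"), reference=df)
    plan.output(out)

    ctx = Context(PandasUtils())
    ctx["input"] = pd.DataFrame({"x": [1, 2, 3]})
    plan.execute(ctx)
    # ctx.output should be a pandas dataframe with one int64 column y = x + 1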
diff --git a/slide/operators/graph.py b/slide/operators/graph.py
new file mode 100644
index 0000000..fcb588d
--- /dev/null
+++ b/slide/operators/graph.py
@@ -0,0 +1,94 @@
+from typing import Any, Dict, List, Optional, Set, Union
+
+import pyarrow as pa
+from slide.exceptions import SlideInvalidOperation
+from slide.utils import SlideUtils
+from triad import assert_or_throw, to_uuid
+
+
+class Node:
+    def __init__(self, parents: Set["Node"]):
+        self._parents = parents
+
+
+class Operator:
+    def __init__(self, *args: Any, **kwargs: Any):
+        self._uuid = to_uuid(self.identifier, args, kwargs)
+
+    @property
+    def identifier(self) -> str:
+        return str(type(self))
+
+    @property
+    def key(self) -> str:
+        return "_" + to_uuid(self)[:8]
+
+    def __uuid__(self) -> str:
+        return self._uuid
+
+    def execute(self, context: "Context") -> None:
+        raise NotImplementedError  # pragma: no cover
+
+
+class Context:
+    def __init__(self, utils: SlideUtils):
+        self._utils = utils
+        self._output: Any = None
+        self._results: Dict[str, Any] = {}
+
+    @property
+    def utils(self) -> SlideUtils:
+        return self._utils
+
+    @property
+    def output(self) -> Any:
+        return self._output
+
+    def set_output(self, df: Any) -> None:
+        self._output = df
+
+    def __setitem__(self, op: Union[str, Operator], value: Any) -> None:
+        self._results[op if isinstance(op, str) else op.key] = value
+
+    def __getitem__(self, op: Union[str, Operator]) -> Any:
+        return self._results[op if isinstance(op, str) else op.key]
+
+
+class Graph:
+    def __init__(self):
+        self._steps: List[Operator] = []
+        self._steps_dict: Dict[str, Operator] = {}
+        self._output_schema: Optional[pa.Schema] = None
+
+    @property
+    def output_schema(self) -> pa.Schema:
+        assert_or_throw(
+            self._output_schema is not None, SlideInvalidOperation("output is not set")
+        )
+        return self._output_schema
+
+    def set_output_schema(self, schema: pa.Schema) -> None:
+        assert_or_throw(
+            self._output_schema is None, SlideInvalidOperation("output is already set")
+        )
+        self._output_schema = schema
+
+    def add(self, op: Operator) -> Operator:
+        key = op.key
+        if key in self._steps_dict:
+            return self._steps_dict[key]
+        self._steps_dict[key] = op
+        self._steps.append(op)
+        return op
+
+    @property
+    def steps(self) -> List[Operator]:
+        return self._steps
+
+    def __len__(self) -> int:
+        return len(self._steps)
+
+    def execute(self, context: Context) -> None:
+        for step in self.steps:
+            step.execute(context)
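Because an Operator's key is a content hash of its type and construction arguments, Graph.add deduplicates identical subexpressions, so a repeated expression is only computed once. A small sketch:

    from slide.operators.execution_plan import ExecutionPlan

    plan = ExecutionPlan()
    a = plan.lit(1)
    b = plan.lit(1)
    assert a is b        # the second add returns the existing operator
    assert len(plan) == 1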
diff --git a/slide/utils.py b/slide/utils.py
index 5c602be..3b1f938 100644
--- a/slide/utils.py
+++ b/slide/utils.py
@@ -8,6 +8,7 @@
     Optional,
     Tuple,
     TypeVar,
+    Union,
 )
 
 import numpy as np
@@ -16,9 +17,9 @@
 from triad.utils.assertion import assert_or_throw
 from triad.utils.pyarrow import (
     TRIAD_DEFAULT_TIMESTAMP,
-    apply_schema,
     to_pa_datatype,
     to_single_pandas_dtype,
+    _TypeConverter,
 )
 
 from slide._string_utils import LikeExpr, LikeExprShortcut
@@ -102,17 +103,17 @@ def series_to_array(self, col: TCol) -> List[Any]:
         """
         raise NotImplementedError  # pragma: no cover
 
-    def to_constant_series(
+    def scalar_to_series(
         self,
-        constant: Any,
-        from_series: TCol,
+        scalar: Any,
+        reference: Union[TCol, TDf],
         dtype: Any = None,
         name: Optional[str] = None,
    ) -> TCol:  # pragma: no cover
-        """Convert a constant to a series with the same index of ``from_series``
+        """Convert a scalar to a series with the same index as ``reference``
 
-        :param constant: the constant
-        :param from_series: the reference series for index
+        :param scalar: the scalar
+        :param reference: the reference series or dataframe providing the index
         :param dtype: default data type, defaults to None
         :param name: name of the series, defaults to None
         :return: the series
@@ -120,9 +121,9 @@ def to_constant_series(
         raise NotImplementedError
 
     def get_col_pa_type(self, col: Any) -> pa.DataType:
-        """Get column or constant pyarrow data type
+        """Get column or scalar pyarrow data type
 
-        :param col: the column or the constant
+        :param col: the column or the scalar
         :return: pyarrow data type
         """
         if self.is_series(col):
@@ -133,11 +134,11 @@ def get_col_pa_type(self, col: Any) -> pa.DataType:
         return self.to_safe_pa_type(type(col))
 
     def unary_arithmetic_op(self, col: Any, op: str) -> Any:
-        """Unary arithmetic operator on series/constants
+        """Unary arithmetic operator on series/scalars
 
-        :param col: a series or a constant
+        :param col: a series or a scalar
         :param op: can be ``+`` or ``-``
-        :return: the transformed series or constant
+        :return: the transformed series or scalar
         :raises NotImplementedError: if ``op`` is not supported
 
         .. note:
@@ -153,10 +154,10 @@ def unary_arithmetic_op(self, col: Any, op: str) -> Any:
     def binary_arithmetic_op(self, col1: Any, col2: Any, op: str) -> Any:
         """Binary arithmetic operations ``+``, ``-``, ``*``, ``/``
 
-        :param col1: the first column (series or constant)
-        :param col2: the second column (series or constant)
+        :param col1: the first column (series or scalar)
+        :param col2: the second column (series or scalar)
         :param op: ``+``, ``-``, ``*``, ``/``
-        :return: the result after the operation (series or constant)
+        :return: the result after the operation (series or scalar)
         :raises NotImplementedError: if ``op`` is not supported
 
         .. note:
@@ -170,16 +171,20 @@ def binary_arithmetic_op(self, col1: Any, col2: Any, op: str) -> Any:
         if op == "*":
             return col1 * col2
         if op == "/":
+            # int/int should produce int, but integer data may be stored in
+            # non-integer dtypes, so this can't be decided from the input
+            # dtypes alone; callers of this function are responsible for
+            # casting the result back to int when needed
             return col1 / col2
         raise NotImplementedError(f"{op} is not supported")  # pragma: no cover
 
     def comparison_op(self, col1: Any, col2: Any, op: str) -> Any:
         """Binary comparison ``<``, ``<=``, ``==``, ``>``, ``>=``
 
-        :param col1: the first column (series or constant)
-        :param col2: the second column (series or constant)
+        :param col1: the first column (series or scalar)
+        :param col2: the second column (series or scalar)
         :param op: ``<``, ``<=``, ``==``, ``>``, ``>=``
-        :return: the result after the operation (series or constant)
+        :return: the result after the operation (series or scalar)
         :raises NotImplementedError: if ``op`` is not supported
 
         .. note:
@@ -208,10 +213,10 @@ def comparison_op(self, col1: Any, col2: Any, op: str) -> Any:
     def binary_logical_op(self, col1: Any, col2: Any, op: str) -> Any:
         """Binary logical operations ``and``, ``or``
 
-        :param col1: the first column (series or constant)
-        :param col2: the second column (series or constant)
+        :param col1: the first column (series or scalar)
+        :param col2: the second column (series or scalar)
         :param op: ``and``, ``or``
-        :return: the result after the operation (series or constant)
+        :return: the result after the operation (series or scalar)
         :raises NotImplementedError: if ``op`` is not supported
 
         .. note:
@@ -222,12 +227,16 @@ def binary_logical_op(self, col1: Any, col2: Any, op: str) -> Any:
         c2 = self._safe_bool(col2)
         if op == "and":
             if not self.is_series(c1) and not self.is_series(c2):
+                if c1 is None:
+                    return c2 and c1
                 return c1 and c2
-            return c1 & c2
+            return (pd.NA if c1 is None else c1) & (pd.NA if c2 is None else c2)
         elif op == "or":
             if not self.is_series(c1) and not self.is_series(c2):
+                if c1 is None:
+                    return c2 or c1
                 return c1 or c2
-            return c1 | c2
+            return (pd.NA if c1 is None else c1) | (pd.NA if c2 is None else c2)
         raise NotImplementedError(f"{op} is not supported")  # pragma: no cover
 
     def logical_not(self, col: Any) -> Any:
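The None handling above is meant to follow SQL three-valued logic for scalars as well as series. A sketch of the expected scalar results, assuming the pandas backend (these combinations are checked against DuckDB in the test suite below):

    from slide_pandas import PandasUtils

    u = PandasUtils()
    u.binary_logical_op(None, False, "and")  # False: FALSE AND NULL is FALSE
    u.binary_logical_op(None, True, "and")   # None:  TRUE AND NULL is NULL
    u.binary_logical_op(None, True, "or")    # True:  TRUE OR NULL is TRUE
    u.binary_logical_op(None, False, "or")   # None:  FALSE OR NULL is NULL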
""" - to_type = self.to_safe_pa_type(type_obj) - t_type = to_single_pandas_dtype(to_type, use_extension_types=True) try: if self.is_series(col): - try: - inf_type = self.get_col_pa_type(col) - has_hint = input_type is not None - from_type = ( - inf_type if not has_hint else self.to_safe_pa_type(input_type) + to_type = self.to_safe_pa_type(type_obj) + input_pa_type = ( + None if input_type is None else self.to_safe_pa_type(input_type) + ) + if ( # nested/binary as input/output + pa.types.is_nested(to_type) + or pa.types.is_binary(to_type) + or ( + input_pa_type is not None + and ( + pa.types.is_nested(input_pa_type) + or pa.types.is_binary(input_pa_type) + ) + ) + ): + assert_or_throw( + pd.api.types.is_object_dtype(col.dtype), + SlideCastError(f"unexpected column type {col.dtype}"), + ) + assert_or_throw( + input_type is None + or self.to_safe_pa_type(input_type) == to_type, + SlideCastError(f"unexpected column type hint {input_type}"), ) - if pa.types.is_string(to_type): - if ( - has_hint - and pa.types.is_string(from_type) - and pa.types.is_string(inf_type) - ): - return col - elif from_type == inf_type == to_type: + return col + + t_type = to_single_pandas_dtype(to_type, use_extension_types=True) + inf_type = self.get_col_pa_type(col) + has_hint = input_type is not None + from_type = input_pa_type or inf_type + + if pa.types.is_string(to_type): + if ( + has_hint + and pa.types.is_string(from_type) + and pa.types.is_string(inf_type) + ): return col - except Exception: # pragma: no cover - return col.astype(t_type) + elif from_type == inf_type == to_type: + return col + if pa.types.is_boolean(to_type): return self._cast_to_bool(col, from_type, inf_type, t_type) if pa.types.is_integer(to_type): @@ -295,7 +326,7 @@ def cast( # noqa: C901 return self._cast_to_date(col, from_type, inf_type, t_type) elif pa.types.is_string(to_type): return self._cast_to_str(col, from_type, inf_type, t_type) - return col.astype(t_type) + return col.astype(t_type) # pragma: no cover else: if col is None: return None @@ -313,13 +344,13 @@ def cast( # noqa: C901 return None return res except (TypeError, ValueError) as te: - raise SlideCastError(f"unable to cast from {from_type} to {t_type}") from te + raise SlideCastError(str(te)) from te def filter_df(self, df: TDf, cond: Any) -> TDf: - """Filter dataframe by a boolean series or a constant + """Filter dataframe by a boolean series or a scalar :param df: the dataframe - :param cond: a boolean seris or a constant + :param cond: a boolean seris or a scalar :return: the filtered dataframe .. 
 
     def filter_df(self, df: TDf, cond: Any) -> TDf:
-        """Filter dataframe by a boolean series or a constant
+        """Filter dataframe by a boolean series or a scalar
 
         :param df: the dataframe
-        :param cond: a boolean seris or a constant
+        :param cond: a boolean series or a scalar
         :return: the filtered dataframe
 
         .. note:
@@ -335,9 +366,9 @@ def filter_df(self, df: TDf, cond: Any) -> TDf:
         return df.head(0)
 
     def is_value(self, col: Any, value: Any, positive: bool = True) -> Any:
-        """Check if the series or constant is ``value``
+        """Check if the series or scalar is ``value``
 
-        :param col: the series or constant
+        :param col: the series or scalar
         :param value: ``None``, ``True`` or ``False``
         :param positive: check ``is value`` or ``is not value``,
             defaults to True (``is value``)
@@ -367,12 +398,12 @@ def is_value(self, col: Any, value: Any, positive: bool = True) -> Any:
         )[0]
 
     def is_in(self, col: Any, values: List[Any], positive: bool) -> Any:  # noqa: C901
-        """Check if a series or a constant is in ``values``
+        """Check if a series or a scalar is in ``values``
 
-        :param col: the series or the constant
-        :param values: a list of constants and series (can mix)
+        :param col: the series or the scalar
+        :param values: a list of scalars and series (can mix)
         :param positive: ``is in`` or ``is not in``
-        :return: the correspondent boolean series or constant
+        :return: the corresponding boolean series or scalar
 
         .. note:
 
@@ -382,13 +413,13 @@ def is_in(self, col: Any, values: List[Any], positive: bool) -> Any:  # noqa: C9
         if self.is_series(col):
             cols = [x for x in values if self.is_series(x)]
             others = [x for x in values if not self.is_series(x)]
-            has_null_constant = any(pd.isna(x) for x in others)
+            has_null = any(pd.isna(x) for x in others)
             innulls: Any = None
             if positive:
                 o: Any = col.isin(others)
                 for c in cols:
                     o = o | (col == c)
-                    if not has_null_constant:
+                    if not has_null:
                         if innulls is None:
                             innulls = c.isnull()
                         else:
@@ -397,12 +428,12 @@ def is_in(self, col: Any, values: List[Any], positive: bool) -> Any:  # noqa: C9
                 o = ~col.isin(others)
                 for c in cols:
                     o = o & (col != c)
-                    if not has_null_constant:
+                    if not has_null:
                         if innulls is None:
                             innulls = c.isnull()
                         else:
                             innulls = innulls | c.isnull()
-            if has_null_constant:
+            if has_null:
                 o = o.mask(o == (0 if positive else 1), None)
             elif innulls is not None:
                 o = o.mask(innulls & (o == (0 if positive else 1)), None)
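Like the logical operators, is_in follows SQL semantics: when the value list contains a NULL, a non-match becomes NULL rather than False. A sketch, assuming the pandas backend:

    import pandas as pd
    from slide_pandas import PandasUtils

    u = PandasUtils()
    col = u.to_series(pd.Series([1, 5], name="x"))
    u.series_to_array(u.is_in(col, [1, 2, None], positive=True))
    # [True, None]: 5 doesn't match, but the NULL in the list makes that result unknown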
@@ -414,13 +445,13 @@ def is_in(self, col: Any, values: List[Any], positive: bool) -> Any:  # noqa: C9
         return None if pd.isna(res) else bool(res)
 
     def is_between(self, col: Any, lower: Any, upper: Any, positive: bool) -> Any:
-        """Check if a series or a constant is ``>=lower`` and ``<=upper``
+        """Check if a series or a scalar is ``>=lower`` and ``<=upper``
 
-        :param col: the series or the constant
-        :param lower: the lower bound, which can be series or a constant
-        :param upper: the upper bound, which can be series or a constant
+        :param col: the series or the scalar
+        :param lower: the lower bound, which can be a series or a scalar
+        :param upper: the upper bound, which can be a series or a scalar
         :param positive: ``is between`` or ``is not between``
-        :return: the correspondent boolean series or constant
+        :return: the corresponding boolean series or scalar
 
         .. note:
 
@@ -431,12 +462,12 @@ def is_between(self, col: Any, lower: Any, upper: Any, positive: bool) -> Any:
             return None
         if self.is_series(col):
             left = (
-                self.to_constant_series(False, col)
+                self.scalar_to_series(False, col)
                 if lower is None
                 else (lower <= col).fillna(False)
             )
             right = (
-                self.to_constant_series(False, col)
+                self.scalar_to_series(False, col)
                 if upper is None
                 else (col <= upper).fillna(False)
             )
@@ -468,10 +499,10 @@ def is_between(self, col: Any, lower: Any, upper: Any, positive: bool) -> Any:
         return None if pd.isna(res) else bool(res)
 
     def coalesce(self, cols: List[Any]) -> Any:
-        """Coalesce multiple series and constants
+        """Coalesce multiple series and scalars
 
-        :param cols: the collection of series and constants in order
-        :return: the coalesced series or constant
+        :param cols: the collection of series and scalars in order
+        :return: the coalesced series or scalar
 
         .. note:
 
@@ -489,10 +520,10 @@ def coalesce(self, cols: List[Any]) -> Any:
     def case_when(self, *pairs: Tuple[Any, Any], default: Any = None) -> Any:
         """SQL ``CASE WHEN``
 
         :param pairs: condition and value pairs, both can be either a
-            series or a constant
+            series or a scalar
         :param default: default value if none of the conditions satisfies,
             defaults to None
-        :return: the final series or constant
+        :return: the final series or scalar
 
         .. note:
 
@@ -533,11 +564,11 @@
     ) -> Any:
         """SQL ``LIKE``
 
-        :param col: a series or a constant
+        :param col: a series or a scalar
         :param expr: a pattern expression
         :param ignore_case: whether to ignore case, defaults to False
         :param positive: ``LIKE`` or ``NOT LIKE``, defaults to True
-        :return: the correspondent boolean series or constant
+        :return: the corresponding boolean series or scalar
 
         .. note:
 
@@ -587,7 +618,7 @@ def like_series(col: TCol) -> TCol:
 
         if self.is_series(col):
             if expr is None:
-                return self.to_constant_series(float("nan"), col)
+                return self.scalar_to_series(float("nan"), col)
             nulls = col.isnull()
             res = like_series(col)
             if positive:
@@ -599,12 +630,19 @@ def like_series(col: TCol) -> TCol:
         )[0]
         return None if pd.isna(res) else bool(res)
 
-    def cols_to_df(self, cols: List[Any], names: Optional[List[str]] = None) -> TDf:
+    def cols_to_df(
+        self,
+        cols: List[Any],
+        names: Optional[List[str]] = None,
+        reference: Union[TCol, TDf, None] = None,
+    ) -> TDf:
         """Construct the dataframe from a list of columns (series)
 
-        :param cols: the collection of series or constants, at least one value must
-            be a series
+        :param cols: the collection of series or scalars; at least one value must
+            be a series unless ``reference`` is provided
         :param names: the correspondent column names, defaults to None
+        :param reference: the reference series or dataframe providing the index
+            when all cols are scalars, defaults to None
 
         :return: the dataframe
@@ -645,63 +683,23 @@ def as_pandas(self, df: TDf) -> pd.DataFrame:
         """
         raise NotImplementedError  # pragma: no cover
 
-    def as_array_iterable(
+    def create_native_converter(
         self,
-        df: TDf,
-        schema: pa.Schema,
-        columns: Optional[List[str]] = None,
-        type_safe: bool = False,
-    ) -> Iterable[List[Any]]:
-        """Convert pandas like dataframe to iterable of rows in the format of list.
+        input_schema: pa.Schema,
+        type_safe: bool,
+    ) -> "SlideDataFrameNativeConverter":
+        """Create a converter that converts dataframes into python native iterables
 
-        :param df: pandas like dataframe
-        :param schema: schema of the input
-        :param columns: columns to output, None for all columns
+        :param input_schema: schema of the input dataframe
         :param type_safe: whether to enforce the types in schema, if False, it will
-            return the original values from the dataframe
-        :return: iterable of rows, each row is a list
+            return the original values from the dataframes
+        :return: the converter
 
-        .. note::
+        .. tip::
 
-            If there are nested types in schema, the conversion can be slower
+            This converter can be reused on multiple dataframes with the same
+            structure
         """
-        if self.empty(df):
-            return
-        if columns is not None:
-            df = df[columns]
-            schema = pa.schema([schema.field(n) for n in columns])
-        if not type_safe:
-            for arr in df.astype(object).itertuples(index=False, name=None):
-                yield list(arr)
-        elif all(not pa.types.is_nested(x) for x in schema.types):
-            p = self.as_arrow(df, schema, type_safe)
-            d = p.to_pydict()
-            cols = [d[n] for n in schema.names]
-            for arr in zip(*cols):
-                yield list(arr)
-        else:
-            # If schema has nested types, the conversion will be much slower
-            for arr in apply_schema(
-                schema,
-                df.itertuples(index=False, name=None),
-                copy=True,
-                deep=True,
-                str_as_json=True,
-            ):
-                yield arr
-
-    def as_array(
-        self,
-        df: TDf,
-        schema,
-        columns: Optional[List[str]] = None,
-        type_safe: bool = False,
-    ) -> List[List[Any]]:
-        return list(
-            self.as_array_iterable(
-                df, schema=schema, columns=columns, type_safe=type_safe
-            )
-        )
+        return SlideDataFrameNativeConverter(self, input_schema, type_safe)
 
     def to_schema(self, df: TDf) -> pa.Schema:
         """Extract pandas dataframe schema as pyarrow schema. This is a replacement
@@ -736,6 +734,25 @@ def get_fields() -> Iterable[pa.Field]:
 
         return pa.schema(list(get_fields()))
 
+    def drop_columns(self, df: TDf, columns: List[str]) -> TDf:
+        """Drop columns from the dataframe
+
+        :param df: the dataframe
+        :param columns: columns to be dropped
+        :return: the new dataframe without those columns
+        """
+        cols = [c for c in df.columns if c not in columns]
+        return df[cols]
+
+    def select_columns(self, df: TDf, columns: List[str]) -> TDf:
+        """Select columns from the dataframe
+
+        :param df: the dataframe
+        :param columns: columns to be selected, in order
+        :return: the new dataframe with only those columns
+        """
+        return df[columns]
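A usage sketch for the new converter API (hypothetical; it assumes the pandas backend), highlighting that one converter can serve many frames of the same shape:

    import pandas as pd
    import pyarrow as pa
    from slide_pandas import PandasUtils

    u = PandasUtils()
    schema = pa.schema([("a", pa.string()), ("b", pa.int64())])
    conv = u.create_native_converter(schema, type_safe=True)
    conv.as_dicts(pd.DataFrame({"a": ["x"], "b": [1]}))   # [{'a': 'x', 'b': 1}]
    conv.as_arrays(pd.DataFrame({"a": ["y"], "b": [2]}))  # [['y', 2]]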
 
     def cast_df(  # noqa: C901
         self, df: TDf, schema: pa.Schema, input_schema: Optional[pa.Schema] = None
     ) -> TDf:
@@ -1120,3 +1137,190 @@ def _cast_to_date(
         if pd.__version__ < "1.2":  # pragma: no cover
             return col.astype(safe_dtype).dt.floor("D")
         return col.astype(safe_dtype).dt.date
+
+
+class SlideDataFrameNativeConverter:
+    def __init__(
+        self,
+        utils: SlideUtils,
+        schema: pa.Schema,
+        type_safe: bool,
+    ):
+        """Convert pandas like dataframes to iterables of rows in the format
+        of list or dict.
+
+        :param utils: the associated SlideUtils
+        :param schema: schema of the input dataframe
+        :param type_safe: whether to enforce the types in schema, if False, it will
+            return the original values from the dataframes
+
+        .. note::
+
+            If there are nested types in schema, the conversion can be slower
+        """
+        self._utils = utils
+        self._schema = schema
+        self._has_time = any(
+            pa.types.is_timestamp(x) or pa.types.is_date(x) for x in schema.types
+        )
+        # bind the public methods to the cheapest implementation that can
+        # satisfy the schema and the type_safe requirement
+        if not type_safe:
+            self._as_array_iterable = self._as_array_iterable_not_type_safe
+            self._as_arrays = self._as_arrays_not_type_safe
+            self._as_dict_iterable = self._as_dict_iterable_not_type_safe
+            self._as_dicts = self._as_dicts_not_type_safe
+        else:
+            self._split_nested(self._schema)
+            if self._converter is None:  # no nested columns
+                self._as_array_iterable = self._as_array_iterable_simple
+                self._as_arrays = self._as_arrays_simple
+                self._as_dict_iterable = self._as_dict_iterable_simple
+                self._as_dicts = self._as_dicts_simple
+            elif self._simple_part is None:  # only nested columns
+                self._as_array_iterable = self._as_array_iterable_nested
+                self._as_arrays = self._as_arrays_nested
+                self._as_dict_iterable = self._as_dict_iterable_nested
+                self._as_dicts = self._as_dicts_nested
+            else:  # a mix of both
+                self._as_array_iterable = self._as_array_iterable_hybrid
+                self._as_arrays = self._as_arrays_hybrid
+                self._as_dict_iterable = self._as_dict_iterable_hybrid
+                self._as_dicts = self._as_dicts_hybrid
+
+    def as_array_iterable(self, df: Any) -> Iterable[List[Any]]:
+        """Convert the dataframe to an iterable of rows in the format of list.
+
+        :param df: the dataframe
+        :return: an iterable of rows, each row is a list
+        """
+        return self._as_array_iterable(df)
+
+    def as_arrays(self, df: Any) -> List[List[Any]]:
+        """Convert the dataframe to a list of rows in the format of list.
+
+        :param df: the dataframe
+        :return: a list of rows, each row is a list
+        """
+        return self._as_arrays(df)
+
+    def as_dict_iterable(self, df: Any) -> Iterable[Dict[str, Any]]:
+        """Convert the dataframe to an iterable of rows in the format of dict.
+
+        :param df: the dataframe
+        :return: an iterable of rows, each row is a dict
+        """
+        return self._as_dict_iterable(df)
+
+    def as_dicts(self, df: Any) -> List[Dict[str, Any]]:
+        """Convert the dataframe to a list of rows in the format of dict.
+
+        :param df: the dataframe
+        :return: a list of rows, each row is a dict
+        """
+        return self._as_dicts(df)
+
+    def _time_safe(self, df: Any) -> Any:
+        return df.astype(object) if self._has_time else df
+
+    def _as_array_iterable_not_type_safe(self, df: Any) -> Iterable[List[Any]]:
+        for arr in self._time_safe(df).itertuples(index=False, name=None):
+            yield list(arr)
+
+    def _as_arrays_not_type_safe(self, df: Any) -> List[List[Any]]:
+        return self._time_safe(self._utils.as_pandas(df)).values.tolist()
+
+    def _as_dict_iterable_not_type_safe(self, df: Any) -> Iterable[Dict[str, Any]]:
+        names = list(self._schema.names)
+        for arr in self._time_safe(df).itertuples(index=False, name=None):
+            yield dict(zip(names, arr))
+
+    def _as_dicts_not_type_safe(self, df: Any) -> List[Dict[str, Any]]:
+        return self._time_safe(self._utils.as_pandas(df)).to_dict("records")
+
+    def _as_array_iterable_simple(self, df: Any) -> Iterable[List[Any]]:
+        return self._get_arrow_arrays_simple(df, self._schema)
+
+    def _as_arrays_simple(self, df: Any) -> List[List[Any]]:
+        return list(self._get_arrow_arrays_simple(df, self._schema))
+
+    def _as_dict_iterable_simple(self, df: Any) -> Iterable[Dict[str, Any]]:
+        for arr in self._get_arrow_arrays_simple(df, self._schema):
+            yield dict(zip(self._schema.names, arr))
+
+    def _as_dicts_simple(self, df: Any) -> List[Dict[str, Any]]:
+        return list(self._as_dict_iterable_simple(df))
+
+    def _as_array_iterable_hybrid(self, df: Any) -> Iterable[List[Any]]:
+        for arr1, arr2 in zip(self._simple_part(df), self._nested_part(df)):
+            yield self._remap_arrs(arr1, arr2)
+
+    def _as_arrays_hybrid(self, df: Any) -> List[List[Any]]:
+        return list(self._as_array_iterable_hybrid(df))
+
+    def _as_dict_iterable_hybrid(self, df: Any) -> Iterable[Dict[str, Any]]:
+        names = list(self._schema.names)
+        for arr in self._as_array_iterable_hybrid(df):
+            yield dict(zip(names, arr))
+
+    def _as_dicts_hybrid(self, df: Any) -> List[Dict[str, Any]]:
+        return list(self._as_dict_iterable_hybrid(df))
+
+    def _as_array_iterable_nested(self, df: Any) -> Iterable[List[Any]]:
+        return self._nested_part(df)
+
+    def _as_arrays_nested(self, df: Any) -> List[List[Any]]:
+        return list(self._nested_part(df))
+
+    def _as_dict_iterable_nested(self, df: Any) -> Iterable[Dict[str, Any]]:
+        names = list(self._schema.names)
+        for arr in self._nested_part(df):
+            yield dict(zip(names, arr))
+
+    def _as_dicts_nested(self, df: Any) -> List[Dict[str, Any]]:
+        return list(self._as_dict_iterable_nested(df))
+
+    def _split_nested(self, schema: pa.Schema) -> None:
+        cols1: List[pa.Field] = []
+        cols2: List[pa.Field] = []
+        self._remap: List[Tuple[int, int]] = []
+        for field in schema:
+            if pa.types.is_nested(field.type):
+                self._remap.append((1, len(cols2)))
+                cols2.append(field)
+            else:
+                self._remap.append((0, len(cols1)))
+                cols1.append(field)
+        self._simple_schema = pa.schema(cols1)
+        self._simple_part: Any = (
+            None
+            if len(cols1) == 0
+            else lambda df: self._get_arrow_arrays_simple(
+                df[self._simple_schema.names], self._simple_schema
+            )
+        )
+        self._nested_schema = pa.schema(cols2)
+        self._converter: Any = (
+            None
+            if len(cols2) == 0
+            else _TypeConverter(
+                pa.schema(cols2), copy=True, deep=True, str_as_json=True
+            )
+        )
+        self._nested_part = lambda df: self._get_arrays_nested(
+            df[self._nested_schema.names], self._nested_schema
+        )
+
+    def _remap_arrs(self, *arrs: List[List[Any]]) -> List[Any]:
+        return [arrs[x[0]][x[1]] for x in self._remap]
+
+    def _get_arrow_arrays_simple(
+        self, df: Any, schema: pa.Schema
+    ) -> Iterable[List[Any]]:
+        p = self._utils.as_arrow(df, schema, True)
+        d = p.to_pydict()
+        cols = [d[n] for n in schema.names]
+        for arr in zip(*cols):
+            yield list(arr)
+
+    def _get_arrays_nested(self, df: Any, schema: pa.Schema) -> Iterable[List[Any]]:
+        for item in df.itertuples(index=False, name=None):
+            yield self._converter.row_to_py(item)
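The hybrid path converts the simple and nested column groups separately and stitches each row back together through _remap. A pyarrow-only sketch of that bookkeeping:

    import pyarrow as pa

    schema = pa.schema(
        [("a", pa.binary()), ("b", pa.int64()), ("c", pa.list_(pa.int64()))]
    )
    remap, simple, nested = [], [], []
    for f in schema:
        if pa.types.is_nested(f.type):
            remap.append((1, len(nested)))
            nested.append(f)
        else:
            remap.append((0, len(simple)))
            simple.append(f)
    # remap == [(0, 0), (0, 1), (1, 0)], so an output row is
    # [simple_row[0], simple_row[1], nested_row[0]]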
diff --git a/slide_dask/utils.py b/slide_dask/utils.py
index aaef79e..a5ab674 100644
--- a/slide_dask/utils.py
+++ b/slide_dask/utils.py
@@ -1,4 +1,4 @@
-from typing import Any, Callable, List, Optional
+from typing import Any, Callable, List, Optional, Union
 
 import dask.dataframe as dd
 import numpy as np
@@ -8,6 +8,7 @@
 from slide.utils import SlideUtils
 from triad.utils.assertion import assert_or_throw
 from triad.utils.pyarrow import to_pandas_dtype
+from pandas.api.types import is_object_dtype
 
 
 class DaskUtils(SlideUtils[dd.DataFrame, dd.Series]):
@@ -33,38 +34,76 @@ def to_series(self, obj: Any, name: Optional[str] = None) -> dd.Series:
     def series_to_array(self, col: dd.Series) -> List[Any]:
         return col.compute().tolist()
 
-    def to_constant_series(
+    def scalar_to_series(
         self,
-        constant: Any,
-        from_series: dd.Series,
+        scalar: Any,
+        reference: Union[dd.Series, dd.DataFrame],
         dtype: Any = None,
         name: Optional[str] = None,
     ) -> dd.Series:
+        if pa.types.is_nested(pa.scalar(scalar).type):
+            assert_or_throw(
+                dtype is None or is_object_dtype(dtype),
+                ValueError(
+                    "for nested scalar type, dtype must be None or dtype(object)"
+                ),
+            )
+            if self.is_series(reference):
+                return reference.map(lambda _: scalar, meta=(name, dtype))
+            else:
+                return reference[reference.columns[0]].map(
+                    lambda _: scalar, meta=(name, dtype)
+                )
         if dtype is not None:
-            return from_series.map(lambda _: constant, meta=(name, dtype))
-        tdf = from_series.to_frame()
+            if self.is_series(reference):
+                return reference.map(lambda _: scalar, meta=(name, dtype))
+            else:
+                return reference[reference.columns[0]].map(
+                    lambda _: scalar, meta=(name, dtype)
+                )
+        tdf = reference.to_frame() if isinstance(reference, dd.Series) else reference
         tn = name or "_tmp_"
-        tdf[tn] = constant
+        tdf[tn] = scalar
        return tdf[tn]
 
-    def cols_to_df(
-        self, cols: List[Any], names: Optional[List[str]] = None
+    def cols_to_df(  # noqa: C901
+        self,
+        cols: List[Any],
+        names: Optional[List[str]] = None,
+        reference: Union[dd.Series, dd.DataFrame, None] = None,
     ) -> dd.DataFrame:
-        assert_or_throw(
-            any(self.is_series(s) for s in cols),
-            SlideInvalidOperation("at least one value in cols should be series"),
-        )
+        _cols = list(cols)
+        _ref: Any = None
+        _nested: List[int] = []
+        _ref_idx = -1
+        for i in range(len(cols)):
+            if self.is_series(_cols[i]):
+                if _ref is None:
+                    _ref = _cols[i]
+                    _ref_idx = i
+            elif pa.types.is_nested(pa.scalar(_cols[i]).type):
+                _nested.append(i)
+        if _ref is None:
+            assert_or_throw(
+                reference is not None,
+                SlideInvalidOperation(
+                    "reference can't be null when all cols are scalars"
+                ),
+            )
+            _cols[0] = self.scalar_to_series(_cols[0], reference=reference)
+            _ref = _cols[0]
+            _ref_idx = 0
+        for n in _nested:
+            if not self.is_series(_cols[n]):
+                _cols[n] = self.scalar_to_series(_cols[n], reference=_ref)
         if names is None:
             col_names: List[str] = [c.name for c in cols]
         else:
             col_names = names
-        for i in range(len(cols)):
-            if self.is_series(cols[i]):
-                break
-        tdf = cols[i].to_frame(col_names[i])
-        for j in range(len(cols)):
-            if i != j:
-                tdf[col_names[j]] = cols[j]
+        tdf = _ref.to_frame(col_names[_ref_idx])
+        for j in range(len(_cols)):
+            if _ref_idx != j:
+                tdf[col_names[j]] = _cols[j]
         return tdf[col_names]
 
     def is_compatile_index(self, df: dd.DataFrame) -> bool:
diff --git a/slide_pandas/__init__.py b/slide_pandas/__init__.py
index e69de29..ca74f5f 100644
--- a/slide_pandas/__init__.py
+++ b/slide_pandas/__init__.py
@@ -0,0 +1,2 @@
+# flake8: noqa
+from slide_pandas.utils import PandasUtils
diff --git a/slide_pandas/utils.py b/slide_pandas/utils.py
index 1371826..e079d94 100644
--- a/slide_pandas/utils.py
+++ b/slide_pandas/utils.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, List, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -32,25 +32,59 @@ def to_series(self, obj: Any, name: Optional[str] = None) -> pd.Series:
     def series_to_array(self, col: pd.Series) -> List[Any]:
         return col.tolist()
 
-    def to_constant_series(
+    def scalar_to_series(
         self,
-        constant: Any,
-        from_series: pd.Series,
+        scalar: Any,
+        reference: Union[pd.Series, pd.DataFrame],
         dtype: Any = None,
         name: Optional[str] = None,
     ) -> pd.Series:
-        return pd.Series(constant, index=from_series.index, dtype=dtype, name=name)
+        if pa.types.is_nested(pa.scalar(scalar).type):
+            assert_or_throw(
+                dtype is None or is_object_dtype(dtype),
+                ValueError(
+                    "for nested scalar type, dtype must be None or dtype(object)"
+                ),
+            )
+            if self.is_series(reference):
+                res = reference.map(lambda _: scalar)
+            else:
+                res = reference[reference.columns[0]].map(lambda _: scalar)
+            if name is not None:
+                res = res.rename(name)
+            return res
+        return pd.Series(scalar, index=reference.index, dtype=dtype, name=name)
 
     def cols_to_df(
-        self, cols: List[pd.Series], names: Optional[List[str]] = None
+        self,
+        cols: List[Any],
+        names: Optional[List[str]] = None,
+        reference: Union[pd.Series, pd.DataFrame, None] = None,
     ) -> pd.DataFrame:
-        assert_or_throw(
-            any(self.is_series(s) for s in cols),
-            SlideInvalidOperation("at least one value in cols should be series"),
-        )
+        _cols = list(cols)
+        _ref: Any = None
+        _nested: List[int] = []
+        for i in range(len(cols)):
+            if self.is_series(_cols[i]):
+                if _ref is None:
+                    _ref = _cols[i]
+            elif pa.types.is_nested(pa.scalar(_cols[i]).type):
+                _nested.append(i)
+        if _ref is None:
+            assert_or_throw(
+                reference is not None,
+                SlideInvalidOperation(
+                    "reference can't be null when all cols are scalars"
+                ),
+            )
+            _cols[0] = self.scalar_to_series(_cols[0], reference=reference)
+            _ref = _cols[0]
+        for n in _nested:
+            if not self.is_series(_cols[n]):
+                _cols[n] = self.scalar_to_series(_cols[n], reference=_ref)
         if names is None:
-            return pd.DataFrame({c.name: c for c in cols})
-        return pd.DataFrame(dict(zip(names, cols)))
+            return pd.DataFrame({c.name: c for c in _cols})
+        return pd.DataFrame(dict(zip(names, _cols)))
 
     def as_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
         return df
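A sketch of the new nested-scalar support in scalar_to_series (assuming the pandas backend; the dict literal is arbitrary):

    import pandas as pd
    from slide_pandas import PandasUtils

    u = PandasUtils()
    s = u.scalar_to_series({"x": 1}, pd.Series([0, 1]), name="y")
    # an object series [{'x': 1}, {'x': 1}] aligned to the reference index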
diff --git a/slide_test/suite.py b/slide_test/suite.py
index 954412b..c287f53 100644
--- a/slide_test/suite.py
+++ b/slide_test/suite.py
@@ -1,5 +1,4 @@
 import json
-import pickle
 from datetime import date, datetime
 from typing import Any
 from unittest import TestCase
@@ -93,16 +92,45 @@ def test_to_series(self):
             pd.DataFrame(dict(x=[0, 1], y=[2, 3], z=[4, 5], w=[2, 3])),
         )
 
-    def test_to_constant_series(self):
+    def test_scalar_to_series(self):
         s = self.utils.to_series(pd.Series([0, 1], name="x"))
-        s1 = self.utils.to_constant_series("a", s, name="y")
-        s2 = self.utils.to_constant_series(None, s, name="z", dtype="float64")
+        s1 = self.utils.scalar_to_series("a", s, name="y")
+        s2 = self.utils.scalar_to_series(None, s, name="z", dtype="float64")
         df = self.utils.cols_to_df([s, s1, s2])
         assert_pdf_eq(
             self.to_pd(df),
             pd.DataFrame(dict(x=[0, 1], y=["a", "a"], z=[None, None])),
         )
 
+        s = self.to_df(pd.DataFrame(dict(x=pd.Series([0, 1]))))
+        s1 = self.utils.scalar_to_series("a", s, name="y")
+        s2 = self.utils.scalar_to_series(None, s, name="z", dtype="float64")
+        df = self.utils.cols_to_df([s1, s2])
+        assert_pdf_eq(
+            self.to_pd(df),
+            pd.DataFrame(dict(y=["a", "a"], z=[None, None])),
+        )
+
+        s = self.utils.to_series(pd.Series([0, 1], name="x"))
+        s1 = self.utils.scalar_to_series({"x": 1}, s, name="y")
+        s2 = self.utils.scalar_to_series(
+            [1, 2], s, name="z", dtype=np.dtype(object)
+        )
+        df = self.utils.cols_to_df([s1, s2])
+        assert [[{"x": 1}, [1, 2]], [{"x": 1}, [1, 2]]] == self.to_pd(
+            df
+        ).values.tolist()
+
+        s = self.to_df(pd.DataFrame(dict(x=pd.Series([0, 1]))))
+        s1 = self.utils.scalar_to_series(
+            {"x": 1}, s, name="y", dtype=np.dtype(object)
+        )
+        s2 = self.utils.scalar_to_series([1, 2], s, name="z")
+        df = self.utils.cols_to_df([s1, s2])
+        assert [[{"x": 1}, [1, 2]], [{"x": 1}, [1, 2]]] == self.to_pd(
+            df
+        ).values.tolist()
+
     def test_get_col_pa_type(self):
         df = self.to_df(
             [["a", 1, 1.1, True, datetime.now()]],
@@ -306,14 +334,20 @@ def test_(pdf: pd.DataFrame, op: str):
             df["j"] = self.utils.binary_logical_op(True, None, op)
             df["k"] = self.utils.binary_logical_op(False, None, op)
             df["l"] = self.utils.binary_logical_op(None, None, op)
+            df["m"] = self.utils.binary_logical_op(None, True, op)
+            df["n"] = self.utils.binary_logical_op(None, False, op)
+            df["o"] = self.utils.binary_logical_op(df.a, None, op)
+            df["p"] = self.utils.binary_logical_op(None, df.b, op)
 
             assert_duck_eq(
-                self.to_pd(df[list("defghijkl")]),
+                self.to_pd(df[list("defghijklmnop")]),
                 f"""
                 SELECT
                     a {op} b AS d, a {op} TRUE AS e, TRUE {op} b AS f,
                     a {op} FALSE AS g, FALSE {op} b AS h, TRUE {op} FALSE AS i,
-                    TRUE {op} NULL AS j, FALSE {op} NULL AS k, NULL {op} NULL AS l
+                    TRUE {op} NULL AS j, FALSE {op} NULL AS k, NULL {op} NULL AS l,
+                    NULL {op} TRUE AS m, NULL {op} FALSE AS n,
+                    a {op} NULL AS o, NULL {op} b AS p
                 FROM pdf
                 """,
                pdf=pdf,
@@ -1116,7 +1150,7 @@ def test_like_sql(self):
             check_order=False,
         )
 
-    def test_cast_constant(self):
+    def test_cast_scalar(self):
         assert self.utils.cast(None, bool) is None
         assert self.utils.cast(True, bool)
         assert not self.utils.cast(False, bool)
@@ -1793,6 +1827,42 @@ def test_cast_time(self):
             check_order=False,
         )
 
+    def test_cast_nested(self):
+        # happy path
+        pdf = pd.DataFrame(dict(a=[None, [1, 2]], b=[{"d": "x"}, None]))
+
+        schema = Schema("a:[int],b:{d:str}").pa_schema
+        df = self.to_df(pdf, "a:[int],b:{d:str}")
+        df["h"] = self.utils.cast(df.a, schema[0].type)
+        df["i"] = self.utils.cast(df.b, schema[1].type, schema[1].type)
+
+        assert [[None, {"d": "x"}], [[1, 2], None]] == self.to_pd(
+            df[list("hi")]
+        ).values.tolist()
+
+        with raises(SlideCastError):
+            df["j"] = self.utils.cast(df.a, schema[0].type, str)
+
+        with raises(SlideCastError):
+            df["j"] = self.utils.cast(df.a, str, schema[0].type)
+
+    def test_cast_binary(self):
+        # happy path
+        pdf = pd.DataFrame(dict(a=[None, b"\0abc"]))
+
+        schema = Schema("a:binary").pa_schema
+        coerce = pd.__version__ >= "1.2"
+        df = self.to_df(pdf, "a:binary", coerce=coerce)
+        df["h"] = self.utils.cast(df.a, schema[0].type)
+
+        assert [[None], [b"\0abc"]] == self.to_pd(df[["h"]]).values.tolist()
+
+        with raises(SlideCastError):
+            df["j"] = self.utils.cast(df.a, schema[0].type, str)
+
+        with raises(SlideCastError):
+            df["j"] = self.utils.cast(df.a, str, schema[0].type)
+
+ df["j"] = self.utils.cast(df.a, str, schema[0].type) + def test_cast_df(self): a = pd.DataFrame(dict(a=[1, 2, None], b=[True, None, False])) df = self.utils.cast_df( @@ -1850,6 +1920,25 @@ def test_cols_to_df(self): with raises(SlideInvalidOperation): res = self.utils.cols_to_df([123, 456], names=["x", "y"]) + res = self.utils.cols_to_df([123, 456], names=["x", "y"], reference=df) + assert_pdf_eq( + self.to_pd(res), self.to_pd(self.to_df([[123, 456]], "x:long,y:long")) + ) + + # has nested type, all scalars + res = self.utils.cols_to_df([[1, 2], 456], names=["x", "y"], reference=df) + assert [[[1, 2], 456]] == self.to_pd(res).values.tolist() + + res = self.utils.cols_to_df([456, [1, 2]], names=["x", "y"], reference=df) + assert [[456, [1, 2]]] == self.to_pd(res).values.tolist() + + # has nested type, and series + res = self.utils.cols_to_df([[1, 2], df["a"]], names=["x", "y"]) + assert [[[1, 2], "a"]] == self.to_pd(res).values.tolist() + + res = self.utils.cols_to_df([df["a"], [1, 2]], names=["x", "y"]) + assert [["a", [1, 2]]] == self.to_pd(res).values.tolist() + def test_to_schema(self): df = self.to_df([[1.0, 2], [2.0, 3]]) raises(ValueError, lambda: self.utils.to_schema(df)) @@ -1892,130 +1981,170 @@ def test_index_compatible(self): df = df.reset_index(drop=True) self.utils.ensure_compatible(df) - def test_as_array_iterable(self): + def test_converter_not_safe(self): schema = Schema("a:str,b:int").pa_schema + c = self.utils.create_native_converter(schema, type_safe=False) df = self.to_df([], "a:str,b:int") - assert [] == self.utils.as_array(df, schema) - assert [] == self.utils.as_array(df, schema, type_safe=True) + assert [] == c.as_arrays(df) + assert [] == list(c.as_array_iterable(df)) + assert [] == c.as_dicts(df) + assert [] == list(c.as_dict_iterable(df)) - df = self.to_df([["a", 1]], "a:str,b:int") - assert [["a", 1]] == self.utils.as_array(df, schema) - assert [["a", 1]] == self.utils.as_array(df, schema, columns=["a", "b"]) - assert [[1, "a"]] == self.utils.as_array(df, schema, columns=["b", "a"]) + df = self.to_df([["xx", 123]], "a:str,b:int") + assert [["xx", 123]] == c.as_arrays(df) + assert [["xx", 123]] == list(c.as_array_iterable(df)) + assert [{"a": "xx", "b": 123}] == c.as_dicts(df) + assert [{"a": "xx", "b": 123}] == list(c.as_dict_iterable(df)) - # prevent pandas auto type casting schema = Schema("a:double,b:int").pa_schema - df = self.to_df([[1.0, 1.0]], "a:double,b:int") - data = self.utils.as_array(df, schema) - assert [[1.0, 1]] == data - assert isinstance(data[0][0], float) - assert isinstance(data[0][1], int) - assert [[1.0, 1]] == self.utils.as_array(df, schema, columns=["a", "b"]) - assert [[1, 1.0]] == self.utils.as_array(df, schema, columns=["b", "a"]) - - df = self.to_df([[np.float64(1.0), 1.0]], "a:double,b:int") - assert [[1.0, 1]] == self.utils.as_array(df, schema) - assert isinstance(self.utils.as_array(df, schema)[0][0], float) - assert isinstance(self.utils.as_array(df, schema)[0][1], int) - - schema = Schema("a:datetime,b:int").pa_schema + c = self.utils.create_native_converter(schema, type_safe=False) + df = self.to_df([[1.1, 1.1]], "a:double,b:double") + res = c.as_arrays(df) + assert [[1.1, 1.1]] == res + assert isinstance(res[0][1], float) + + schema = Schema("a:datetime,b:date").pa_schema + c = self.utils.create_native_converter(schema, type_safe=False) df = self.to_df( - [[pd.Timestamp("2020-01-01"), 1.0]], - "a:datetime,b:int", - ) - assert [[datetime(2020, 1, 1), 1]] == self.utils.as_array(df, schema) - assert isinstance( - 
-            self.utils.as_array(df, schema, type_safe=True)[0][0], datetime
-        )
-        assert isinstance(
-            self.utils.as_array(df, schema, type_safe=True)[0][1], int
-        )
-
-        df = self.to_df([[pd.NaT, 1.0]], "a:datetime,b:int")
-        assert self.utils.as_array(df, schema, type_safe=True)[0][0] is None
-        assert isinstance(
-            self.utils.as_array(df, schema, type_safe=True)[0][1], int
-        )
-
-        schema = Schema("a:double,b:int").pa_schema
-        df = self.to_df([[1.0, 1.0]], "a:double,b:int")
-        assert [[1.0, 1]] == self.utils.as_array(df, schema, type_safe=True)
-        assert isinstance(self.utils.as_array(df, schema)[0][0], float)
-        assert isinstance(self.utils.as_array(df, schema)[0][1], int)
-
-    def test_as_array_iterable_datetime(self):
+            [[datetime(2020, 1, 1), date(2020, 1, 2)]], "a:datetime,b:date"
+        )
+        res = c.as_arrays(df)
+        assert [[datetime(2020, 1, 1), date(2020, 1, 2)]] == res
+
+    def test_converter_simple(self):
+        schema = Schema("a:bool,b:int").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df([[True, None], [None, 1], [None, None]], "a:bool,b:int")
+        res = c.as_arrays(df)
+        expected = [[True, None], [None, 1], [None, None]]
+        assert expected == res
+        res = list(c.as_array_iterable(df))
+        assert expected == res
+        res = c.as_dicts(df)
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+        res = list(c.as_dict_iterable(df))
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+
+        schema = Schema("a:str,b:double").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df([["ab", None], [None, 1.1], [None, None]], "a:str,b:double")
+        res = list(c.as_array_iterable(df))
+        expected = [["ab", None], [None, 1.1], [None, None]]
+        assert expected == res
+        res = list(c.as_array_iterable(df))
+        assert expected == res
+        res = c.as_dicts(df)
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+        res = list(c.as_dict_iterable(df))
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+
+        schema = Schema("a:datetime,b:date").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
         df = self.to_df(
-            [[datetime(2020, 1, 1, 2, 3, 4, 5), date(2020, 2, 2)]],
-            columns="a:datetime,b:date",
-        )
-        v1 = list(
-            self.utils.as_array_iterable(
-                df, schema=expression_to_schema("a:datetime,b:date"), type_safe=True
-            )
-        )[0]
-        assert not isinstance(v1[0], pd.Timestamp)
-        assert isinstance(v1[0], datetime)
-        assert isinstance(v1[1], date)
-
-    def test_nested(self):
-        # data = [[dict(b=[30, "40"])]]
-        # s = expression_to_schema("a:{a:str,b:[int]}")
-        # df = self.to_df(data, "a:{a:str,b:[int]}")
-        # a = df.as_array(type_safe=True)
-        # assert [[dict(a=None, b=[30, 40])]] == a
-
-        data = [[[json.dumps(dict(b=[30, "40"]))]]]
-        df = self.to_df(data, "a:[{a:str,b:[int]}]", coerce=False)
-        a = self.utils.as_array(
-            df, schema=Schema("a:[{a:str,b:[int]}]").pa_schema, type_safe=True
-        )
-        assert [[[dict(a=None, b=[30, 40])]]] == a
+            [[datetime(2020, 1, 1), datetime(2020, 1, 2)]], "a:datetime,b:date"
+        )
+        res = c.as_arrays(df)
+        expected = [[datetime(2020, 1, 1), date(2020, 1, 2)]]
+        assert expected == res
+        assert isinstance(res[0][0], datetime)
+        assert isinstance(res[0][1], date)
+        res = list(c.as_array_iterable(df))
+        assert expected == res
+        res = c.as_dicts(df)
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+        res = list(c.as_dict_iterable(df))
+        assert [dict(zip(["a", "b"], x)) for x in expected] == res
+
+    def test_converter_nested(self):
+        data = [[dict(b=[30, "40"])]]
+        schema = expression_to_schema("a:{a:str,b:[int]}")
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df(data, "a:{a:str,b:[int]}")
+        a = c.as_arrays(df)
+        assert [[dict(a=None, b=[30, 40])]] == a
+        a = list(c.as_array_iterable(df))
+        assert [[dict(a=None, b=[30, 40])]] == a
+        a = c.as_dicts(df)
+        assert [{"a": dict(a=None, b=[30, 40])}] == a
+        a = list(c.as_dict_iterable(df))
+        assert [{"a": dict(a=None, b=[30, 40])}] == a
+
+        data = [[json.dumps(dict(b=[30, "40"]))]]
+        df = self.to_df(data, "a:str")
+        a = c.as_arrays(df)
+        assert [[dict(a=None, b=[30, 40])]] == a
+
+        data = [[["1", 2]]]
+        schema = expression_to_schema("a:[int]")
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df(data, "a:[int]")
+        a = c.as_arrays(df)
+        assert [[[1, 2]]] == a
 
         data = [[json.dumps(["1", 2])]]
-        df = self.to_df(data, "a:[int]", coerce=False)
-        a = self.utils.as_array(
-            df, schema=Schema("a:[int]").pa_schema, type_safe=True
-        )
+        schema = expression_to_schema("a:[int]")
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df(data, "a:str")
+        a = c.as_arrays(df)
         assert [[[1, 2]]] == a
 
-    def test_binary(self):
-        b = pickle.dumps("xyz")
-        data = [[b, b"xy"]]
-        df = self.to_df(data, "a:bytes,b:bytes")
-        a = self.utils.as_array(
-            df, schema=Schema("a:bytes,b:bytes").pa_schema, type_safe=True
-        )
-        assert [[b, b"xy"]] == a
-
-    def test_nan_none(self):
+    def test_converter_binary(self):
+        data = [[b"\0abc"]]
+        schema = expression_to_schema("a:binary")
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df(data, "a:binary")
+        a = c.as_arrays(df)
+        assert [[b"\0abc"]] == a
+        a = list(c.as_array_iterable(df))
+        assert [[b"\0abc"]] == a
+        a = c.as_dicts(df)
+        assert [{"a": b"\0abc"}] == a
+        a = list(c.as_dict_iterable(df))
+        assert [{"a": b"\0abc"}] == a
+
+    def test_converter_hybrid(self):
+        data = [[b"\0abc", 1, [1, 2], "ab"]]
+        schema = expression_to_schema("a:binary,b:long,c:[long],d:str")
+        c = self.utils.create_native_converter(schema, type_safe=True)
+        df = self.to_df(data, "a:binary,b:long,c:[long],d:str")
+        a = c.as_arrays(df)
+        assert data == a
+        a = list(c.as_array_iterable(df))
+        assert data == a
+        a = c.as_dicts(df)
+        assert [{"a": b"\0abc", "b": 1, "c": [1, 2], "d": "ab"}] == a
+        a = list(c.as_dict_iterable(df))
+        assert [{"a": b"\0abc", "b": 1, "c": [1, 2], "d": "ab"}] == a
+
+    def test_converter_nan_none(self):
         schema = Schema("b:str,c:double").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
         df = self.to_df([[None, None]], "b:str,c:double")
-        arr = self.utils.as_array(df, schema, type_safe=True)[0]
-        assert arr[0] is None
-        assert arr[1] is None
-
-        df = self.to_df([], "b:str,c:double")
-        assert len(self.utils.as_array(df, schema)) == 0
+        a = c.as_arrays(df)
+        assert a[0][0] is None
+        assert a[0][1] is None
 
         schema = Schema("b:int,c:bool").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
         df = self.to_df([[None, None]], "b:int,c:bool")
-        arr = self.utils.as_array(df, schema, type_safe=True)[0]
-        assert arr[0] is None
-        assert arr[1] is None
+        a = c.as_arrays(df)
+        assert a[0][0] is None
+        assert a[0][1] is None
 
-    def test_boolean_enforce(self):
+    def test_converter_boolean(self):
         schema = Schema("b:int,c:bool").pa_schema
+        c = self.utils.create_native_converter(schema, type_safe=True)
         df = self.to_df([[1, True], [2, False], [3, None]], "b:int,c:bool")
-        arr = self.utils.as_array(df, schema, type_safe=True)
-        assert [[1, True], [2, False], [3, None]] == arr
+        a = c.as_arrays(df)
+        assert [[1, True], [2, False], [3, None]] == a
 
-        df = self.to_df([[1, 1], [2, 0]], "b:int,c:bool")
-        arr = self.utils.as_array(df, schema, type_safe=True)
-        assert [[1, True], [2, False]] == arr
+        df = self.to_df([[1, 1], [2, 0]], "b:int,c:int")
+        a = c.as_arrays(df)
+        assert [[1, True], [2, False]] == a
 
-        df = self.to_df([[1, 1.0], [2, 0.0]], "b:int,c:bool")
-        arr = self.utils.as_array(df, schema, type_safe=True)
-        assert [[1, True], [2, False]] == arr
+        df = self.to_df([[1, 1.0], [2, 0.0]], "b:int,c:double")
+        a = c.as_arrays(df)
+        assert [[1, True], [2, False]] == a
 
     def test_sql_group_by_apply(self):
         df = self.to_df([["a", 1], ["a", 2], [None, 3]], "b:str,c:long")
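Note: the converter tests above all assert that type-safe conversion yields
plain Python values: None instead of NaN/NaT/pd.NA, and built-in scalars
instead of numpy ones. A minimal sketch of that normalization idea (assumed
semantics, not slide's actual implementation):

    import math
    import pandas as pd

    def to_native(v):
        # map pandas/numpy missing-value markers to plain None
        if v is None or v is pd.NaT or v is pd.NA:
            return None
        if isinstance(v, float) and math.isnan(v):
            return None
        return v

    assert to_native(float("nan")) is None
    assert to_native(pd.NaT) is None
    assert to_native(1.5) == 1.5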
@@ -2589,3 +2718,18 @@ def test_join_multi_sql(self):
             b=b,
             c=c,
         )
+
+    def test_drop_select_columns(self):
+        pdf = pd.DataFrame([[0, 1, 2], [3, 4, 5]], columns=["a", "b", "c"])
+        df = self.to_df(pdf)
+        assert_pdf_eq(
+            self.to_pd(self.utils.select_columns(df, ["a", "c"])),
+            pd.DataFrame([[0, 2], [3, 5]], columns=["a", "c"]),
+            check_order=False,
+        )
+
+        assert_pdf_eq(
+            self.to_pd(self.utils.drop_columns(df, ["a", "c"])),
+            pd.DataFrame([[1], [4]], columns=["b"]),
+            check_order=False,
+        )
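Note: test_drop_select_columns only requires plain column projection; in pure
pandas the two helpers would behave like the sketch below (assumed semantics,
not the slide implementation, which also has to cover dask frames):

    import pandas as pd

    def select_columns(df, cols):
        return df[list(cols)]  # keep only the named columns, in order

    def drop_columns(df, cols):
        return df.drop(columns=list(cols))  # keep everything else

    pdf = pd.DataFrame([[0, 1, 2], [3, 4, 5]], columns=["a", "b", "c"])
    assert list(select_columns(pdf, ["a", "c"]).columns) == ["a", "c"]
    assert list(drop_columns(pdf, ["a", "c"]).columns) == ["b"]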
diff --git a/tests/slide/operators/test_execution_plan_df.py b/tests/slide/operators/test_execution_plan_df.py
new file mode 100644
index 0000000..147bda4
--- /dev/null
+++ b/tests/slide/operators/test_execution_plan_df.py
@@ -0,0 +1,196 @@
+import pandas as pd
+from slide.operators.execution_plan import ExecutionPlan
+from slide.operators.graph import Context
+from slide_pandas import PandasUtils
+from slide_test.utils import assert_duck_eq
+from triad import Schema
+
+
+def test_select():
+    plan = ExecutionPlan()
+    df = plan.df("a", Schema("a:long,b:double").pa_schema)
+    assert Schema(df.output_schema) == "a:long,b:double"
+    a = plan.col(df, "a")
+    ln = len(plan)
+    plan.col(df, "a")
+    assert ln == len(plan)  # dedup
+    b = plan.col(df, "b")
+    c = plan.binary("+", a, b)
+    df = plan.cols_to_df(a, b, (c, "c"), reference=df)
+    assert Schema(df.output_schema) == "a:long,b:double,c:double"
+    plan.output(df)
+    assert Schema(plan.output_schema) == "a:long,b:double,c:double"
+
+    ctx = Context(PandasUtils())
+    pdf = pd.DataFrame([[0, 1.2], [2, 3.1]], columns=["a", "b"])
+    ctx["a"] = pdf
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a, b, a+b AS c FROM a",
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_union():
+    plan = ExecutionPlan()
+    df1 = plan.df("a", Schema("a:uint32,b:int,c:bool").pa_schema)
+    df2 = plan.df("b", Schema("aa:int8,bb:double,cc:str").pa_schema)
+    df = plan.union(df1, df2)
+    assert Schema(df.output_schema) == "a:long,b:double,c:str"
+    plan.output(df)
+    assert Schema(plan.output_schema) == "a:long,b:double,c:str"
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[0, None, True], [2, 3, False]], columns=["a", "b", "c"])
+    ctx["a"] = pdf1
+    pdf2 = pd.DataFrame([[-1, 1.1, "x"], [-2, 3.1, None]], columns=["aa", "bb", "cc"])
+    ctx["b"] = pdf2
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a, b, c FROM a UNION SELECT aa,bb,cc FROM b",
+        a=pdf1,
+        b=pdf2,
+        check_order=False,
+    )
+
+
+def test_union_all():
+    plan = ExecutionPlan()
+    df1 = plan.df("a", Schema("a:uint32,b:int,c:bool").pa_schema)
+    df = plan.union(df1, df1, distinct=False)
+    assert Schema(df.output_schema) == Schema("a:uint32,b:int,c:bool")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("a:uint32,b:int,c:bool")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[0, None, True], [2, 3, False]], columns=["a", "b", "c"])
+    ctx["a"] = pdf1
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a, b, c FROM a UNION ALL SELECT a, b, c FROM a",
+        a=pdf1,
+        check_order=False,
+    )
+
+
+def test_except():
+    plan = ExecutionPlan()
+    df1 = plan.df("a", Schema("a:uint32,b:int").pa_schema)
+    df2 = plan.df("b", Schema("a:long,b:long").pa_schema)
+    df = plan.except_df(df1, df2)
+    assert Schema(df.output_schema) == Schema("a:uint32,b:int")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("a:uint32,b:int")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[1, 2], [2, 3], [1, 2], [2, 3]], columns=["a", "b"])
+    ctx["a"] = pdf1
+    pdf2 = pd.DataFrame([[2, 3], [4, 5]], columns=["a", "b"])
+    ctx["b"] = pdf2
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a, b FROM a EXCEPT SELECT a, b FROM b",
+        a=pdf1,
+        b=pdf2,
+        check_order=False,
+    )
+
+
+def test_intersect():
+    plan = ExecutionPlan()
+    df1 = plan.df("a", Schema("a:uint32,b:int").pa_schema)
+    df2 = plan.df("b", Schema("a:long,b:long").pa_schema)
+    df = plan.intersect(df1, df2)
+    assert Schema(df.output_schema) == Schema("a:uint32,b:int")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("a:uint32,b:int")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[1, 2], [2, 3], [1, 2], [2, 3]], columns=["a", "b"])
+    ctx["a"] = pdf1
+    pdf2 = pd.DataFrame([[2, 3], [4, 5]], columns=["a", "b"])
+    ctx["b"] = pdf2
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a, b FROM a INTERSECT SELECT a, b FROM b",
+        a=pdf1,
+        b=pdf2,
+        check_order=False,
+    )
+
+
+def test_filter():
+    plan = ExecutionPlan()
+    df = plan.df("a", Schema("a:long,b:bool,c:bool").pa_schema)
+    df = plan.filter_df(df, "c", drop=False)
+    assert Schema(df.output_schema) == Schema("a:long,b:bool,c:bool")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("a:long,b:bool,c:bool")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[1, True, False], [2, True, True]], columns=["a", "b", "c"])
+    ctx["a"] = pdf1
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a,b,c FROM a WHERE c",
+        a=pdf1,
+        check_order=False,
+    )
+
+    plan = ExecutionPlan()
+    df = plan.df("a", Schema("a:long,b:bool,c:bool").pa_schema)
+    df = plan.filter_df(df, "b")
+    assert Schema(df.output_schema) == Schema("a:long,c:bool")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("a:long,c:bool")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[1, True, False], [2, True, True]], columns=["a", "b", "c"])
+    ctx["a"] = pdf1
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a,c FROM a WHERE b",
+        a=pdf1,
+        check_order=False,
+    )
+
+
+def test_join():
+    plan = ExecutionPlan()
+    df1 = plan.df("a", Schema("a:uint32,b:int").pa_schema)
+    df2 = plan.df("b", Schema("a:long,c:long").pa_schema)
+    df = plan.join(df1, df2, "inner")
+    assert Schema(df.output_schema) == Schema("b:int,c:long")
+    plan.output(df)
+    assert Schema(plan.output_schema) == Schema("b:int,c:long")
+
+    ctx = Context(PandasUtils())
+    pdf1 = pd.DataFrame([[1, 2], [2, 3], [1, 2], [2, 3]], columns=["a", "b"])
+    ctx["a"] = pdf1
+    pdf2 = pd.DataFrame([[2, 3], [-4, 5]], columns=["a", "c"])
+    ctx["b"] = pdf2
+    plan.execute(ctx)
+
+    assert_duck_eq(
+        ctx.output,
+        "SELECT a.b, b.c FROM a INNER JOIN b ON a.a = b.a",
+        a=pdf1,
+        b=pdf2,
+        check_order=False,
+    )
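Note: test_select above relies on plan-level deduplication -- requesting the
same column twice must not grow the plan. The general pattern, assuming nothing
about slide's internals, is a node cache keyed on the operator's structural
identity:

    class Plan:
        def __init__(self):
            self._nodes = {}  # structural key -> node

        def node(self, *key):
            # the same key always returns the same cached node
            if key not in self._nodes:
                self._nodes[key] = object()
            return self._nodes[key]

        def __len__(self):
            return len(self._nodes)

    p = Plan()
    a = p.node("col", "df1", "a")
    n = len(p)
    assert p.node("col", "df1", "a") is a  # dedup: plan did not grow
    assert len(p) == n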
diff --git a/tests/slide/operators/test_execution_plan_map.py b/tests/slide/operators/test_execution_plan_map.py
new file mode 100644
index 0000000..a24ec29
--- /dev/null
+++ b/tests/slide/operators/test_execution_plan_map.py
@@ -0,0 +1,297 @@
+import pandas as pd
+import pyarrow as pa
+from pytest import raises
+from slide.operators.execution_plan import ExecutionPlan
+from slide.operators.graph import Context, Operator
+from slide_pandas import PandasUtils
+from slide_test.utils import assert_duck_eq
+from triad import Schema
+
+
+def test_col_op():
+    def f(plan: ExecutionPlan, df: Operator):
+        col0 = plan.col(df, "c")
+        assert pa.bool_() == col0.output_type
+        assert "c" == col0.output_name
+        col1 = plan.col(df, "a")
+        assert pa.uint32() == col1.output_type
+        assert "a" == col1.output_name
+        col2 = plan.col(df, "b")
+        assert pa.float32() == col2.output_type
+        assert "b" == col2.output_name
+
+        return col0, col1, col2
+
+    pdf = pd.DataFrame([[0, 1.1, True], [3, 4.1, False]], columns=["a", "b", "c"])
+    assert_duck_eq(
+        run_plan(pdf, "a:uint,b:float32,c:bool", f),
+        "SELECT c, a, b FROM a",
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_lit_op():
+    def f(plan: ExecutionPlan, df: Operator):
+        col0 = plan.lit(None)
+        assert pa.null() == col0.output_type
+        col1 = plan.lit("abc")
+        assert pa.string() == col1.output_type
+        col2 = plan.lit(1, pa.uint8())
+        assert pa.uint8() == col2.output_type
+        col3 = plan.col(df, "a")
+
+        return (col1, "x"), (col2, "y"), col3
+
+    pdf = pd.DataFrame([[0, 1.1, True], [3, 4.1, False]], columns=["a", "b", "c"])
+    assert_duck_eq(
+        run_plan(pdf, "a:uint,b:float32,c:bool", f),
+        "SELECT 'abc' AS x, 1 AS y, a FROM a",
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_pure_lit_op():
+    def f(plan: ExecutionPlan, df: Operator):
+        col0 = plan.lit(None)
+        assert pa.null() == col0.output_type
+        col1 = plan.lit("abc")
+        assert pa.string() == col1.output_type
+        col2 = plan.lit(1, pa.uint8())
+        assert pa.uint8() == col2.output_type
+        col3 = plan.lit(b"\0abc")
+        assert pa.binary() == col3.output_type
+        col4 = plan.lit([1, 2])
+        assert pa.types.is_nested(col4.output_type)
+
+        return (col1, "a"), (col2, "b"), (col3, "c"), (col4, "d")
+
+    pdf = pd.DataFrame([[0, 1.1, True], [3, 4.1, False]], columns=["a", "b", "c"])
+    res = run_plan(pdf, "a:uint,b:float32,c:bool", f)
+    expected = [["abc", 1, b"\0abc", [1, 2]], ["abc", 1, b"\0abc", [1, 2]]]
+
+    assert expected == res.astype(object).values.tolist()
+
+
+def test_unary_op():
+    def f(plan: ExecutionPlan, df: Operator):
+        col0 = plan.col(df, "c")
+        col1 = plan.col(df, "a")
+        col2 = plan.col(df, "b")
+        col3 = plan.unary("+", col1)
+        assert pa.uint32() == col3.output_type
+        assert "a" == col3.output_name
+        col4 = plan.unary("-", col1)
+        assert pa.int64() == col4.output_type
+        col5 = plan.unary("+", col2)
+        assert pa.float32() == col5.output_type
+        col6 = plan.unary("-", col2)
+        assert pa.float32() == col6.output_type
+
+        raises(ValueError, lambda: plan.unary("-", col0))
+        raises(ValueError, lambda: plan.unary("+", col0))
+        raises(ValueError, lambda: plan.unary("~", col1))
+        raises(ValueError, lambda: plan.unary("~", col2))
+
+        col10 = plan.unary("~", col0)
+        return (col3, "c3"), (col4, "c4"), (col5, "c5"), (col6, "c6"), (col10, "c10")
+
+    pdf = pd.DataFrame([[0, 1.1, True], [3, 4.1, False]], columns=["a", "b", "c"])
+    assert_duck_eq(
+        run_plan(pdf, "a:uint,b:float32,c:bool", f),
+        """
+        SELECT
+            a AS c3, -a AS c4,
+            b AS c5, -b AS c6,
+            NOT c AS c10
+        FROM a
+        """,
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_binary_op_num():
+    def f(plan: ExecutionPlan, df: Operator):
+        col1 = plan.col(df, "a")
+        col2 = plan.col(df, "b")
+        cola = plan.binary("+", col1, col1)
+        assert pa.int64() == cola.output_type
+        colb = plan.binary("-", col1, col1)
+        assert pa.int64() == colb.output_type
+        colc = plan.binary("*", col1, col1)
+        assert pa.int64() == colc.output_type
+        cold = plan.binary("/", col1, plan.lit(2))
+        assert pa.int64() == cold.output_type
+
+        cole = plan.binary("+", col1, col2)
+        assert pa.float64() == cole.output_type
+        colf = plan.binary("-", col1, col2)
+        assert pa.float64() == colf.output_type
+        colg = plan.binary("*", col1, col2)
+        assert pa.float64() == colg.output_type
+        colh = plan.binary("/", col1, col2)
+        assert pa.float64() == colh.output_type
+
+        coli = plan.binary("+", col2, col1)
+        assert pa.float64() == coli.output_type
+        colj = plan.binary("-", col2, col1)
+        assert pa.float64() == colj.output_type
+        colk = plan.binary("*", col2, col1)
+        assert pa.float64() == colk.output_type
+        coll = plan.binary("/", col2, col1)
+        assert pa.float64() == coll.output_type
+
+        colm = plan.binary("+", col2, col2)
+        assert pa.float64() == colm.output_type
+        coln = plan.binary("-", col2, col2)
+        assert pa.float64() == coln.output_type
+        colo = plan.binary("*", col2, col2)
+        assert pa.float64() == colo.output_type
+        colp = plan.binary("/", col2, col2)
+        assert pa.float64() == colp.output_type
+
+        return (
+            (cola, "a"),
+            (colb, "b"),
+            (colc, "c"),
+            (cold, "d"),
+            (cole, "e"),
+            (colf, "f"),
+            (colg, "g"),
+            (colh, "h"),
+            (coli, "i"),
+            (colj, "j"),
+            (colk, "k"),
+            (coll, "l"),
+            (colm, "m"),
+            (coln, "n"),
+            (colo, "o"),
+            (colp, "p"),
+        )
+
+    pdf = pd.DataFrame([[1, 1.1], [3, 4.1]], columns=["a", "b"])
+    assert_duck_eq(
+        run_plan(pdf, "a:uint,b:float32", f),
+        """
+        SELECT
+            a+a AS a, a-a AS b, a*a AS c, a/2 AS d,
+            a+b AS e, a-b AS f, a*b AS g, a/b AS h,
+            b+a AS i, b-a AS j, b*a AS k, b/a AS l,
+            b+b AS m, b-b AS n, b*b AS o, b/b AS p
+        FROM a
+        """,
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_binary_op_logical():
+    def f(plan: ExecutionPlan, df: Operator):
+        col1 = plan.col(df, "a")
+        col2 = plan.col(df, "b")
+        cola = plan.binary("&", col1, col2)
+        assert pa.bool_() == cola.output_type
+        colb = plan.binary("|", col1, col2)
+        assert pa.bool_() == colb.output_type
+        colc = plan.binary("&", col1, plan.lit(True))
+        assert pa.bool_() == colc.output_type
+        cold = plan.binary("&", col1, plan.lit(False))
+        assert pa.bool_() == cold.output_type
+        cole = plan.binary("&", col1, plan.lit(None))
+        assert pa.bool_() == cole.output_type
+        colf = plan.binary("|", col1, plan.lit(True))
+        assert pa.bool_() == colf.output_type
+        colg = plan.binary("|", col1, plan.lit(False))
+        assert pa.bool_() == colg.output_type
+        colh = plan.binary("|", col1, plan.lit(None))
+        assert pa.bool_() == colh.output_type
+
+        return (
+            (cola, "a"),
+            (colb, "b"),
+            (colc, "c"),
+            (cold, "d"),
+            (cole, "e"),
+            (colf, "f"),
+            (colg, "g"),
+            (colh, "h"),
+        )
+
+    pdf = pd.DataFrame(
+        [
+            [True, True],
+            [True, False],
+            [True, None],
+            [False, True],
+            [False, False],
+            [False, None],
+            [None, True],
+            [None, False],
+            [None, None],
+        ],
+        columns=["a", "b"],
+    )
+    assert_duck_eq(
+        run_plan(pdf, "a:bool,b:bool", f),
+        """
+        SELECT
+            a AND b AS a, a OR b AS b,
+            a AND TRUE AS c, a AND FALSE AS d, a AND NULL AS e,
+            a OR TRUE AS f, a OR FALSE AS g, a OR NULL AS h
+        FROM a
+        """,
+        a=pdf,
+        check_order=False,
+    )
+
+
+def test_binary_op_logical_2():
+    def f(plan: ExecutionPlan, df: Operator, sql):
+        output = []
+        n = 0
+        for op in ["&", "|"]:
+            for left in [True, False, None]:
+                for right in [True, False, None]:
+                    name = f"_{n}"
+                    col = plan.binary(op, plan.lit(left), plan.lit(right))
+                    assert pa.bool_() == col.output_type
+                    output.append((col, name))
+                    ls = "NULL" if left is None else str(left).upper()
+                    rs = "NULL" if right is None else str(right).upper()
+                    o = "AND" if op == "&" else "OR"
+                    sql.append(f"{ls} {o} {rs} AS {name}")
+                    n += 1
+        return output
+
+    pdf = pd.DataFrame(
+        [
+            [True, True],
+            [True, False],
+        ],
+        columns=["a", "b"],
+    )
+    sql = []
+    res = run_plan(pdf, "a:bool,b:bool", lambda a, b: f(a, b, sql))
+    _sql = ", ".join(sql)
+    assert_duck_eq(
+        res,
+        f"SELECT {_sql} FROM a",
+        a=pdf,
+        check_order=False,
+    )
+
+
+def run_plan(pdf, schema, plan_func):
+    plan = ExecutionPlan()
+    df = plan.df("a", Schema(schema).pa_schema)
+    args = plan_func(plan, df)
+    res = plan.cols_to_df(*args, reference=df)
+    plan.output(res)
+
+    ctx = Context(PandasUtils())
+    ctx["a"] = pdf
+    plan.execute(ctx)
+
+    return ctx.output
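Note: one expectation in test_unary_op worth spelling out: negating a uint32
column yields int64, because the negation of an unsigned value cannot stay
unsigned, and int64 is the smallest signed type that covers the full uint32
range. The same reasoning in plain pyarrow (illustrative only, not part of
this patch):

    import pyarrow as pa
    import pyarrow.compute as pc

    arr = pa.array([0, 3], type=pa.uint32())
    # widen to a signed type first, then negate: 3 becomes -3
    print(pc.negate(arr.cast(pa.int64())))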
diff --git a/tests/slide/test_type_utils.py b/tests/slide/test_type_utils.py
new file mode 100644
index 0000000..016d5da
--- /dev/null
+++ b/tests/slide/test_type_utils.py
@@ -0,0 +1,29 @@
+import pyarrow as pa
+from pytest import raises
+from triad import Schema
+
+from slide._type_utils import infer_union_type
+
+
+def test_infer_union_type():
+    schema = Schema(
+        "a:int32,b:float32,c:string,d:datetime,e:date,f:[int],g:{a:str},h:binary"
+    )
+    assert pa.int32() == infer_union_type(schema["a"].type, schema["a"].type)
+    assert pa.float32() == infer_union_type(schema["a"].type, schema["b"].type)
+    assert pa.string() == infer_union_type(schema["a"].type, schema["c"].type)
+    assert pa.string() == infer_union_type(schema["c"].type, schema["a"].type)
+    assert schema["d"].type == infer_union_type(schema["d"].type, schema["d"].type)
+    assert schema["d"].type == infer_union_type(schema["d"].type, schema["e"].type)
+    assert schema["d"].type == infer_union_type(schema["e"].type, schema["d"].type)
+    assert pa.string() == infer_union_type(schema["d"].type, schema["c"].type)
+    assert pa.string() == infer_union_type(schema["c"].type, schema["d"].type)
+    assert schema["f"].type == infer_union_type(schema["f"].type, schema["f"].type)
+    assert schema["g"].type == infer_union_type(schema["g"].type, schema["g"].type)
+
+    raises(ValueError, lambda: infer_union_type(schema["f"].type, schema["g"].type))
+    raises(ValueError, lambda: infer_union_type(schema["c"].type, schema["g"].type))
+    raises(ValueError, lambda: infer_union_type(schema["d"].type, schema["a"].type))
+    raises(ValueError, lambda: infer_union_type(schema["a"].type, schema["d"].type))
+    raises(ValueError, lambda: infer_union_type(schema["a"].type, schema["h"].type))
+    raises(ValueError, lambda: infer_union_type(schema["h"].type, schema["a"].type))
diff --git a/tests/slide_dask/test_utils.py b/tests/slide_dask/test_utils.py
index 4c43ea5..2d50741 100644
--- a/tests/slide_dask/test_utils.py
+++ b/tests/slide_dask/test_utils.py
@@ -22,20 +22,25 @@ def to_df(
         columns: Any = None,
         coerce: bool = True,
     ):
+        def _get_pdf(df: pd.DataFrame) -> pd.DataFrame:
+            if coerce:
+                return df.convert_dtypes()
+            return df
+
         if isinstance(columns, str):
             s = expression_to_schema(columns)
             df = dd.from_pandas(
-                pd.DataFrame(data, columns=s.names).convert_dtypes(), npartitions=2
+                _get_pdf(pd.DataFrame(data, columns=s.names)), npartitions=2
             )
             if coerce:
                 df = self.utils.cast_df(df, s)
             return df
         elif isinstance(data, list):
             return dd.from_pandas(
-                pd.DataFrame(data, columns=columns).convert_dtypes(), npartitions=2
+                _get_pdf(pd.DataFrame(data, columns=columns)), npartitions=2
             )
         elif isinstance(data, pd.DataFrame):
-            return dd.from_pandas(data.convert_dtypes(), npartitions=2)
+            return dd.from_pandas(_get_pdf(data), npartitions=2)
         elif isinstance(data, dd.DataFrame):
             return data
         raise NotImplementedError
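Note: the tests/slide_dask change above makes convert_dtypes conditional. The
intent, as I read it: with coerce=True the input is normalized to pandas'
nullable extension dtypes before being handed to dask, and with coerce=False
it keeps whatever dtypes it arrived with. A standalone illustration:

    import pandas as pd

    def _get_pdf(df: pd.DataFrame, coerce: bool) -> pd.DataFrame:
        # coerce=True -> nullable extension dtypes (Int64, boolean, string)
        return df.convert_dtypes() if coerce else df

    raw = pd.DataFrame({"a": [1, None]})
    print(_get_pdf(raw, True).dtypes)   # a: Int64
    print(_get_pdf(raw, False).dtypes)  # a: float64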