From 39b07f9ffadb702f7bdb0d5b417625508be14086 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Thu, 3 Aug 2023 22:19:55 -0700 Subject: [PATCH] ref: Use a dataclass for execution options (#600) This reduces some duplication, in that we don't need to have the FFI define a Python-compatible copy of the Rust execution options. Instead we have it read straight from the documented dataclass. --- sparrow-py/pysrc/sparrow_py/__init__.py | 6 ++- sparrow-py/pysrc/sparrow_py/_execution.py | 16 ++++++++ sparrow-py/pysrc/sparrow_py/_expr.py | 8 ++-- sparrow-py/pysrc/sparrow_py/_ffi.pyi | 8 ++-- sparrow-py/pysrc/sparrow_py/_windows.py | 46 ++++++++++++++++------- sparrow-py/src/execution.rs | 23 ------------ sparrow-py/src/expr.rs | 20 +++++++--- sparrow-py/src/lib.rs | 1 - 8 files changed, 76 insertions(+), 52 deletions(-) create mode 100644 sparrow-py/pysrc/sparrow_py/_execution.py diff --git a/sparrow-py/pysrc/sparrow_py/__init__.py b/sparrow-py/pysrc/sparrow_py/__init__.py index f7a4a4a9d..b6500c005 100644 --- a/sparrow-py/pysrc/sparrow_py/__init__.py +++ b/sparrow-py/pysrc/sparrow_py/__init__.py @@ -4,6 +4,7 @@ from typing import Union from . import sources +from ._execution import ExecutionOptions from ._expr import Expr from ._result import Result from ._session import init_session @@ -21,12 +22,13 @@ def record(fields: Dict[str, Expr]) -> Expr: __all__ = [ + "ExecutionOptions", "Expr", "init_session", "record", - "Window", + "Result", "SinceWindow", "SlidingWindow", "sources", - "Result", + "Window", ] diff --git a/sparrow-py/pysrc/sparrow_py/_execution.py b/sparrow-py/pysrc/sparrow_py/_execution.py new file mode 100644 index 000000000..653c7cb42 --- /dev/null +++ b/sparrow-py/pysrc/sparrow_py/_execution.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ExecutionOptions: + """Execution options for a query. + + Attributes + ---------- + row_limit : Optional[int] + The maximum number of rows to return. + If not specified (the default), all rows are returned. + """ + + row_limit: Optional[int] = None diff --git a/sparrow-py/pysrc/sparrow_py/_expr.py b/sparrow-py/pysrc/sparrow_py/_expr.py index 2f2a389a2..76149b8a2 100644 --- a/sparrow-py/pysrc/sparrow_py/_expr.py +++ b/sparrow-py/pysrc/sparrow_py/_expr.py @@ -14,6 +14,8 @@ import sparrow_py as kt import sparrow_py._ffi as _ffi +from ._execution import ExecutionOptions + #: The type of arguments to expressions. Arg = Union["Expr", int, str, float, None] @@ -323,7 +325,7 @@ def show(self, limit: int = 100) -> None: def run(self, row_limit: Optional[int] = None) -> pd.DataFrame: """Run the expression.""" - options = _ffi.ExecutionOptions(row_limit=row_limit) + options = ExecutionOptions(row_limit=row_limit) batches = self._ffi_expr.execute(options).collect_pyarrow() schema = batches[0].schema table = pa.Table.from_batches(batches, schema=schema) @@ -368,8 +370,8 @@ def _aggregation( if window is None: return Expr.call(op, *args, input, None, None) elif isinstance(window, kt.SinceWindow): - return Expr.call(op, *args, input, window._predicate, None) + return Expr.call(op, *args, input, window.predicate, None) elif isinstance(window, kt.SlidingWindow): - return Expr.call(op, *args, input, window._predicate, window._duration) + return Expr.call(op, *args, input, window.predicate, window.duration) else: raise NotImplementedError(f"Unknown window type {window!r}") diff --git a/sparrow-py/pysrc/sparrow_py/_ffi.pyi b/sparrow-py/pysrc/sparrow_py/_ffi.pyi index 184542a11..ca1b56244 100644 --- a/sparrow-py/pysrc/sparrow_py/_ffi.pyi +++ b/sparrow-py/pysrc/sparrow_py/_ffi.pyi @@ -3,15 +3,13 @@ from typing import Optional from typing import Sequence import pyarrow as pa -from sparrow_py.udf import Udf + +from ._execution import ExecutionOptions +from .udf import Udf class Session: def __init__(self) -> None: ... -class ExecutionOptions(object): - def __init__(self, row_limit: Optional[int] = None) -> None: ... - row_limit: Optional[int] - class Execution(object): def collect_pyarrow(self) -> List[pa.RecordBatch]: ... diff --git a/sparrow-py/pysrc/sparrow_py/_windows.py b/sparrow-py/pysrc/sparrow_py/_windows.py index 14437016c..ae27614f0 100644 --- a/sparrow-py/pysrc/sparrow_py/_windows.py +++ b/sparrow-py/pysrc/sparrow_py/_windows.py @@ -1,25 +1,45 @@ -import sparrow_py +from dataclasses import dataclass +from ._expr import Expr + +@dataclass(frozen=True) class Window(object): """Base class for window functions.""" - def __init__(self) -> None: - pass - +@dataclass(frozen=True) class SinceWindow(Window): - """Window since the last time a predicate was true.""" + """ + Window since the last time a predicate was true. + + Aggregations will contain all values starting from the last time the predicate + evaluated to true (inclusive). - def __init__(self, predicate: "sparrow_py.Expr") -> None: - super().__init__() - self._predicate = predicate + Parameters + ---------- + predicate : Expr + The predicate to use for the window. + Each time the predicate evaluates to true the window will be cleared. + """ + predicate: Expr + +@dataclass(frozen=True) class SlidingWindow(Window): - """Sliding windows where the width is a multiple of some condition.""" + """ + Window for the last `duration` intervals of some `predicate`. + + Parameters + ---------- + duration : int + The number of sliding intervals to use in the window. + + predicate : Expr + The predicate to use for the window. + Each time the predicate evaluates to true the window starts a new interval. + """ - def __init__(self, duration: int, predicate: "sparrow_py.Expr") -> None: - super().__init__() - self._duration = duration - self._predicate = predicate + duration: int + predicate: Expr diff --git a/sparrow-py/src/execution.rs b/sparrow-py/src/execution.rs index 91b5c3fb1..405282f5f 100644 --- a/sparrow-py/src/execution.rs +++ b/sparrow-py/src/execution.rs @@ -4,13 +4,6 @@ use sparrow_session::Execution as RustExecution; use crate::error::{Error, ErrorContext}; -#[pyclass] -#[derive(Default)] -pub(crate) struct ExecutionOptions { - #[pyo3(get, set)] - pub(crate) row_limit: Option, -} - /// Kaskada execution object. #[pyclass] pub(crate) struct Execution(Option); @@ -36,19 +29,3 @@ impl Execution { Ok(results) } } - -#[pymethods] -impl ExecutionOptions { - #[new] - fn new(row_limit: Option) -> Self { - Self { row_limit } - } -} - -impl ExecutionOptions { - pub(crate) fn to_rust_options(&self) -> sparrow_session::ExecutionOptions { - sparrow_session::ExecutionOptions { - row_limit: self.row_limit, - } - } -} diff --git a/sparrow-py/src/expr.rs b/sparrow-py/src/expr.rs index 9bbe9cff0..a97c2aff4 100644 --- a/sparrow-py/src/expr.rs +++ b/sparrow-py/src/expr.rs @@ -1,5 +1,5 @@ use crate::error::Error; -use crate::execution::{Execution, ExecutionOptions}; +use crate::execution::Execution; use crate::session::Session; use arrow::pyarrow::ToPyArrow; use pyo3::exceptions::{PyRuntimeError, PyValueError}; @@ -73,11 +73,9 @@ impl Expr { self.session.clone() } - fn execute(&self, options: Option<&ExecutionOptions>) -> Result { + fn execute(&self, options: Option<&PyAny>) -> Result { let session = self.session.rust_session()?; - let options = options - .map(ExecutionOptions::to_rust_options) - .unwrap_or_default(); + let options = extract_options(options)?; let execution = session.execute(&self.rust_expr, options)?; Ok(Execution::new(execution)) } @@ -121,3 +119,15 @@ impl Arg { } } } + +fn extract_options(options: Option<&PyAny>) -> Result { + match options { + None => Ok(sparrow_session::ExecutionOptions::default()), + Some(options) => { + let py = options.py(); + Ok(sparrow_session::ExecutionOptions { + row_limit: options.getattr(pyo3::intern!(py, "row_limit"))?.extract()?, + }) + } + } +} diff --git a/sparrow-py/src/lib.rs b/sparrow-py/src/lib.rs index 97334b3e1..1495b6326 100644 --- a/sparrow-py/src/lib.rs +++ b/sparrow-py/src/lib.rs @@ -18,7 +18,6 @@ fn ffi(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; Ok(()) }