Skip to content

Commit

Permalink
ref: Use a dataclass for execution options (#600)
Browse files Browse the repository at this point in the history
This reduces some duplication, in that we don't need to have the FFI
define a Python-compatible copy of the Rust execution options. Instead
we have it read straight from the documented dataclass.
  • Loading branch information
bjchambers authored Aug 4, 2023
1 parent 3266ee4 commit 39b07f9
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 52 deletions.
6 changes: 4 additions & 2 deletions sparrow-py/pysrc/sparrow_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Union

from . import sources
from ._execution import ExecutionOptions
from ._expr import Expr
from ._result import Result
from ._session import init_session
Expand All @@ -21,12 +22,13 @@ def record(fields: Dict[str, Expr]) -> Expr:


__all__ = [
"ExecutionOptions",
"Expr",
"init_session",
"record",
"Window",
"Result",
"SinceWindow",
"SlidingWindow",
"sources",
"Result",
"Window",
]
16 changes: 16 additions & 0 deletions sparrow-py/pysrc/sparrow_py/_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionOptions:
    """Execution options for a query.

    Attributes
    ----------
    row_limit : Optional[int]
        The maximum number of rows to return.
        If not specified (the default), all rows are returned.
    """

    row_limit: Optional[int] = None
8 changes: 5 additions & 3 deletions sparrow-py/pysrc/sparrow_py/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import sparrow_py as kt
import sparrow_py._ffi as _ffi

from ._execution import ExecutionOptions


#: The type of arguments to expressions.
Arg = Union["Expr", int, str, float, None]
Expand Down Expand Up @@ -323,7 +325,7 @@ def show(self, limit: int = 100) -> None:

def run(self, row_limit: Optional[int] = None) -> pd.DataFrame:
"""Run the expression."""
options = _ffi.ExecutionOptions(row_limit=row_limit)
options = ExecutionOptions(row_limit=row_limit)
batches = self._ffi_expr.execute(options).collect_pyarrow()
schema = batches[0].schema
table = pa.Table.from_batches(batches, schema=schema)
Expand Down Expand Up @@ -368,8 +370,8 @@ def _aggregation(
if window is None:
return Expr.call(op, *args, input, None, None)
elif isinstance(window, kt.SinceWindow):
return Expr.call(op, *args, input, window._predicate, None)
return Expr.call(op, *args, input, window.predicate, None)
elif isinstance(window, kt.SlidingWindow):
return Expr.call(op, *args, input, window._predicate, window._duration)
return Expr.call(op, *args, input, window.predicate, window.duration)
else:
raise NotImplementedError(f"Unknown window type {window!r}")
8 changes: 3 additions & 5 deletions sparrow-py/pysrc/sparrow_py/_ffi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ from typing import Optional
from typing import Sequence

import pyarrow as pa
from sparrow_py.udf import Udf

from ._execution import ExecutionOptions
from .udf import Udf

class Session:
def __init__(self) -> None: ...

class ExecutionOptions(object):
def __init__(self, row_limit: Optional[int] = None) -> None: ...
row_limit: Optional[int]

class Execution(object):
def collect_pyarrow(self) -> List[pa.RecordBatch]: ...

Expand Down
46 changes: 33 additions & 13 deletions sparrow-py/pysrc/sparrow_py/_windows.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
import sparrow_py
from dataclasses import dataclass

from ._expr import Expr


@dataclass(frozen=True)
class Window(object):
"""Base class for window functions."""

def __init__(self) -> None:
pass


@dataclass(frozen=True)
class SinceWindow(Window):
"""Window since the last time a predicate was true."""
"""
Window since the last time a predicate was true.
Aggregations will contain all values starting from the last time the predicate
evaluated to true (inclusive).
def __init__(self, predicate: "sparrow_py.Expr") -> None:
super().__init__()
self._predicate = predicate
Parameters
----------
predicate : Expr
The predicate to use for the window.
Each time the predicate evaluates to true the window will be cleared.
"""

predicate: Expr


@dataclass(frozen=True)
class SlidingWindow(Window):
"""Sliding windows where the width is a multiple of some condition."""
"""
Window for the last `duration` intervals of some `predicate`.
Parameters
----------
duration : int
The number of sliding intervals to use in the window.
predicate : Expr
The predicate to use for the window.
Each time the predicate evaluates to true the window starts a new interval.
"""

def __init__(self, duration: int, predicate: "sparrow_py.Expr") -> None:
super().__init__()
self._duration = duration
self._predicate = predicate
duration: int
predicate: Expr
23 changes: 0 additions & 23 deletions sparrow-py/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ use sparrow_session::Execution as RustExecution;

use crate::error::{Error, ErrorContext};

/// Execution options exposed to Python through `#[pyclass]`.
#[pyclass]
#[derive(Default)]
pub(crate) struct ExecutionOptions {
/// Optional row limit, readable and writable from Python (`#[pyo3(get, set)]`).
// NOTE(review): presumably `None` means "no limit" — confirm against
// `sparrow_session::ExecutionOptions`, which this value is copied into.
#[pyo3(get, set)]
pub(crate) row_limit: Option<usize>,
}

/// Kaskada execution object.
#[pyclass]
pub(crate) struct Execution(Option<RustExecution>);
Expand All @@ -36,19 +29,3 @@ impl Execution {
Ok(results)
}
}

#[pymethods]
impl ExecutionOptions {
/// Create options holding the given `row_limit`.
///
/// Marked `#[new]`, so this is the constructor invoked from Python.
#[new]
fn new(row_limit: Option<usize>) -> Self {
Self { row_limit }
}
}

impl ExecutionOptions {
    /// Build the corresponding `sparrow_session::ExecutionOptions`,
    /// copying `row_limit` across unchanged.
    pub(crate) fn to_rust_options(&self) -> sparrow_session::ExecutionOptions {
        let row_limit = self.row_limit;
        sparrow_session::ExecutionOptions { row_limit }
    }
}
20 changes: 15 additions & 5 deletions sparrow-py/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::execution::{Execution, ExecutionOptions};
use crate::execution::Execution;
use crate::session::Session;
use arrow::pyarrow::ToPyArrow;
use pyo3::exceptions::{PyRuntimeError, PyValueError};
Expand Down Expand Up @@ -73,11 +73,9 @@ impl Expr {
self.session.clone()
}

fn execute(&self, options: Option<&ExecutionOptions>) -> Result<Execution, Error> {
fn execute(&self, options: Option<&PyAny>) -> Result<Execution, Error> {
let session = self.session.rust_session()?;
let options = options
.map(ExecutionOptions::to_rust_options)
.unwrap_or_default();
let options = extract_options(options)?;
let execution = session.execute(&self.rust_expr, options)?;
Ok(Execution::new(execution))
}
Expand Down Expand Up @@ -121,3 +119,15 @@ impl Arg {
}
}
}

/// Convert an optional Python options object into Rust execution options.
///
/// When `options` is `None` the default `sparrow_session::ExecutionOptions`
/// is returned. Otherwise the object's `row_limit` attribute is looked up
/// and extracted into the Rust field of the same name.
///
/// # Errors
///
/// Propagates any failure from the attribute lookup or from extracting the
/// attribute's value.
fn extract_options(options: Option<&PyAny>) -> Result<sparrow_session::ExecutionOptions, Error> {
    // No options object supplied: use the defaults.
    let Some(py_options) = options else {
        return Ok(sparrow_session::ExecutionOptions::default());
    };

    let py = py_options.py();
    // The attribute name is interned so the Python string is created once.
    let row_limit = py_options
        .getattr(pyo3::intern!(py, "row_limit"))?
        .extract()?;
    Ok(sparrow_session::ExecutionOptions { row_limit })
}
1 change: 0 additions & 1 deletion sparrow-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ fn ffi(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<expr::Expr>()?;
m.add_class::<table::Table>()?;
m.add_class::<execution::Execution>()?;
m.add_class::<execution::ExecutionOptions>()?;

Ok(())
}

0 comments on commit 39b07f9

Please sign in to comment.