feat: More work on Python API (#598)
- Introduced options for limits, and the method `show()` for displaying
the results of a query. It uses `ipython.display()` if available, and
otherwise prints the Pandas DataFrame to stdout.
- Added pydoc for `collect`.
- Fixed some issues in the pytest for adding data (specifically around
how the `_time` column round-tripped through the JSON in the golden
test).
- Updated some of the modules to allow `import sparrow_py as kt` and
using `kt.sources.CsvSource` (see the usage sketch below). The trick was
to make sure the root module (`sparrow_py` currently) re-exports the
submodule (`sources`).
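
A minimal usage sketch of the Python surface this commit touches: the
`import sparrow_py as kt` re-export, `show()`, and `run(row_limit=...)`.
The `CsvSource` constructor arguments below are hypothetical; its signature
is not part of this diff.

import sparrow_py as kt

kt.init_session()

# Hypothetical CsvSource arguments: the column names and inline CSV data
# are placeholders for illustration, not the documented signature.
csv_data = "\n".join(
    [
        "_time,user,amount",
        "2023-01-01T00:00:00,alice,10",
        "2023-01-02T00:00:00,bob,20",
    ]
)
purchases = kt.sources.CsvSource(
    "Purchases",
    time_column_name="_time",
    entity_column_name="user",
    csv_string=csv_data,
)

# Print (or ipython-display) up to `limit` rows of the result.
purchases.show(limit=10)

# Run the query, capping the returned rows via the new ExecutionOptions
# row limit, and get a Pandas DataFrame back.
df = purchases.run(row_limit=100)
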
bjchambers authored Aug 4, 2023
1 parent ed026ae commit d6bf2b2
Showing 36 changed files with 567 additions and 468 deletions.
12 changes: 6 additions & 6 deletions crates/sparrow-runtime/src/execute.rs
@@ -73,12 +73,12 @@ pub async fn execute(

#[derive(Default, Debug)]
pub struct ExecutionOptions {
changed_since_time: Timestamp,
final_at_time: Option<Timestamp>,
bounded_lateness_ns: Option<i64>,
compute_snapshot_config: Option<ComputeSnapshotConfig>,
limits: Option<Limits>,
stop_signal_rx: Option<tokio::sync::watch::Receiver<bool>>,
pub changed_since_time: Timestamp,
pub final_at_time: Option<Timestamp>,
pub bounded_lateness_ns: Option<i64>,
pub compute_snapshot_config: Option<ComputeSnapshotConfig>,
pub limits: Option<Limits>,
pub stop_signal_rx: Option<tokio::sync::watch::Receiver<bool>>,
}

impl ExecutionOptions {
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/lib.rs
@@ -7,5 +7,5 @@ mod table;
pub use error::Error;
pub use execution::Execution;
pub use expr::{Expr, Literal};
pub use session::Session;
pub use session::{ExecutionOptions, Session};
pub use table::Table;
64 changes: 28 additions & 36 deletions crates/sparrow-session/src/session.rs
@@ -4,13 +4,13 @@ use arrow_schema::SchemaRef;
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{
ComputeTable, FeatureSet, PerEntityBehavior, TableConfig, TableMetadata,
};
use sparrow_compiler::{AstDfgRef, DataContext, Dfg, DiagnosticCollector};
use sparrow_plan::TableId;
use sparrow_runtime::execute::output::Destination;
use sparrow_runtime::execute::ExecutionOptions;
use sparrow_syntax::{ExprOp, LiteralValue, Located, Location, Resolved};
use uuid::Uuid;

@@ -23,6 +23,12 @@ pub struct Session {
dfg: Dfg,
}

#[derive(Default)]
pub struct ExecutionOptions {
/// The maximum number of rows to return.
pub row_limit: Option<usize>,
}

/// Adds a table to the session.
impl Session {
pub fn add_literal(&mut self, literal: Literal) -> error_stack::Result<Expr, Error> {
@@ -231,7 +237,11 @@ impl Session {
}
}

pub fn execute(&self, expr: &Expr) -> error_stack::Result<Execution, Error> {
pub fn execute(
&self,
expr: &Expr,
options: ExecutionOptions,
) -> error_stack::Result<Execution, Error> {
// TODO: Decorations?
let primary_group_info = self
.data_context
@@ -276,7 +286,8 @@ impl Session {

let destination = Destination::Channel(output_tx);
let data_context = self.data_context.clone();
let options = ExecutionOptions::default();

let options = options.to_sparrow_options();

// Hacky. Use the existing execution logic. This weird things with downloading checkpoints, etc.
let result = rt
@@ -291,39 +302,6 @@ impl Session {
.boxed();

Ok(Execution::new(rt, output_rx, result))

// // Spawn the root task
// rt.block_on(async move {
// let (output_tx, output_rx) = tokio::sync::mpsc::channel(10);

// let destination = Destination::Channel(output_tx);
// let data_context = self.data_context.clone();
// let options = ExecutionOptions::default();

// // Hacky. Use the existing execution logic. This weird things with downloading checkpoints, etc.
// let mut results =
// sparrow_runtime::execute::execute_new(plan, destination, data_context, options)
// .await
// .change_context(Error::Execute)?
// .boxed();

// // Hacky. Try to get the last response so we can see if there are any errors, etc.
// let mut _last = None;
// while let Some(response) = results.try_next().await.change_context(Error::Execute)? {
// _last = Some(response);
// }

// let batches: Vec<_> = tokio_stream::wrappers::ReceiverStream::new(output_rx)
// .collect()
// .await;

// // Hacky: Assume we produce at least one batch.
// // New execution plans contain the schema ref which cleans this up.
// let schema = batches[0].schema();
// let batch = arrow_select::concat::concat_batches(&schema, &batches)
// .into_report()
// .change_context(Error::Execute)?;
// Ok(batch)
}

pub(super) fn hacky_table_mut(
@@ -349,6 +327,20 @@ static RECORD_EXTENSION_ARGUMENTS: [Located<String>; 2] = [
Located::internal_string("base"),
];

impl ExecutionOptions {
fn to_sparrow_options(&self) -> sparrow_runtime::execute::ExecutionOptions {
let mut options = sparrow_runtime::execute::ExecutionOptions::default();

if let Some(row_limit) = self.row_limit {
options.limits = Some(Limits {
preview_rows: row_limit as i64,
});
}

options
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
3 changes: 2 additions & 1 deletion sparrow-py/.darglint
@@ -1,4 +1,5 @@
[darglint]
strictness = short
docstring_style=numpy
ignore_regex=^(test)?_(.*),
ignore_regex=^(test)?_(.*),
message_template={path}:{line} in {obj}: {msg_id} {msg}
6 changes: 5 additions & 1 deletion sparrow-py/noxfile.py
@@ -93,7 +93,11 @@ def mypy(session: Session) -> None:
args = session.posargs or ["pysrc", "pytests", "docs/conf.py"]
session.install("mypy", "pytest", "pandas-stubs")
install_self(session)
session.run("mypy", *args)
# Using `--install-types` should make this less picky about missing stubs.
# However, there is a possibility it slows things down, by making mypy
# run twice -- once to determine what types need to be installed, then once
# to check things with those stubs.
session.run("mypy", "--install-types", *args)
if not session.posargs:
session.run("mypy", f"--python-executable={sys.executable}", "noxfile.py")

1 change: 0 additions & 1 deletion sparrow-py/pyproject.toml
@@ -83,7 +83,6 @@ show_error_context = true

# pyproject.toml
[tool.pytest.ini_options]
addopts = "-ra -q"
testpaths = [
"pytests",
]
16 changes: 13 additions & 3 deletions sparrow-py/pysrc/sparrow_py/__init__.py
@@ -3,10 +3,12 @@
from typing import List
from typing import Union

from . import sources
from ._expr import Expr
from ._result import Result
from ._session import init_session
from ._windows import SinceWindow
from ._windows import SlidingWindow
from .expr import Expr
from .session import init_session


def record(fields: Dict[str, Expr]) -> Expr:
@@ -17,4 +19,12 @@ def record(fields: Dict[str, Expr]) -> Expr:
return Expr.call("record", *args)


__all__ = ["Expr", "init_session", "record", "SinceWindow", "SlidingWindow"]
__all__ = [
"Expr",
"init_session",
"record",
"SinceWindow",
"SlidingWindow",
"sources",
"Result",
]
@@ -11,14 +11,16 @@

import pandas as pd
import pyarrow as pa
import sparrow_py
import sparrow_py as kt
import sparrow_py._ffi as _ffi

from ._result import Result
from ._windows import SinceWindow
from ._windows import SlidingWindow
from ._windows import Window


#: The type of arguments to expressions.
Arg = Union["Expr", int, str, float, None]


@@ -268,7 +270,7 @@ def extend(self, fields: Dict[str, "Expr"]) -> "Expr":
"""Extend this record with the additoonal fields."""
# This argument order is weird, and we shouldn't need to make a record
# in order to do the extension.
extension = sparrow_py.record(fields)
extension = kt.record(fields)
return Expr.call("extend_record", extension, self)

def neg(self) -> "Expr":
@@ -287,9 +289,8 @@ def is_not_null(self) -> "Expr":
"""Return a boolean expression indicating if the expression is not null."""
return Expr.call("is_valid", self)

def collect(
self, max: Optional[int] = None, window: Optional[Window] = None
) -> "Expr":
def collect(self, max: Optional[int], window: Optional[Window] = None) -> "Expr":
"""Return an expression collecting the last `max` values in the `window`."""
return _aggregation("collect", self, window, max)

def sum(self, window: Optional[Window] = None) -> "Expr":
@@ -304,9 +305,29 @@ def last(self, window: Optional[Window] = None) -> "Expr":
"""Return the last aggregation of the expression."""
return _aggregation("last", self, window)

def run(self) -> pd.DataFrame:
def show(self, limit: int = 100) -> None:
"""
Print the first N rows of the result.
This is intended for debugging purposes.
Parameters
----------
limit : int
Maximum number of rows to print.
"""
df = self.run(row_limit=limit)
try:
import ipython # type: ignore

ipython.display(df)
except ImportError:
print(df)

def run(self, row_limit: Optional[int] = None) -> pd.DataFrame:
"""Run the expression."""
batches = self._ffi_expr.execute().collect_pyarrow()
options = _ffi.ExecutionOptions(row_limit=row_limit)
batches = self._ffi_expr.execute(options).collect_pyarrow()
schema = batches[0].schema
table = pa.Table.from_batches(batches, schema=schema)
return table.to_pandas()
@@ -330,13 +351,18 @@ def _aggregation(
input : Expr
The input to the expression.
window : Optional[Window]
The window to use for the aggregation.
The window to use for the aggregation.
*args : Optional[Arg]
Additional arguments to provide before `input` and the flattened window.
Returns
-------
The resulting expression.
Raises
------
UnimplementedError
If the window is not a known type.
"""

# Note: things would be easier if we had a more normal order, which
@@ -345,9 +371,9 @@
# us to add the *args like so.
if window is None:
return Expr.call(op, *args, input, None, None)
elif isinstance(window, SinceWindow):
elif isinstance(window, kt.SinceWindow):
return Expr.call(op, *args, input, window._predicate, None)
elif isinstance(window, SlidingWindow):
return Expr.call(op, *args, input, window._predicate, window._duration)
else:
raise ValueError(f"Unknown window type {window!r}")
raise UnimplementedError(f"Unknown window type {window!r}")
9 changes: 6 additions & 3 deletions sparrow-py/pysrc/sparrow_py/_ffi.pyi
@@ -1,13 +1,16 @@
from typing import List
from typing import Optional
from typing import Sequence
from typing import List

import pyarrow as pa
from sparrow_py.udf import Udf

class Session:
def __init__(self) -> None: ...

class ExecutionOptions(object):
def __init__(self, row_limit: Optional[int] = None) -> None: ...
row_limit: Optional[int]

class Execution(object):
def collect_pyarrow(self) -> List[pa.RecordBatch]: ...
@@ -23,7 +26,7 @@ class Expr:
def data_type_string(self) -> str: ...
def equivalent(self, other: Expr) -> bool: ...
def session(self) -> Session: ...
def execute(self) -> Execution: ...
def execute(self, options: Optional[ExecutionOptions] = None) -> Execution: ...

def call_udf(udf: Udf, result_type: pa.DataType, *args: pa.Array) -> pa.Array: ...

@@ -40,4 +43,4 @@ class Table(Expr):
) -> None: ...
@property
def name(self) -> str: ...
def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
2 changes: 2 additions & 0 deletions sparrow-py/pysrc/sparrow_py/_result.py
@@ -0,0 +1,2 @@
class Result(object):
"""Result of running a timestream query."""
File renamed without changes.
4 changes: 2 additions & 2 deletions sparrow-py/pysrc/sparrow_py/_windows.py
@@ -11,15 +11,15 @@ def __init__(self) -> None:
class SinceWindow(Window):
"""Window since the last time a predicate was true."""

def __init__(self, predicate: "sparrow_py.expr.Expr") -> None:
def __init__(self, predicate: "sparrow_py.Expr") -> None:
super().__init__()
self._predicate = predicate


class SlidingWindow(Window):
"""Sliding windows where the width is a multiple of some condition."""

def __init__(self, duration: int, predicate: "sparrow_py.expr.Expr") -> None:
def __init__(self, duration: int, predicate: "sparrow_py.Expr") -> None:
super().__init__()
self._duration = duration
self._predicate = predicate
7 changes: 4 additions & 3 deletions sparrow-py/pysrc/sparrow_py/sources/source.py
@@ -2,15 +2,16 @@
from typing import Optional

import pyarrow as pa
import sparrow_py as kt
import sparrow_py._ffi as _ffi
from sparrow_py.expr import Expr
from sparrow_py.session import _get_session

from .._session import _get_session


_TABLE_NUM: int = 0


class Source(Expr):
class Source(kt.Expr):
"""A source expression."""

# TODO: Clean-up naming on the FFI side.