From 2bb67c17c15e823a6b6aac633dbf99e1a4fdaf08 Mon Sep 17 00:00:00 2001
From: Ben Chambers <35960+bjchambers@users.noreply.github.com>
Date: Fri, 4 Aug 2023 16:03:50 -0700
Subject: [PATCH] feat: Rename `Expr` to `Timestream`; stub docs (#603)
The main change is renaming `Expr` to `Timestream`.
Other changes include:
- Adding methods (and tests) for basic math and comparison operators.
- Getting `nox -s docs` working for some rudimentary documentation
(WIP).
- Fleshing out docs for existing methods.
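As a rough sketch of the renamed surface (CSV source and column names are borrowed from this patch's tests; calling `init_session` first is an assumption based on `conftest.py`):

```python
import sparrow_py as kt

kt.init_session()  # assumed required before building sources, as in conftest.py

content = "\n".join(
    [
        "time,key,m,n",
        "1996-12-19T16:39:57-08:00,A,5,10",
        "1996-12-19T16:39:58-08:00,B,24,3",
    ]
)
source = kt.sources.CsvSource("time", "key", content)

m = source["m"]                  # field reference on a Timestream of records
total = (m + source["n"]).sum()  # math operators build new Timestreams
is_big = m > 10                  # comparisons yield boolean Timestreams
is_big.preview(limit=4)          # first rows as a Pandas DataFrame
```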
---
crates/sparrow-session/src/expr.rs | 10 -
sparrow-py/.flake8 | 3 +-
sparrow-py/README.md | 15 +-
sparrow-py/docs/conf.py | 57 ++
sparrow-py/docs/index.md | 50 +-
sparrow-py/docs/quickstart.md | 6 +
sparrow-py/docs/reference/execution.md | 11 +
sparrow-py/docs/reference/sources.md | 6 +
sparrow-py/docs/reference/timestream.md | 17 +
sparrow-py/noxfile.py | 11 +-
sparrow-py/poetry.lock | 23 +-
sparrow-py/pyproject.toml | 2 +-
sparrow-py/pysrc/sparrow_py/__init__.py | 19 +-
sparrow-py/pysrc/sparrow_py/_execution.py | 1 -
sparrow-py/pysrc/sparrow_py/_expr.py | 375 ---------
sparrow-py/pysrc/sparrow_py/_result.py | 40 +-
sparrow-py/pysrc/sparrow_py/_session.py | 3 +-
sparrow-py/pysrc/sparrow_py/_timestream.py | 736 ++++++++++++++++++
sparrow-py/pysrc/sparrow_py/_windows.py | 14 +-
sparrow-py/pysrc/sparrow_py/sources/source.py | 6 +-
sparrow-py/pytests/conftest.py | 12 +-
.../record_test/test_remove_record.json | 6 +
.../record_test/test_select_record.json | 12 +-
.../test_timestream_preview.json | 4 +
sparrow-py/pytests/record_test.py | 6 +-
sparrow-py/pytests/result_test.py | 1 +
.../{expr_test.py => timestream_test.py} | 59 +-
sparrow-py/src/expr.rs | 8 -
28 files changed, 1008 insertions(+), 505 deletions(-)
create mode 100644 sparrow-py/docs/quickstart.md
create mode 100644 sparrow-py/docs/reference/execution.md
create mode 100644 sparrow-py/docs/reference/sources.md
create mode 100644 sparrow-py/docs/reference/timestream.md
delete mode 100644 sparrow-py/pysrc/sparrow_py/_expr.py
create mode 100644 sparrow-py/pysrc/sparrow_py/_timestream.py
create mode 100644 sparrow-py/pytests/golden/record_test/test_remove_record.json
create mode 100644 sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json
rename sparrow-py/pytests/{expr_test.py => timestream_test.py} (58%)
diff --git a/crates/sparrow-session/src/expr.rs b/crates/sparrow-session/src/expr.rs
index b4e650bfb..eb7463c13 100644
--- a/crates/sparrow-session/src/expr.rs
+++ b/crates/sparrow-session/src/expr.rs
@@ -12,16 +12,6 @@ impl Expr {
_ => None,
}
}
-
- pub fn equivalent(&self, other: &Expr) -> bool {
- // This isn't quite correct -- we should lock everything and then compare.
- // But, this is a temporary hack for the Python builder.
- self.0.value() == other.0.value()
- && self.0.is_new() == other.0.is_new()
- && self.0.value_type() == other.0.value_type()
- && self.0.grouping() == other.0.grouping()
- && self.0.time_domain() == other.0.time_domain()
- }
}
pub enum Literal {
diff --git a/sparrow-py/.flake8 b/sparrow-py/.flake8
index 6d1843ad4..e88a55d99 100644
--- a/sparrow-py/.flake8
+++ b/sparrow-py/.flake8
@@ -1,8 +1,7 @@
[flake8]
-select = B,B9,C,D,DAR,E,F,N,W
+select = B,B9,C,E,F,N,W
ignore = E203,E501,W503
max-line-length = 100
max-complexity = 10
-docstring-convention = numpy
rst-roles = class,const,func,meth,mod,ref
rst-directives = deprecated
\ No newline at end of file
diff --git a/sparrow-py/README.md b/sparrow-py/README.md
index 1c1fc00bb..ab985955b 100644
--- a/sparrow-py/README.md
+++ b/sparrow-py/README.md
@@ -1,9 +1,16 @@
-# Python Library for Kaskada
+# Kaskada Timestreams
-`sparrow-py` provides a Python library for building and executing Kaskada queries using an embedded Sparrow library.
-It uses [Pyo3][pyo3] to generate Python wrappers for the Rust code, and then provides more Python-friendly implementations on top of that.
+
+Kaskada's `timestreams` library makes it easy to work with structured event-based data.
+Define temporal queries on event-based data loaded from Python using Pandas or PyArrow, and push new data in as it occurs.
+Or, execute the queries directly on events in your data lake or as they arrive on a stream.
-[pyo3]: https://github.com/PyO3/pyo3
+With Kaskada you can unleash the value of real-time, temporal queries without the complexity of "big" infrastructure components like a distributed stream or stream processing system.
+
+Under the hood, `timestreams` is an efficient temporal query engine built in Rust.
+It is built on Apache Arrow, using the same columnar execution strategy that makes ...
+
+
## Install Python
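A minimal sketch of that flow end to end, substituting the `CsvSource` used in this patch's tests for a Parquet or streaming source:

```python
import sparrow_py as kt

kt.init_session()  # assumed required, as in the test fixtures

content = "\n".join(
    [
        "time,key,m",
        "1996-12-19T16:39:57-08:00,A,5",
        "1996-12-19T16:39:58-08:00,B,24",
        "1996-12-19T16:39:59-08:00,A,17",
    ]
)
events = kt.sources.CsvSource("time", "key", content)

# Running sum of `m`, computed per key, materialized as a DataFrame.
df = events["m"].sum().run().to_pandas()
```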
diff --git a/sparrow-py/docs/conf.py b/sparrow-py/docs/conf.py
index b46cdebc9..8f93ee912 100644
--- a/sparrow-py/docs/conf.py
+++ b/sparrow-py/docs/conf.py
@@ -1,11 +1,68 @@
"""Sphinx configuration."""
+from typing import Any
+from typing import Dict
+
project = "sparrow-py"
author = "Kaskada Contributors"
copyright = "2023, Kaskada Contributors"
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.napoleon",
+ "sphinx.ext.intersphinx",
+ "sphinx.ext.todo",
"myst_parser",
]
autodoc_typehints = "description"
html_theme = "furo"
+html_title = "Kaskada"
+language = "en"
+
+intersphinx_mapping: Dict[str, Any] = {
+ 'python': ('http://docs.python.org/3', None),
+ 'pandas': ('http://pandas.pydata.org/docs', None),
+ 'pyarrow': ('https://arrow.apache.org/docs', None),
+}
+
+html_theme_options: Dict[str, Any] = {
+ "footer_icons": [
+ {
+ "name": "GitHub",
+ "url": "https://github.com/kaskada-ai/kaskada",
+ "html": """
+
+ """,
+ "class": "",
+ },
+ ],
+ "source_repository": "https://github.com/kaskada-ai/kaskada/",
+ "source_branch": "main",
+ "source_directory": "sparrow-py/docs/",
+}
+
+# Options for Todos
+todo_include_todos = True
+
+# Options for Myst (markdown)
+# https://myst-parser.readthedocs.io/en/v0.17.1/syntax/optional.html
+myst_enable_extensions = [
+ "colon_fence",
+ "deflist",
+ "smartquotes",
+ "replacements",
+]
+myst_heading_anchors = 3
+
+# Suggested options from Furo theme
+# -- Options for autodoc ----------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#configuration
+
+# Automatically extract typehints when specified and place them in
+# descriptions of the relevant function/method.
+autodoc_typehints = "description"
+
+# Don't show class signature with the class' name.
+autodoc_class_signature = "separated"
+
+autodoc_type_aliases = { 'Arg': 'sparrow_py.Arg' }
diff --git a/sparrow-py/docs/index.md b/sparrow-py/docs/index.md
index 7033913b5..ec5732a99 100644
--- a/sparrow-py/docs/index.md
+++ b/sparrow-py/docs/index.md
@@ -1,5 +1,51 @@
-```{include} ../README.md
---
-end-before:
+hide-toc: true
---
+
+# Kaskada Timestreams
+
+```{include} ../README.md
+:start-after:
+:end-before:
+```
+
+## Getting Started with Timestreams
+
+Getting started with Timestreams is as simple as `pip` installing the Python library, loading some data, and running a query.
+
+```python
+import timestreams as t
+
+# Read data from a Parquet file.
+data = t.sources.Parquet.from_file(
+ "path_to_file.parquet",
+ time = "time",
+ key = "user")
+# Get the count of events associated with each user over time, as a dataframe.
+data.count().run().to_pandas()
+```
+
+## What are "Timestreams"?
+A [Timestream](reference/timestream) describes how a value changes over time. In the same way that SQL
+queries transform tables and graph queries transform nodes and edges,
+Kaskada queries transform Timestreams.
+
+In contrast to a timeseries, which often contains simple values (e.g., numeric
+observations) defined at fixed, periodic times (e.g., every minute), a Timestream
+contains any kind of data (records or collections as well as primitives) and may
+be defined at arbitrary times corresponding to when the events occur.
+
+```{toctree}
+:hidden:
+
+quickstart
+```
+
+```{toctree}
+:caption: Reference
+:hidden:
+
+reference/timestream
+reference/sources
+reference/execution
```
\ No newline at end of file
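To make the contrast concrete, a small hedged sketch (column names illustrative) of a Timestream of records whose events occur at irregular, event-driven times:

```python
import sparrow_py as kt

kt.init_session()  # assumed required, as in the test fixtures

content = "\n".join(
    [
        "time,key,m,n",
        "1996-12-19T16:39:57-08:00,A,5,10",
        "1996-12-19T16:40:12-08:00,A,17,6",  # arbitrary gap between events
    ]
)
source = kt.sources.CsvSource("time", "key", content)

m = source["m"]                # Timestream of the `m` field's values
narrowed = source.select("m")  # records limited to the `m` field
trimmed = source.remove("n")   # records with the `n` field dropped
```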
diff --git a/sparrow-py/docs/quickstart.md b/sparrow-py/docs/quickstart.md
new file mode 100644
index 000000000..ff62d87ba
--- /dev/null
+++ b/sparrow-py/docs/quickstart.md
@@ -0,0 +1,6 @@
+# Quick Start
+
+```{todo}
+
+Write the quick start.
+```
\ No newline at end of file
diff --git a/sparrow-py/docs/reference/execution.md b/sparrow-py/docs/reference/execution.md
new file mode 100644
index 000000000..5bef2e650
--- /dev/null
+++ b/sparrow-py/docs/reference/execution.md
@@ -0,0 +1,11 @@
+# Execution
+
+There are several ways to execute a Timestream.
+The {py:meth}`run method <sparrow_py.Timestream.run>` executes the Timestream over the current data, and produces a {py:class}`Result <sparrow_py.Result>`.
+
+## Result
+```{eval-rst}
+.. autoclass:: sparrow_py.Result
+ :exclude-members: __init__
+ :members:
+```
\ No newline at end of file
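A sketch of the three ways a `Result` can be consumed, using the methods defined in `_result.py`:

```python
import sparrow_py as kt

kt.init_session()  # assumed required, as in the test fixtures

content = "\n".join(
    [
        "time,key,m",
        "1996-12-19T16:39:57-08:00,A,5",
        "1996-12-19T16:39:58-08:00,B,24",
    ]
)
source = kt.sources.CsvSource("time", "key", content)

# Block and collect everything into memory at once...
df = source.run(row_limit=2).to_pandas()

# ...or stream the results batch-by-batch or row-by-row.
for batch in source.run().iter_pandas():
    print(batch.shape)
for row in source.run().iter_rows():
    print(row["m"])
```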
diff --git a/sparrow-py/docs/reference/sources.md b/sparrow-py/docs/reference/sources.md
new file mode 100644
index 000000000..b2daf7f82
--- /dev/null
+++ b/sparrow-py/docs/reference/sources.md
@@ -0,0 +1,6 @@
+# Sources
+
+```{eval-rst}
+.. automodule:: sparrow_py.sources
+ :members:
+```
\ No newline at end of file
diff --git a/sparrow-py/docs/reference/timestream.md b/sparrow-py/docs/reference/timestream.md
new file mode 100644
index 000000000..af96ae146
--- /dev/null
+++ b/sparrow-py/docs/reference/timestream.md
@@ -0,0 +1,17 @@
+# Timestream
+
+```{todo}
+- [ ] Expand the `Arg` type alias in timestreams accordingly.
+```
+
+```{eval-rst}
+.. autoclass:: sparrow_py.Timestream
+ :exclude-members: __init__
+ :members:
+ :special-members:
+```
+
+```{eval-rst}
+.. autoclass:: sparrow_py.Arg
+ :members:
+```
\ No newline at end of file
diff --git a/sparrow-py/noxfile.py b/sparrow-py/noxfile.py
index 6eeda9b25..edbad4fda 100644
--- a/sparrow-py/noxfile.py
+++ b/sparrow-py/noxfile.py
@@ -49,17 +49,20 @@ def check_lint(session: Session) -> None:
"darglint",
"flake8",
"flake8-bugbear",
- "flake8-docstrings",
"flake8-rst-docstrings",
"isort",
"pep8-naming",
+ "pydocstyle",
"pyupgrade",
)
session.run
session.run("black", "--check", *args)
- session.run("darglint", "pysrc")
session.run("flake8", *args)
session.run("isort", "--filter-files", "--check-only", *args)
+
+ # Only do darglint and pydocstyle on pysrc (source)
+ session.run("darglint", "pysrc")
+ session.run("pydocstyle", "--convention=numpy", "pysrc")
# No way to run this as a check.
# session.run("pyupgrade", "--py38-plus")
@@ -97,7 +100,7 @@ def mypy(session: Session) -> None:
# However, there is a possibility it slows things down, by making mypy
# run twice -- once to determine what types need to be installed, then once
# to check things with those stubs.
- session.run("mypy", "--install-types", *args)
+ session.run("mypy", "--install-types", "--non-interactive", *args)
if not session.posargs:
session.run("mypy", f"--python-executable={sys.executable}", "noxfile.py")
@@ -172,7 +175,7 @@ def docs(session: Session) -> None:
"""Build and serve the documentation with live reloading on file changes."""
args = session.posargs or ["--open-browser", "docs", "docs/_build"]
install_self(session)
- session.install("sphinx", "sphinx-autobuild", "furo", "myst-parser")
+ session.install("sphinx", "sphinx-autobuild", "furo", "myst-parser", "pandas", "pyarrow", "sphinx-autodoc-typehints")
build_dir = Path("docs", "_build")
if build_dir.exists():
diff --git a/sparrow-py/poetry.lock b/sparrow-py/poetry.lock
index bae940712..d25430518 100644
--- a/sparrow-py/poetry.lock
+++ b/sparrow-py/poetry.lock
@@ -391,21 +391,6 @@ flake8 = ">=3.0.0"
[package.extras]
dev = ["coverage", "hypothesis", "hypothesmith (>=0.2)", "pre-commit", "pytest", "tox"]
-[[package]]
-name = "flake8-docstrings"
-version = "1.7.0"
-description = "Extension for flake8 which uses pydocstyle to check docstrings"
-optional = false
-python-versions = ">=3.7"
-files = [
- {file = "flake8_docstrings-1.7.0-py2.py3-none-any.whl", hash = "sha256:51f2344026da083fc084166a9353f5082b01f72901df422f74b4d953ae88ac75"},
- {file = "flake8_docstrings-1.7.0.tar.gz", hash = "sha256:4c8cc748dc16e6869728699e5d0d685da9a10b0ea718e090b1ba088e67a941af"},
-]
-
-[package.dependencies]
-flake8 = ">=3"
-pydocstyle = ">=2.1"
-
[[package]]
name = "flake8-rst-docstrings"
version = "0.3.0"
@@ -1336,13 +1321,13 @@ files = [
[[package]]
name = "sphinx"
-version = "7.1.1"
+version = "7.1.2"
description = "Python documentation generator"
optional = false
python-versions = ">=3.8"
files = [
- {file = "sphinx-7.1.1-py3-none-any.whl", hash = "sha256:4e6c5ea477afa0fb90815210fd1312012e1d7542589ab251ac9b53b7c0751bce"},
- {file = "sphinx-7.1.1.tar.gz", hash = "sha256:59b8e391f0768a96cd233e8300fe7f0a8dc2f64f83dc2a54336a9a84f428ff4e"},
+ {file = "sphinx-7.1.2-py3-none-any.whl", hash = "sha256:d170a81825b2fcacb6dfd5a0d7f578a053e45d3f2b153fecc948c37344eb4cbe"},
+ {file = "sphinx-7.1.2.tar.gz", hash = "sha256:780f4d32f1d7d1126576e0e5ecc19dc32ab76cd24e950228dcf7b1f6d3d9e22f"},
]
[package.dependencies]
@@ -1652,4 +1637,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
[metadata]
lock-version = "2.0"
python-versions = ">=3.8,<4.0"
-content-hash = "2741172019f6a75bc8a80c0b3031e45181325830a4286e07c01ab610d44913d6"
+content-hash = "55960b933d43ef46c40f57a767456fc7bbc12865d7f1cbcfe0537afa06c61889"
diff --git a/sparrow-py/pyproject.toml b/sparrow-py/pyproject.toml
index 8596cb5b7..a1f5796cd 100644
--- a/sparrow-py/pyproject.toml
+++ b/sparrow-py/pyproject.toml
@@ -15,7 +15,6 @@ coverage = {extras = ["toml"], version = ">=6.2"}
darglint = ">=1.8.1"
flake8 = ">=4.0.1"
flake8-bugbear = ">=21.9.2"
-flake8-docstrings = ">=1.6.0"
flake8-rst-docstrings = ">=0.2.5"
furo = ">=2021.11.12"
isort = ">=5.10.1"
@@ -23,6 +22,7 @@ mypy = ">=0.930"
pandas-stubs = "^2.0.2"
pep8-naming = ">=0.12.1"
pyarrow = "^12.0.1"
+pydocstyle = "^6.3.0"
pytest = ">=6.2.5"
pyupgrade = ">=2.29.1"
safety = ">=1.10.3"
diff --git a/sparrow-py/pysrc/sparrow_py/__init__.py b/sparrow-py/pysrc/sparrow_py/__init__.py
index b6500c005..1ac6be591 100644
--- a/sparrow-py/pysrc/sparrow_py/__init__.py
+++ b/sparrow-py/pysrc/sparrow_py/__init__.py
@@ -1,29 +1,20 @@
"""Kaskada query builder and local executon engine."""
-from typing import Dict
-from typing import List
-from typing import Union
-
from . import sources
from ._execution import ExecutionOptions
-from ._expr import Expr
from ._result import Result
from ._session import init_session
+from ._timestream import Arg
+from ._timestream import Timestream
+from ._timestream import record
from ._windows import SinceWindow
from ._windows import SlidingWindow
from ._windows import Window
-def record(fields: Dict[str, Expr]) -> Expr:
- """Create a record from the given keyword arguments."""
- import itertools
-
- args: List[Union[str, "Expr"]] = list(itertools.chain(*fields.items()))
- return Expr.call("record", *args)
-
-
__all__ = [
+ "Arg",
"ExecutionOptions",
- "Expr",
+ "Timestream",
"init_session",
"record",
"Result",
diff --git a/sparrow-py/pysrc/sparrow_py/_execution.py b/sparrow-py/pysrc/sparrow_py/_execution.py
index 5f1f5f3ee..981cd881e 100644
--- a/sparrow-py/pysrc/sparrow_py/_execution.py
+++ b/sparrow-py/pysrc/sparrow_py/_execution.py
@@ -19,4 +19,3 @@ class ExecutionOptions:
row_limit: Optional[int] = None
max_batch_size: Optional[int] = None
-
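For reference, a short sketch of the options object that `Timestream.run` constructs internally; `_execution` is a private module, so this is illustrative only:

```python
from sparrow_py._execution import ExecutionOptions

# The same options Timestream.run builds from its keyword arguments.
options = ExecutionOptions(row_limit=10, max_batch_size=100)
assert options.row_limit == 10
assert options.max_batch_size == 100
```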
diff --git a/sparrow-py/pysrc/sparrow_py/_expr.py b/sparrow-py/pysrc/sparrow_py/_expr.py
deleted file mode 100644
index b0efa6c14..000000000
--- a/sparrow-py/pysrc/sparrow_py/_expr.py
+++ /dev/null
@@ -1,375 +0,0 @@
-"""Defines classes representing Kaskada expressions."""
-
-import sys
-from typing import Callable
-from typing import Dict
-from typing import Optional
-from typing import Sequence
-from typing import Tuple
-from typing import Union
-from typing import final
-
-import pandas as pd
-import pyarrow as pa
-import sparrow_py as kt
-import sparrow_py._ffi as _ffi
-
-from ._execution import ExecutionOptions
-from ._result import Result
-
-
-#: The type of arguments to expressions.
-Arg = Union["Expr", int, str, float, None]
-
-
-def _augment_error(args: Sequence[Arg], e: Exception) -> Exception:
- """Augment an error with information about the arguments."""
- if sys.version_info >= (3, 11):
- # If we can add notes to the exception, indicate the types.
- # This works in Python >=3.11
- for n, arg in enumerate(args):
- if isinstance(arg, Expr):
- e.add_note(f"Arg[{n}]: Expr of type {arg.data_type}")
- else:
- e.add_note(f"Arg[{n}]: Literal {arg} ({type(arg)})")
- return e
-
-
-class Expr(object):
- """A Kaskada expression."""
-
- _ffi_expr: _ffi.Expr
-
- def __init__(self, ffi: _ffi.Expr) -> None:
- """Create a new expression."""
- self._ffi_expr = ffi
-
- @staticmethod
- def call(name: str, *args: Arg) -> "Expr":
- """
- Construct a new expression calling the given name.
-
- Parameters
- ----------
- name : str
- Name of the operation to apply.
- args : list[Expr]
- List of arguments to the expression.
-
- Returns
- -------
- Expression representing the given operation applied to the arguments.
-
- Raises
- ------
- # noqa: DAR401 _augment_error
- TypeError
- If the argument types are invalid for the given function.
- ValueError
- If the argument values are invalid for the given function.
- """
- ffi_args = [arg._ffi_expr if isinstance(arg, Expr) else arg for arg in args]
- session = next(arg._ffi_expr.session() for arg in args if isinstance(arg, Expr))
- try:
- return Expr(_ffi.Expr(session=session, operation=name, args=ffi_args))
- except TypeError as e:
- # noqa: DAR401
- raise _augment_error(args, TypeError(str(e))) from e
- except ValueError as e:
- raise _augment_error(args, ValueError(str(e))) from e
-
- @property
- def data_type(self) -> pa.DataType:
- """Return the data type produced by this expression."""
- return self._ffi_expr.data_type()
-
- def __eq__(self, other: object) -> bool:
- """Return true if the expressions are equivalent."""
- if not isinstance(other, Expr):
- return False
- return self._ffi_expr.equivalent(other._ffi_expr)
-
- @final
- def pipe(
- self,
- func: Union[Callable[..., "Expr"], Tuple[Callable[..., "Expr"], str]],
- *args: Arg,
- **kwargs: Arg,
- ) -> "Expr":
- """
- Apply chainable functions that expect expressions.
-
- Parameters
- ----------
- func : function
- Function to apply to this expression.
- ``args``, and ``kwargs`` are passed into ``func``.
- Alternatively a ``(callable, keyword)`` tuple where
- ``keyword`` is a string indicating the keyword of
- ``callable`` that expects the expression.
- args : iterable, optional
- Positional arguments passed into ``func``.
- kwargs : mapping, optional
- A dictionary of keyword arguments passed into ``func``.
-
- Returns
- -------
- Expr
- The result of applying `func` to the arguments.
-
- Raises
- ------
- ValueError
- When using `self` with a specific `keyword` if the `keyword` also
- appears on in the `kwargs`.
-
- Notes
- -----
- Use ``.pipe`` when chaining together functions that expect
- expressions.
-
- Instead of writing
-
- >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP
-
- You can write
-
- >>> (df.pipe(h)
- ... .pipe(g, arg1=a)
- ... .pipe(func, arg2=b, arg3=c)
- ... ) # doctest: +SKIP
-
- If you have a function that takes the data as (say) the second
- argument, pass a tuple indicating which keyword expects the
- data. For example, suppose ``func`` takes its data as ``arg2``:
-
- >>> (df.pipe(h)
- ... .pipe(g, arg1=a)
- ... .pipe((func, 'arg2'), arg1=a, arg3=c)
- ... ) # doctest: +SKIP
- """
- if isinstance(func, tuple):
- func, target = func
- if target in kwargs:
- msg = f"{target} is both the pipe target and a keyword argument"
- raise ValueError(msg)
- kwargs[target] = self
- return func(*args, **kwargs)
- else:
- return func(self, *args, **kwargs)
-
- def __add__(self, rhs: Arg) -> "Expr":
- """Add two expressions."""
- return Expr.call("add", self, rhs)
-
- def __radd__(self, lhs: Arg) -> "Expr":
- """Add two expressions."""
- return Expr.call("add", lhs, self)
-
- def __sub__(self, rhs: Arg) -> "Expr":
- """Subtract two expressions."""
- return Expr.call("sub", self, rhs)
-
- def __mul__(self, rhs: Arg) -> "Expr":
- """Multiple two expressions."""
- return Expr.call("mul", self, rhs)
-
- def __truediv__(self, rhs: Arg) -> "Expr":
- """Divide two expressions."""
- return Expr.call("div", self, rhs)
-
- def __lt__(self, rhs: Arg) -> "Expr":
- """Less than comparison."""
- return Expr.call("lt", self, rhs)
-
- def __le__(self, rhs: Arg) -> "Expr":
- """Less than or equal comparison."""
- return Expr.call("le", self, rhs)
-
- def __gt__(self, rhs: Arg) -> "Expr":
- """Greater than comparison."""
- return Expr.call("gt", self, rhs)
-
- def __ge__(self, rhs: Arg) -> "Expr":
- """Greater than or equal comparison."""
- return Expr.call("ge", self, rhs)
-
- def __and__(self, rhs: Arg) -> "Expr":
- """Logical and."""
- return Expr.call("and", self, rhs)
-
- def __or__(self, rhs: Arg) -> "Expr":
- """Logical or."""
- return Expr.call("or", self, rhs)
-
- def __getitem__(self, key: Arg) -> "Expr":
- """
- Index into an expression.
-
- If the expression is a struct, the key should be a string corresponding
- to a field.
-
- If the expression is a list, the key should be an integer index. If the
- expression is a map, the key should be the same type as the map keys.
-
- Parameters
- ----------
- key : Arg
- The key to index into the expression.
-
- Returns
- -------
- Expression accessing the given index.
-
- Raises
- ------
- TypeError
- When the expression is not a struct, list, or map.
- """
- data_type = self.data_type
- if isinstance(data_type, pa.StructType):
- return Expr.call("fieldref", self, key)
- elif isinstance(data_type, pa.MapType):
- return Expr.call("get_map", self, key)
- elif isinstance(data_type, pa.ListType):
- return Expr.call("get_list", self, key)
- else:
- raise TypeError(f"Cannot index into {data_type}")
-
- def eq(self, other: "Expr") -> "Expr":
- """Expression evaluating to true if self and other are equal."""
- return Expr.call("eq", self, other)
-
- def ne(self, other: "Expr") -> "Expr":
- """Expression evaluating to true if self and other are not equal."""
- return Expr.call("ne", self, other)
-
- def select(self, invert: bool = False, *args: str) -> "Expr":
- """
- Select the given fields from a struct.
-
- Parameters
- ----------
- invert : bool
- If false (default), select only the fields given.
- If true, select all fields except those given.
- args : list[str]
- List of field names to select (or remove).
-
- Returns
- -------
- Expression selecting (or excluding) the given fields.
- """
- if invert:
- return Expr.call("remove_fields", self, *args)
- else:
- return Expr.call("select_fields", self, *args)
-
- def extend(self, fields: Dict[str, "Expr"]) -> "Expr":
- """Extend this record with the additoonal fields."""
- # This argument order is weird, and we shouldn't need to make a record
- # in order to do the extension.
- extension = kt.record(fields)
- return Expr.call("extend_record", extension, self)
-
- def neg(self) -> "Expr":
- """Apply logical or numerical negation, depending on the type."""
- data_type = self.data_type
- if data_type == pa.bool_():
- return Expr.call("not", self)
- else:
- return Expr.call("neg", self)
-
- def is_null(self) -> "Expr":
- """Return a boolean expression indicating if the expression is null."""
- return self.is_not_null().neg()
-
- def is_not_null(self) -> "Expr":
- """Return a boolean expression indicating if the expression is not null."""
- return Expr.call("is_valid", self)
-
- def collect(
- self, max: Optional[int], window: Optional["kt.Window"] = None
- ) -> "Expr":
- """Return an expression collecting the last `max` values in the `window`."""
- return _aggregation("collect", self, window, max)
-
- def sum(self, window: Optional["kt.Window"] = None) -> "Expr":
- """Return the sum aggregation of the expression."""
- return _aggregation("sum", self, window)
-
- def first(self, window: Optional["kt.Window"] = None) -> "Expr":
- """Return the first aggregation of the expression."""
- return _aggregation("first", self, window)
-
- def last(self, window: Optional["kt.Window"] = None) -> "Expr":
- """Return the last aggregation of the expression."""
- return _aggregation("last", self, window)
-
- def preview(self, limit: int = 100) -> pd.DataFrame:
- """
- Return the first N rows of the result as a Pandas DataFrame.
-
- This makes it easy to preview the results of the expression while
- developing queries.
-
- Parameters
- ----------
- limit : int, default 100
- Maximum number of rows to print.
- """
- return self.run(row_limit=limit).to_pandas()
-
- def run(self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None) -> Result:
- """
- Run the expression once, until completed.
-
- Parameters
- ----------
- row_limit : Optional[int]
- The maximum number of rows to return.
- If not specified (the default), all rows are returned.
- """
- options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
- execution = self._ffi_expr.execute(options)
- return Result(execution)
-
-
-def _aggregation(
- op: str, input: Expr, window: Optional["kt.Window"], *args: Optional[Arg]
-) -> Expr:
- """
- Create the aggregation `op` with the given `input`, `window` and `args`.
-
- Parameters
- ----------
- op : str
- The operation to create.
- input : Expr
- The input to the expression.
- window : Optional[Window]
- The window to use for the aggregation.
- *args : Optional[Arg]
- Additional arguments to provide before `input` and the flattened window.
-
- Returns
- -------
- The resulting expression.
-
- Raises
- ------
- NotImplementedError
- If the window is not a known type.
- """
- # Note: things would be easier if we had a more normal order, which
- # we could do as part of "aligning" Sparrow signatures to the new direction.
- # However, `collect` currently has `collect(max, input, window)`, requiring
- # us to add the *args like so.
- if window is None:
- return Expr.call(op, *args, input, None, None)
- elif isinstance(window, kt.SinceWindow):
- return Expr.call(op, *args, input, window.predicate, None)
- elif isinstance(window, kt.SlidingWindow):
- return Expr.call(op, *args, input, window.predicate, window.duration)
- else:
- raise NotImplementedError(f"Unknown window type {window!r}")
diff --git a/sparrow-py/pysrc/sparrow_py/_result.py b/sparrow-py/pysrc/sparrow_py/_result.py
index ce753ad33..83022dd5d 100644
--- a/sparrow-py/pysrc/sparrow_py/_result.py
+++ b/sparrow-py/pysrc/sparrow_py/_result.py
@@ -1,8 +1,13 @@
-from . import _ffi
-import pandas as pd
+from __future__ import annotations
+
from typing import Iterator
+
+import pandas as pd
import pyarrow as pa
+from . import _ffi
+
+
class Result(object):
"""Result of running a timestream query."""
@@ -14,14 +19,15 @@ def to_pandas(self) -> pd.DataFrame:
"""
Convert the result to a Pandas DataFrame.
- ```{note}
- This method will block on the complete results of the query and collect
- all results into memory. If this is not desired, use `iter_pandas` instead.
- ```
-
Returns
-------
- The result as a Pandas DataFrame.
+ pd.DataFrame
+ The result as a Pandas DataFrame.
+
+ Warnings
+ --------
+ This method will block on the complete results of the query and collect
+ all results into memory. If this is not desired, use `iter_pandas` instead.
"""
batches = self._ffi_execution.collect_pyarrow()
if len(batches) == 0:
@@ -35,9 +41,10 @@ def iter_pandas(self) -> Iterator[pd.DataFrame]:
"""
Iterate over the results as Pandas DataFrames.
- Returns
- -------
- The result as a sequence of Pandas DataFrames.
+ Yields
+ ------
+ pd.DataFrame
+ The next Pandas DataFrame.
"""
next_batch = self._ffi_execution.next_pyarrow()
while next_batch is not None:
@@ -46,14 +53,15 @@ def iter_pandas(self) -> Iterator[pd.DataFrame]:
def iter_rows(self) -> Iterator[dict]:
"""
- Iterate over the results as dictionaries.
+ Iterate over the results as row dictionaries.
- Returns
- -------
- The result as a sequence of dictionaries.
+ Yields
+ ------
+ dict
+ The next row as a dictionary.
"""
next_batch = self._ffi_execution.next_pyarrow()
while next_batch is not None:
for row in next_batch.to_pylist():
yield row
- next_batch = self._ffi_execution.next_pyarrow()
\ No newline at end of file
+ next_batch = self._ffi_execution.next_pyarrow()
diff --git a/sparrow-py/pysrc/sparrow_py/_session.py b/sparrow-py/pysrc/sparrow_py/_session.py
index 81a471eb2..8747e47cd 100644
--- a/sparrow-py/pysrc/sparrow_py/_session.py
+++ b/sparrow-py/pysrc/sparrow_py/_session.py
@@ -32,7 +32,8 @@ def _get_session() -> _ffi.Session:
Returns
-------
- The FFI session handle.
+ _ffi.Session
+ The FFI session handle.
Raises
------
diff --git a/sparrow-py/pysrc/sparrow_py/_timestream.py b/sparrow-py/pysrc/sparrow_py/_timestream.py
new file mode 100644
index 000000000..6d05fa708
--- /dev/null
+++ b/sparrow-py/pysrc/sparrow_py/_timestream.py
@@ -0,0 +1,736 @@
+"""Defines classes representing Kaskada expressions."""
+
+from __future__ import annotations
+
+import sys
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Union
+from typing import final
+
+import pandas as pd
+import pyarrow as pa
+import sparrow_py as kt
+import sparrow_py._ffi as _ffi
+
+from ._execution import ExecutionOptions
+from ._result import Result
+
+
+Arg = Union["Timestream", int, str, float, bool, None]
+"""
+The type of arguments to most Timestream functions.
+May be a Timestream, a literal (int, str, float, bool), or null (`None`).
+"""
+
+def _augment_error(args: Sequence[Arg], e: Exception) -> Exception:
+ """Augment an error with information about the arguments."""
+ if sys.version_info >= (3, 11):
+ # If we can add notes to the exception, indicate the types.
+ # This works in Python >=3.11
+ for n, arg in enumerate(args):
+ if isinstance(arg, Timestream):
+ e.add_note(f"Arg[{n}]: Timestream[{arg.data_type}]")
+ else:
+ e.add_note(f"Arg[{n}]: Literal {arg} ({type(arg)})")
+ return e
+
+
+class Timestream(object):
+ """
+    A `Timestream` represents a computation producing values over time.
+ """
+
+ _ffi_expr: _ffi.Expr
+
+ def __init__(self, ffi: _ffi.Expr) -> None:
+ """Create a new expression."""
+ self._ffi_expr = ffi
+
+ @staticmethod
+ def _call(func: str, *args: Arg) -> Timestream:
+ """
+ Construct a new Timestream by calling the given function.
+
+ Parameters
+ ----------
+ func : str
+ Name of the function to apply.
+ *args : Timestream | int | str | float | bool | None
+ List of arguments to the expression.
+
+ Returns
+ -------
+ Timestream
+ Timestream representing the result of the function applied to the arguments.
+
+ Raises
+ ------
+ # noqa: DAR401 _augment_error
+ TypeError
+ If the argument types are invalid for the given function.
+ ValueError
+ If the argument values are invalid for the given function.
+ """
+ ffi_args = [
+ arg._ffi_expr if isinstance(arg, Timestream) else arg for arg in args
+ ]
+ session = next(
+ arg._ffi_expr.session() for arg in args if isinstance(arg, Timestream)
+ )
+ try:
+ return Timestream(_ffi.Expr(session=session, operation=func, args=ffi_args))
+ except TypeError as e:
+ # noqa: DAR401
+ raise _augment_error(args, TypeError(str(e))) from e
+ except ValueError as e:
+ raise _augment_error(args, ValueError(str(e))) from e
+
+ @property
+ def data_type(self) -> pa.DataType:
+ """The PyArrow type of values in this Timestream."""
+ return self._ffi_expr.data_type()
+
+ @final
+ def pipe(
+ self,
+ func: Union[
+ Callable[..., Timestream], Tuple[Callable[..., Timestream], str]
+ ],
+ *args: Arg,
+ **kwargs: Arg,
+ ) -> Timestream:
+ """
+ Apply chainable functions that produce Timestreams.
+
+ Parameters
+ ----------
+ func : Callable[..., Timestream] | Tuple[Callable[..., Timestream], str]
+ Function to apply to this Timestream. Alternatively a `(func,
+ keyword)` tuple where `keyword` is a string indicating the keyword
+ of `func` that expects the Timestream.
+ args : iterable, optional
+ Positional arguments passed into ``func``.
+ kwargs : mapping, optional
+ A dictionary of keyword arguments passed into ``func``.
+
+ Returns
+ -------
+ Timestream
+ The result of applying `func` to the arguments.
+
+ Raises
+ ------
+ ValueError
+            When using `self` with a specific `keyword` if the `keyword` also
+            appears in the `kwargs`.
+
+ Notes
+ -----
+ Use ``.pipe`` when chaining together functions that expect Timestreams.
+
+ Examples
+ --------
+ Instead of writing
+
+ >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP
+
+ You can write
+
+        >>> (df.pipe(h)
+        ... .pipe(g, arg1=a)
+        ... .pipe(func, arg2=b, arg3=c)
+        ... ) # doctest: +SKIP
+
+ If you have a function that takes the data as (say) the second
+ argument, pass a tuple indicating which keyword expects the
+ data. For example, suppose ``func`` takes its data as ``arg2``:
+
+        >>> (df.pipe(h)
+        ... .pipe(g, arg1=a)
+        ... .pipe((func, 'arg2'), arg1=a, arg3=c)
+        ... ) # doctest: +SKIP
+ """
+ if isinstance(func, tuple):
+ func, target = func
+ if target in kwargs:
+ msg = f"{target} is both the pipe target and a keyword argument"
+ raise ValueError(msg)
+ kwargs[target] = self
+ return func(*args, **kwargs)
+ else:
+ return func(self, *args, **kwargs)
+
+ def __add__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream adding this and `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to add to this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self + rhs`.
+ """
+ return Timestream._call("add", self, rhs)
+
+ def __radd__(self, lhs: Arg) -> Timestream:
+ """
+ Create a Timestream adding `lhs` and this.
+
+ Parameters
+ ----------
+ lhs : Arg
+ The Timestream or literal value to add to this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `lhs + self`.
+ """
+ return Timestream._call("add", lhs, self)
+
+ def __sub__(self, rhs: Arg) -> Timestream:
+ """
+        Create a Timestream subtracting `rhs` from this.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to subtract from this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self - rhs`.
+ """
+ return Timestream._call("sub", self, rhs)
+
+ def __rsub__(self, lhs: Arg) -> Timestream:
+ """
+        Create a Timestream subtracting this from `lhs`.
+
+ Parameters
+ ----------
+ lhs : Arg
+ The Timestream or literal value to subtract this from.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `lhs - self`.
+ """
+ return Timestream._call("sub", lhs, self)
+
+ def __mul__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream multiplying this and `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to multiply with this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self * rhs`.
+ """
+ return Timestream._call("mul", self, rhs)
+
+ def __rmul__(self, lhs: Arg) -> Timestream:
+ """
+ Create a Timestream multiplying `lhs` and this.
+
+ Parameters
+ ----------
+ lhs : Arg
+ The Timestream or literal value to multiply with this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `lhs * self`.
+ """
+ return Timestream._call("mul", lhs, self)
+
+ def __truediv__(self, divisor: Arg) -> Timestream:
+ """
+        Create a Timestream by dividing this by `divisor`.
+
+ Parameters
+ ----------
+ divisor : Arg
+ The Timestream or literal value to divide this by.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self / divisor`.
+ """
+ return Timestream._call("div", self, divisor)
+
+ def __rtruediv__(self, dividend: Arg) -> Timestream:
+ """
+        Create a Timestream by dividing `dividend` by this.
+
+ Parameters
+ ----------
+ dividend : Arg
+ The Timestream or literal value to divide by this.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `dividend / self`.
+ """
+ return Timestream._call("div", dividend, self)
+
+ def __lt__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is less than `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self < rhs`.
+ """
+ return Timestream._call("lt", self, rhs)
+
+ def __le__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is less than or equal to `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self <= rhs`.
+ """
+ return Timestream._call("lte", self, rhs)
+
+ def __gt__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is greater than `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self > rhs`.
+ """
+ return Timestream._call("gt", self, rhs)
+
+ def __ge__(self, rhs: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is greater than or equal to `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self >= rhs`.
+ """
+ return Timestream._call("gte", self, rhs)
+
+ def __and__(self, rhs: Arg) -> Timestream:
+ """
+ Create the logical conjunction of this Timestream and `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to conjoin with.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self & rhs`.
+ """
+ return Timestream._call("and", self, rhs)
+
+ def __or__(self, rhs: Arg) -> Timestream:
+ """
+        Create the logical disjunction of this Timestream and `rhs`.
+
+ Parameters
+ ----------
+ rhs : Arg
+ The Timestream or literal value to disjoin with.
+
+ Returns
+ -------
+ Timestream
+ The Timestream resulting from `self | rhs`.
+ """
+ return Timestream._call("or", self, rhs)
+
+ def eq(self, other: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is equal to `other`.
+
+ Parameters
+ ----------
+ other : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+            The Timestream indicating whether `self` and `other` are equal.
+ """
+ return Timestream._call("eq", self, other)
+
+ def ne(self, other: Arg) -> Timestream:
+ """
+ Create a Timestream that is true if this is not equal to `other`.
+
+ Parameters
+ ----------
+ other : Arg
+ The Timestream or literal value to compare to.
+
+ Returns
+ -------
+ Timestream
+ The Timestream indicating whether `self` and `other` are not equal.
+ """
+ return Timestream._call("neq", self, other)
+
+ def __getitem__(self, key: Arg) -> Timestream:
+ """
+ Index into the elements of a Timestream.
+
+ If the Timestream contains records, the key should be a string corresponding
+ to a field.
+
+ If the Timestream contains lists, the key should be an integer index.
+
+ If the Timestream contains maps, the key should be the same type as the map keys.
+
+ Parameters
+ ----------
+ key : Arg
+ The key to index into the expression.
+
+ Returns
+ -------
+ Timestream
+ Timestream with the resulting value (or `null` if absent) at each point.
+
+ Raises
+ ------
+ TypeError
+ When the Timestream is not a record, list, or map.
+ """
+ data_type = self.data_type
+ if isinstance(data_type, pa.StructType):
+ return Timestream._call("fieldref", self, key)
+ elif isinstance(data_type, pa.MapType):
+ return Timestream._call("get_map", self, key)
+ elif isinstance(data_type, pa.ListType):
+ return Timestream._call("get_list", self, key)
+ else:
+ raise TypeError(f"Cannot index into {data_type}")
+
+ def select(self, *args: str) -> Timestream:
+ """
+ Select the given fields from a Timestream of records.
+
+ Parameters
+ ----------
+ args : list[str]
+ List of field names to select.
+
+ Returns
+ -------
+ Timestream
+ Timestream with the same records limited to the specified fields.
+ """
+ return Timestream._call("select_fields", self, *args)
+
+ def remove(self, *args: str) -> Timestream:
+ """
+        Remove the given fields from a Timestream of records.
+
+ Parameters
+ ----------
+ args : list[str]
+ List of field names to exclude.
+
+ Returns
+ -------
+ Timestream
+ Timestream with the same records and the given fields excluded.
+ """
+ return Timestream._call("remove_fields", self, *args)
+
+ def extend(self, fields: Dict[str, Timestream]) -> Timestream:
+ """
+ Extend this Timestream of records with additional fields.
+
+        If a field exists in both the base Timestream and `fields`, the value
+        from `fields` is used.
+
+ Parameters
+ ----------
+ fields : dict[str, Timestream]
+ Fields to add to each record in the Timestream.
+
+ Returns
+ -------
+ Timestream
+ Timestream with the given fields added.
+ """
+ # This argument order is weird, and we shouldn't need to make a record
+ # in order to do the extension.
+ extension = record(fields)
+ return Timestream._call("extend_record", extension, self)
+
+ def neg(self) -> Timestream:
+ """
+ Create a Timestream from the logical or numeric negation of self.
+
+ Returns
+ -------
+ Timestream
+ Timestream with the logical or numeric negation of self.
+ """
+ data_type = self.data_type
+ if data_type == pa.bool_():
+ return Timestream._call("not", self)
+ else:
+ return Timestream._call("neg", self)
+
+ def is_null(self) -> Timestream:
+ """
+ Create a boolean Timestream containing `true` when self is `null`.
+
+ Returns
+ -------
+ Timestream
+ Timestream with `true` when self is `null` and `false` when it isn't.
+ """
+ return self.is_not_null().neg()
+
+ def is_not_null(self) -> Timestream:
+ """
+ Create a boolean Timestream containing `true` when self is not `null`.
+
+ Returns
+ -------
+ Timestream
+ Timestream with `true` when self is not `null` and `false` when it is.
+ """
+ return Timestream._call("is_valid", self)
+
+ def collect(
+ self, max: Optional[int], window: Optional["kt.Window"] = None
+ ) -> Timestream:
+ """
+ Create a Timestream collecting up to the last `max` values in the `window`.
+
+ Collects the values for each key separately.
+
+ Parameters
+ ----------
+ max : Optional[int]
+ The maximum number of values to collect.
+            If `None`, all values are collected.
+ window : Optional[Window]
+ The window to use for the aggregation.
+ If not specified, the entire Timestream is used.
+
+ Returns
+ -------
+ Timestream
+ Timestream containing the collected list at each point.
+ """
+ return _aggregation("collect", self, window, max)
+
+ def sum(self, window: Optional["kt.Window"] = None) -> Timestream:
+ """
+ Create a Timestream summing the values in the `window`.
+
+ Computes the sum for each key separately.
+
+ Parameters
+ ----------
+ window : Optional[Window]
+ The window to use for the aggregation.
+ If not specified, the entire Timestream is used.
+
+ Returns
+ -------
+ Timestream
+ Timestream containing the sum up to and including each point.
+ """
+ return _aggregation("sum", self, window)
+
+ def first(self, window: Optional["kt.Window"] = None) -> Timestream:
+ """
+ Create a Timestream containing the first value in the `window`.
+
+ Computed for each key separately.
+
+ Parameters
+ ----------
+ window : Optional[Window]
+ The window to use for the aggregation.
+ If not specified, the entire Timestream is used.
+
+ Returns
+ -------
+ Timestream
+ Timestream containing the first value for the key in the window for
+ each point.
+ """
+ return _aggregation("first", self, window)
+
+ def last(self, window: Optional["kt.Window"] = None) -> Timestream:
+ """
+ Create a Timestream containing the last value in the `window`.
+
+ Computed for each key separately.
+
+ Parameters
+ ----------
+ window : Optional[Window]
+ The window to use for the aggregation.
+ If not specified, the entire Timestream is used.
+
+ Returns
+ -------
+ Timestream
+ Timestream containing the last value for the key in the window for
+ each point.
+ """
+ return _aggregation("last", self, window)
+
+ def preview(self, limit: int = 100) -> pd.DataFrame:
+ """
+ Return the first N rows of the result as a Pandas DataFrame.
+
+ This makes it easy to preview the content of the Timestream.
+
+ Parameters
+ ----------
+ limit : int
+ Maximum number of rows to print.
+
+ Returns
+ -------
+ pd.DataFrame
+ The Pandas DataFrame containing the first `limit` points.
+ """
+ return self.run(row_limit=limit).to_pandas()
+
+ def run(
+ self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None
+ ) -> Result:
+ """
+ Run the Timestream once.
+
+ Parameters
+ ----------
+ row_limit : Optional[int]
+ The maximum number of rows to return.
+            If not specified, all rows are returned.
+
+ max_batch_size : Optional[int]
+ The maximum number of rows to return in each batch.
+            If not specified, the default is used.
+
+ Returns
+ -------
+ Result
+ The `Result` object to use for accessing the results.
+ """
+ options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
+ execution = self._ffi_expr.execute(options)
+ return Result(execution)
+
+
+def _aggregation(
+ op: str, input: Timestream, window: Optional["kt.Window"], *args: Optional[Arg]
+) -> Timestream:
+ """
+ Create the aggregation `op` with the given `input`, `window` and `args`.
+
+ Parameters
+ ----------
+ op : str
+ The operation to create.
+ input : Timestream
+ The input to the aggregation.
+ window : Optional[Window]
+ The window to use for the aggregation.
+ *args : Optional[Arg]
+ Additional arguments to provide before `input` and the flattened window.
+
+ Returns
+ -------
+ Timestream
+ The resulting Timestream.
+
+ Raises
+ ------
+ NotImplementedError
+ If the window is not a known type.
+ """
+ # Note: things would be easier if we had a more normal order, which
+ # we could do as part of "aligning" Sparrow signatures to the new direction.
+ # However, `collect` currently has `collect(max, input, window)`, requiring
+ # us to add the *args like so.
+ if window is None:
+ return Timestream._call(op, *args, input, None, None)
+ elif isinstance(window, kt.SinceWindow):
+ return Timestream._call(op, *args, input, window.predicate, None)
+ elif isinstance(window, kt.SlidingWindow):
+ return Timestream._call(op, *args, input, window.predicate, window.duration)
+ else:
+ raise NotImplementedError(f"Unknown window type {window!r}")
+
+
+def record(fields: Dict[str, Timestream]) -> Timestream:
+ """
+ Create a record Timestream from the given fields.
+
+ Parameters
+ ----------
+ fields : dict[str, Timestream]
+ The fields to include in the record.
+
+ Returns
+ -------
+ Timestream
+ Timestream containing records with the given fields.
+ """
+ import itertools
+
+ args: List[Union[str, Timestream]] = list(itertools.chain(*fields.items()))
+ return Timestream._call("record", *args)
diff --git a/sparrow-py/pysrc/sparrow_py/_windows.py b/sparrow-py/pysrc/sparrow_py/_windows.py
index ae27614f0..10365b10e 100644
--- a/sparrow-py/pysrc/sparrow_py/_windows.py
+++ b/sparrow-py/pysrc/sparrow_py/_windows.py
@@ -1,6 +1,6 @@
from dataclasses import dataclass
-from ._expr import Expr
+from ._timestream import Timestream
@dataclass(frozen=True)
@@ -18,12 +18,12 @@ class SinceWindow(Window):
Parameters
----------
- predicate : Expr
- The predicate to use for the window.
+ predicate : Timestream
+ The boolean Timestream to use as predicate for the window.
Each time the predicate evaluates to true the window will be cleared.
"""
- predicate: Expr
+ predicate: Timestream
@dataclass(frozen=True)
@@ -36,10 +36,10 @@ class SlidingWindow(Window):
duration : int
The number of sliding intervals to use in the window.
- predicate : Expr
- The predicate to use for the window.
+ predicate : Timestream
+ The boolean Timestream to use as predicate for the window
Each time the predicate evaluates to true the window starts a new interval.
"""
duration: int
- predicate: Expr
+ predicate: Timestream
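A hedged sketch of passing these windows to the aggregations in `_timestream.py` (the predicate shown is illustrative):

```python
import sparrow_py as kt

kt.init_session()  # assumed required, as in the test fixtures

content = "time,key,m\n1996-12-19T16:39:57-08:00,A,5\n1996-12-19T16:39:58-08:00,A,24\n"
source = kt.sources.CsvSource("time", "key", content)
m = source["m"]

# Sum since the last time the predicate was true.
since = m.sum(window=kt.SinceWindow(predicate=m > 10))

# Sum over the last 2 intervals delimited by the predicate.
sliding = m.sum(window=kt.SlidingWindow(duration=2, predicate=m > 10))
```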
diff --git a/sparrow-py/pysrc/sparrow_py/sources/source.py b/sparrow-py/pysrc/sparrow_py/sources/source.py
index e24ad8854..ad5733324 100644
--- a/sparrow-py/pysrc/sparrow_py/sources/source.py
+++ b/sparrow-py/pysrc/sparrow_py/sources/source.py
@@ -4,15 +4,15 @@
import pyarrow as pa
import sparrow_py._ffi as _ffi
-from .._expr import Expr
from .._session import _get_session
+from .._timestream import Timestream
_TABLE_NUM: int = 0
-class Source(Expr):
- """A source expression."""
+class Source(Timestream):
+ """A source (input) Timestream."""
# TODO: Clean-up naming on the FFI side.
_ffi_table: _ffi.Table
diff --git a/sparrow-py/pytests/conftest.py b/sparrow-py/pytests/conftest.py
index d0e400e6e..ec62eed73 100644
--- a/sparrow-py/pytests/conftest.py
+++ b/sparrow-py/pytests/conftest.py
@@ -4,7 +4,7 @@
import pandas as pd
import pytest
-import sparrow_py
+import sparrow_py as kt
from sparrow_py import init_session
@@ -23,7 +23,7 @@ def golden(request: pytest.FixtureRequest, pytestconfig: pytest.Config): # noqa
output = 0
def handler(
- query: Union[sparrow_py.Expr, pd.DataFrame],
+ query: Union[kt.Timestream, pd.DataFrame],
format: Union[Literal["csv"], Literal["parquet"], Literal["json"]] = "json",
):
"""
@@ -31,7 +31,7 @@ def handler(
Parameters
----------
- query : sparrow_py.Expr to execute or pd.DataFrame
+ query : kt.Timestream to execute or pd.DataFrame
The query to run (or a result to use).
format : str, optional
The format to store the golden file in.
@@ -46,10 +46,12 @@ def handler(
if isinstance(query, pd.DataFrame):
df = query
- elif isinstance(query, sparrow_py.Expr):
+ elif isinstance(query, kt.Timestream):
df = query.run().to_pandas()
else:
- raise ValueError(f"query must be an Expr or a DataFrame, was {type(query)}")
+ raise ValueError(
+ f"query must be a Timestream or a DataFrame, was {type(query)}"
+ )
test_name = request.node.name
module_name = request.node.module.__name__
diff --git a/sparrow-py/pytests/golden/record_test/test_remove_record.json b/sparrow-py/pytests/golden/record_test/test_remove_record.json
new file mode 100644
index 000000000..e55310483
--- /dev/null
+++ b/sparrow-py/pytests/golden/record_test/test_remove_record.json
@@ -0,0 +1,6 @@
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null}
+{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0}
+{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null}
diff --git a/sparrow-py/pytests/golden/record_test/test_select_record.json b/sparrow-py/pytests/golden/record_test/test_select_record.json
index e55310483..f45a716ff 100644
--- a/sparrow-py/pytests/golden/record_test/test_select_record.json
+++ b/sparrow-py/pytests/golden/record_test/test_select_record.json
@@ -1,6 +1,6 @@
-{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0}
-{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0}
-{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0}
-{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null}
-{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0}
-{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null}
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","n":10.0}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","n":3.0}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","n":6.0}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","n":9.0}
+{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","n":null}
+{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","n":null}
diff --git a/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json b/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json
new file mode 100644
index 000000000..34f9540be
--- /dev/null
+++ b/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json
@@ -0,0 +1,4 @@
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"n":10}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"n":3}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"n":6}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null,"n":9}
diff --git a/sparrow-py/pytests/record_test.py b/sparrow-py/pytests/record_test.py
index fca5caf07..9ecce17af 100644
--- a/sparrow-py/pytests/record_test.py
+++ b/sparrow-py/pytests/record_test.py
@@ -39,4 +39,8 @@ def test_extend_record(source, golden) -> None:
def test_select_record(source, golden) -> None:
- golden(source.select("m", "n"))
+ golden(source.select("n"))
+
+
+def test_remove_record(source, golden) -> None:
+ golden(source.remove("n"))
diff --git a/sparrow-py/pytests/result_test.py b/sparrow-py/pytests/result_test.py
index a640be808..a1ac10757 100644
--- a/sparrow-py/pytests/result_test.py
+++ b/sparrow-py/pytests/result_test.py
@@ -27,6 +27,7 @@ def test_iterate_pandas(golden, source_int64) -> None:
with pytest.raises(StopIteration):
next(results)
+
def test_iterate_rows(golden, source_int64) -> None:
results = source_int64.run(row_limit=2).iter_rows()
assert next(results)["m"] == 5
diff --git a/sparrow-py/pytests/expr_test.py b/sparrow-py/pytests/timestream_test.py
similarity index 58%
rename from sparrow-py/pytests/expr_test.py
rename to sparrow-py/pytests/timestream_test.py
index 94e47b8c6..8d989123b 100644
--- a/sparrow-py/pytests/expr_test.py
+++ b/sparrow-py/pytests/timestream_test.py
@@ -39,33 +39,40 @@ def test_field_ref_not_a_struct(source1) -> None:
source1["x"]["x"]
-def test_expr(source1) -> None:
+def test_timestream_math(source1) -> None:
x = source1["x"]
- assert x + 1 == x + 1
+ assert (x + 1).data_type == x.data_type
+ assert (1 + x).data_type == x.data_type
+ assert (x - 1).data_type == x.data_type
+ assert (1 - x).data_type == x.data_type
+ assert (x * 1).data_type == x.data_type
+ assert (1 * x).data_type == x.data_type
+    assert (x / 1).data_type == x.data_type
+    assert (1 / x).data_type == x.data_type
-def test_expr_comparison(source1) -> None:
+def test_timestream_comparison(source1) -> None:
x = source1["x"]
- assert (x > 1) == (x > 1)
- # Python doesn't have a `__rgt__` (reverse gt) dunder method.
- # Instead, if the LHS doesn't support `gt` with the RHS, it tries
- # rhs `lt` lhs.
- assert (1 < x) == (x > 1)
-
- # We can't overload `__eq__` to do this, so we have to use a method.
- x.eq(1)
-
-
-# def test_expr_pipe(source1) -> None:
-# assert source1.x.pipe(math.add, 1) == math.add(source1.x, 1)
-# assert source1.x.pipe((math.add, "rhs"), 1) == math.add(1, rhs=source1.x)
-
-# assert source1.x.pipe(math.gt, 1) == math.gt(source1.x, 1)
-# assert source1.x.pipe((math.gt, "rhs"), 1) == math.gt(1, rhs=source1.x)
-
-
-def test_expr_arithmetic_types(source1) -> None:
+    # Tests the various comparison operators. Python doesn't have reverse
+    # comparison dunders; instead, `1 < x` first tries `(1).__lt__(x)` and,
+    # when that returns `NotImplemented`, falls back to the reflected
+    # `x.__gt__(1)`.
+ assert (x > 1).data_type == pa.bool_()
+ assert (1 > x).data_type == pa.bool_()
+ assert (x < 1).data_type == pa.bool_()
+ assert (1 < x).data_type == pa.bool_()
+ assert (x >= 1).data_type == pa.bool_()
+ assert (1 >= x).data_type == pa.bool_()
+ assert (x <= 1).data_type == pa.bool_()
+ assert (1 <= x).data_type == pa.bool_()
+
+ # For `eq` and `ne` we only support timestream on the LHS since it is a method.
+ # We can't overload `__eq__` since that must take any RHS and must return `bool`.
+ assert x.eq(1).data_type == pa.bool_()
+ assert x.ne(1).data_type == pa.bool_()
+
+
+def test_timestream_arithmetic_types(source1) -> None:
x = source1["x"]
assert (x.eq(1)).data_type == pa.bool_()
assert (x + 1).data_type == pa.float64()
@@ -78,11 +85,11 @@ def test_expr_arithmetic_types(source1) -> None:
x.eq(1) + source1["y"]
assert "Incompatible argument types" in str(e)
if sys.version_info >= (3, 11):
- assert "Arg[0]: Expr of type bool" in e.value.__notes__
- assert "Arg[1]: Expr of type int32" in e.value.__notes__
+ assert "Arg[0]: Timestream[bool]" in e.value.__notes__
+ assert "Arg[1]: Timestream[int32]" in e.value.__notes__
-def test_expr_preview(source1, golden) -> None:
+def test_timestream_preview(source1, golden) -> None:
content = "\n".join(
[
"time,key,m,n",
@@ -96,4 +103,4 @@ def test_expr_preview(source1, golden) -> None:
)
source = kt.sources.CsvSource("time", "key", content)
- golden(source.preview(limit=4))
\ No newline at end of file
+ golden(source.preview(limit=4))
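The reflected-comparison behavior the comment above relies on is plain Python semantics, independent of this library; a minimal standalone demonstration:

```python
class Wrapper:
    """Toy stand-in for Timestream's comparison overloads."""

    def __init__(self, value: int) -> None:
        self.value = value

    def __gt__(self, rhs: int) -> bool:
        return self.value > rhs


x = Wrapper(5)
# `1 < x`: int.__lt__ returns NotImplemented for Wrapper, so Python
# falls back to the reflected operation and calls x.__gt__(1).
assert (1 < x) is True
```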
diff --git a/sparrow-py/src/expr.rs b/sparrow-py/src/expr.rs
index eb068e0c3..5b758309d 100644
--- a/sparrow-py/src/expr.rs
+++ b/sparrow-py/src/expr.rs
@@ -60,14 +60,6 @@ impl Expr {
Ok(Self { rust_expr, session })
}
- /// Return true if the two expressions are equivalent.
- ///
- /// Pyo3 doesn't currently support `__eq__` and we don't want to do `__richcmp__`.
- /// This is mostly only used for testing, so it seems ok.
- fn equivalent(&self, other: &Expr) -> PyResult {
- Ok(self.rust_expr.equivalent(&other.rust_expr) && self.session == other.session)
- }
-
/// Return the session this expression is in.
fn session(&self) -> Session {
self.session.clone()