diff --git a/crates/sparrow-session/src/expr.rs b/crates/sparrow-session/src/expr.rs
index b4e650bfb..eb7463c13 100644
--- a/crates/sparrow-session/src/expr.rs
+++ b/crates/sparrow-session/src/expr.rs
@@ -12,16 +12,6 @@ impl Expr {
             _ => None,
         }
     }
-
-    pub fn equivalent(&self, other: &Expr) -> bool {
-        // This isn't quite correct -- we should lock everything and then compare.
-        // But, this is a temporary hack for the Python builder.
-        self.0.value() == other.0.value()
-            && self.0.is_new() == other.0.is_new()
-            && self.0.value_type() == other.0.value_type()
-            && self.0.grouping() == other.0.grouping()
-            && self.0.time_domain() == other.0.time_domain()
-    }
 }
 
 pub enum Literal {
diff --git a/sparrow-py/.flake8 b/sparrow-py/.flake8
index 6d1843ad4..e88a55d99 100644
--- a/sparrow-py/.flake8
+++ b/sparrow-py/.flake8
@@ -1,8 +1,7 @@
 [flake8]
-select = B,B9,C,D,DAR,E,F,N,W
+select = B,B9,C,E,F,N,W
 ignore = E203,E501,W503
 max-line-length = 100
 max-complexity = 10
-docstring-convention = numpy
 rst-roles = class,const,func,meth,mod,ref
 rst-directives = deprecated
\ No newline at end of file
diff --git a/sparrow-py/README.md b/sparrow-py/README.md
index 1c1fc00bb..ab985955b 100644
--- a/sparrow-py/README.md
+++ b/sparrow-py/README.md
@@ -1,9 +1,16 @@
-# Python Library for Kaskada
+# Kaskada Timestreams
-
-`sparrow-py` provides a Python library for building and executing Kaskada queries using an embedded Sparrow library.
-It uses [Pyo3][pyo3] to generate Python wrappers for the Rust code, and then provides more Python-friendly implementations on top of that.
+
+Kaskada's `timestreams` library makes it easy to work with structured event-based data.
+Define temporal queries on event-based data loaded from Python using Pandas or PyArrow, and push new data in as it occurs.
+Or, execute the queries directly on events in your data lake and/or as they arrive on a stream.
-[pyo3]: https://github.com/PyO3/pyo3
+With Kaskada you can unleash the value of real-time, temporal queries without the complexity of "big" infrastructure components like a distributed stream or stream processing system.
+
+Under the hood, `timestreams` is an efficient temporal query engine built in Rust.
+It is built on Apache Arrow, using the same columnar execution strategy that makes ...
+
+
 ## Install Python
diff --git a/sparrow-py/docs/conf.py b/sparrow-py/docs/conf.py
index b46cdebc9..8f93ee912 100644
--- a/sparrow-py/docs/conf.py
+++ b/sparrow-py/docs/conf.py
@@ -1,11 +1,68 @@
 """Sphinx configuration."""
+from typing import Any
+from typing import Dict
+
 project = "sparrow-py"
 author = "Kaskada Contributors"
 copyright = "2023, Kaskada Contributors"
 extensions = [
     "sphinx.ext.autodoc",
     "sphinx.ext.napoleon",
+    "sphinx.ext.intersphinx",
+    "sphinx.ext.todo",
     "myst_parser",
 ]
 autodoc_typehints = "description"
 html_theme = "furo"
+html_title = "Kaskada"
+language = "en"
+
+intersphinx_mapping: Dict[str, Any] = {
+    'python': ('http://docs.python.org/3', None),
+    'pandas': ('http://pandas.pydata.org/docs', None),
+    'pyarrow': ('https://arrow.apache.org/docs', None),
+}
+
+html_theme_options: Dict[str, Any] = {
+    "footer_icons": [
+        {
+            "name": "GitHub",
+            "url": "https://github.com/kaskada-ai/kaskada",
+            "html": """
+
+
+            """,
+            "class": "",
+        },
+    ],
+    "source_repository": "https://github.com/kaskada-ai/kaskada/",
+    "source_branch": "main",
+    "source_directory": "sparrow-py/docs/",
+}
+
+# Options for Todos
+todo_include_todos = True
+
+# Options for Myst (markdown)
+# https://myst-parser.readthedocs.io/en/v0.17.1/syntax/optional.html
+myst_enable_extensions = [
+    "colon_fence",
+    "deflist",
+    "smartquotes",
+    "replacements",
+]
+myst_heading_anchors = 3
+
+# Suggested options from Furo theme
+# -- Options for autodoc ----------------------------------------------------
+# https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#configuration
+
+# Automatically extract typehints when specified and place them in
+# descriptions of the relevant function/method.
+autodoc_typehints = "description"
+
+# Don't show class signature with the class' name.
+autodoc_class_signature = "separated"
+
+autodoc_type_aliases = { 'Arg': 'sparrow_py.Arg' }
diff --git a/sparrow-py/docs/index.md b/sparrow-py/docs/index.md
index 7033913b5..ec5732a99 100644
--- a/sparrow-py/docs/index.md
+++ b/sparrow-py/docs/index.md
@@ -1,5 +1,51 @@
-```{include} ../README.md
 ---
-end-before:
+hide-toc: true
 ---
+
+# Kaskada Timestreams
+
+```{include} ../README.md
+:start-after:
+:end-before:
+```
+
+## Getting Started with Timestreams
+
+Getting started with Timestreams is as simple as `pip` installing the Python library, loading some data, and running a query.
+
+```python
+import timestreams as t
+
+# Read data from a Parquet file.
+data = t.sources.Parquet.from_file(
+    "path_to_file.parquet",
+    time = "time",
+    key = "user")
+# Get the count of events associated with each user over time, as a dataframe.
+data.count().run().to_pandas()
+```
+
+## What are "Timestreams"?
+A [Timestream](reference/timestream) describes how a value changes over time. In the same way that SQL
+queries transform tables and graph queries transform nodes and edges,
+Kaskada queries transform Timestreams.
+
+In comparison to a timeseries, which often contains simple values (e.g., numeric
+observations) defined at fixed, periodic times (e.g., every minute), a Timestream
+contains any kind of data (records or collections as well as primitives) and may
+be defined at arbitrary times corresponding to when the events occur.
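+
+As a minimal sketch of that difference (assuming a hypothetical
+`purchases.parquet` file with an `amount` column, and using the same API as the
+Getting Started example above), an event-timed running total per user looks like:
+
+```python
+import timestreams as t
+
+# Events arrive at arbitrary times, keyed by user.
+purchases = t.sources.Parquet.from_file(
+    "purchases.parquet",  # hypothetical example file
+    time = "time",
+    key = "user")
+
+# The running total of each user's purchase amounts, defined at the time of
+# each event rather than at fixed, periodic intervals.
+purchases["amount"].sum().run().to_pandas()
+```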
+
+```{toctree}
+:hidden:
+
+quickstart
+```
+
+```{toctree}
+:caption: Reference
+:hidden:
+
+reference/timestream
+reference/sources
+reference/execution
 ```
\ No newline at end of file
diff --git a/sparrow-py/docs/quickstart.md b/sparrow-py/docs/quickstart.md
new file mode 100644
index 000000000..ff62d87ba
--- /dev/null
+++ b/sparrow-py/docs/quickstart.md
@@ -0,0 +1,6 @@
+# Quick Start
+
+```{todo}
+
+Write the quick start.
+```
\ No newline at end of file
diff --git a/sparrow-py/docs/reference/execution.md b/sparrow-py/docs/reference/execution.md
new file mode 100644
index 000000000..5bef2e650
--- /dev/null
+++ b/sparrow-py/docs/reference/execution.md
@@ -0,0 +1,11 @@
+# Execution
+
+There are several ways to execute a Timestream.
+The {py:meth}`run method <sparrow_py.Timestream.run>` executes the Timestream over the current data, and produces a {py:class}`Result <sparrow_py.Result>`.
+
+## Result
+```{eval-rst}
+.. autoclass:: sparrow_py.Result
+   :exclude-members: __init__
+   :members:
+```
\ No newline at end of file
diff --git a/sparrow-py/docs/reference/sources.md b/sparrow-py/docs/reference/sources.md
new file mode 100644
index 000000000..b2daf7f82
--- /dev/null
+++ b/sparrow-py/docs/reference/sources.md
@@ -0,0 +1,6 @@
+# Sources
+
+```{eval-rst}
+.. automodule:: sparrow_py.sources
+   :members:
+```
\ No newline at end of file
diff --git a/sparrow-py/docs/reference/timestream.md b/sparrow-py/docs/reference/timestream.md
new file mode 100644
index 000000000..af96ae146
--- /dev/null
+++ b/sparrow-py/docs/reference/timestream.md
@@ -0,0 +1,17 @@
+# Timestream
+
+```{todo}
+- [ ] Expand the `Arg` type alias in timestreams accordingly.
+```
+
+```{eval-rst}
+.. autoclass:: sparrow_py.Timestream
+   :exclude-members: __init__
+   :members:
+   :special-members:
+```
+
+```{eval-rst}
+.. autoclass:: sparrow_py.Arg
+   :members:
+```
\ No newline at end of file
diff --git a/sparrow-py/noxfile.py b/sparrow-py/noxfile.py
index 6eeda9b25..edbad4fda 100644
--- a/sparrow-py/noxfile.py
+++ b/sparrow-py/noxfile.py
@@ -49,17 +49,20 @@ def check_lint(session: Session) -> None:
         "darglint",
         "flake8",
         "flake8-bugbear",
-        "flake8-docstrings",
         "flake8-rst-docstrings",
         "isort",
         "pep8-naming",
+        "pydocstyle",
         "pyupgrade",
     )
     session.run
     session.run("black", "--check", *args)
-    session.run("darglint", "pysrc")
     session.run("flake8", *args)
     session.run("isort", "--filter-files", "--check-only", *args)
+
+    # Only do darglint and pydocstyle on pysrc (source)
+    session.run("darglint", "pysrc")
+    session.run("pydocstyle", "--convention=numpy", "pysrc")
     # No way to run this as a check.
     # session.run("pyupgrade", "--py38-plus")
@@ -97,7 +100,7 @@ def mypy(session: Session) -> None:
     # However, there is a possibility it slows things down, by making mypy
     # run twice -- once to determine what types need to be installed, then once
     # to check things with those stubs.
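+    # `--non-interactive` (added below) answers mypy's install-types prompt
+    # automatically, so stub installation can run unattended (e.g., in CI).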
-    session.run("mypy", "--install-types", *args)
+    session.run("mypy", "--install-types", "--non-interactive", *args)
     if not session.posargs:
         session.run("mypy", f"--python-executable={sys.executable}", "noxfile.py")
@@ -172,7 +175,7 @@ def docs(session: Session) -> None:
     """Build and serve the documentation with live reloading on file changes."""
     args = session.posargs or ["--open-browser", "docs", "docs/_build"]
     install_self(session)
-    session.install("sphinx", "sphinx-autobuild", "furo", "myst-parser")
+    session.install("sphinx", "sphinx-autobuild", "furo", "myst-parser", "pandas", "pyarrow", "sphinx-autodoc-typehints")
 
     build_dir = Path("docs", "_build")
     if build_dir.exists():
diff --git a/sparrow-py/poetry.lock b/sparrow-py/poetry.lock
index bae940712..d25430518 100644
--- a/sparrow-py/poetry.lock
+++ b/sparrow-py/poetry.lock
@@ -391,21 +391,6 @@ flake8 = ">=3.0.0"
 [package.extras]
 dev = ["coverage", "hypothesis", "hypothesmith (>=0.2)", "pre-commit", "pytest", "tox"]
 
-[[package]]
-name = "flake8-docstrings"
-version = "1.7.0"
-description = "Extension for flake8 which uses pydocstyle to check docstrings"
-optional = false
-python-versions = ">=3.7"
-files = [
-    {file = "flake8_docstrings-1.7.0-py2.py3-none-any.whl", hash = "sha256:51f2344026da083fc084166a9353f5082b01f72901df422f74b4d953ae88ac75"},
-    {file = "flake8_docstrings-1.7.0.tar.gz", hash = "sha256:4c8cc748dc16e6869728699e5d0d685da9a10b0ea718e090b1ba088e67a941af"},
-]
-
-[package.dependencies]
-flake8 = ">=3"
-pydocstyle = ">=2.1"
-
 [[package]]
 name = "flake8-rst-docstrings"
 version = "0.3.0"
@@ -1336,13 +1321,13 @@ files = [
 
 [[package]]
 name = "sphinx"
-version = "7.1.1"
+version = "7.1.2"
 description = "Python documentation generator"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "sphinx-7.1.1-py3-none-any.whl", hash = "sha256:4e6c5ea477afa0fb90815210fd1312012e1d7542589ab251ac9b53b7c0751bce"},
-    {file = "sphinx-7.1.1.tar.gz", hash = "sha256:59b8e391f0768a96cd233e8300fe7f0a8dc2f64f83dc2a54336a9a84f428ff4e"},
+    {file = "sphinx-7.1.2-py3-none-any.whl", hash = "sha256:d170a81825b2fcacb6dfd5a0d7f578a053e45d3f2b153fecc948c37344eb4cbe"},
+    {file = "sphinx-7.1.2.tar.gz", hash = "sha256:780f4d32f1d7d1126576e0e5ecc19dc32ab76cd24e950228dcf7b1f6d3d9e22f"},
 ]
 
 [package.dependencies]
@@ -1652,4 +1637,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8,<4.0"
-content-hash = "2741172019f6a75bc8a80c0b3031e45181325830a4286e07c01ab610d44913d6"
+content-hash = "55960b933d43ef46c40f57a767456fc7bbc12865d7f1cbcfe0537afa06c61889"
diff --git a/sparrow-py/pyproject.toml b/sparrow-py/pyproject.toml
index 8596cb5b7..a1f5796cd 100644
--- a/sparrow-py/pyproject.toml
+++ b/sparrow-py/pyproject.toml
@@ -15,7 +15,6 @@ coverage = {extras = ["toml"], version = ">=6.2"}
 darglint = ">=1.8.1"
 flake8 = ">=4.0.1"
 flake8-bugbear = ">=21.9.2"
-flake8-docstrings = ">=1.6.0"
 flake8-rst-docstrings = ">=0.2.5"
 furo = ">=2021.11.12"
 isort = ">=5.10.1"
@@ -23,6 +22,7 @@ mypy = ">=0.930"
 pandas-stubs = "^2.0.2"
 pep8-naming = ">=0.12.1"
 pyarrow = "^12.0.1"
+pydocstyle = "^6.3.0"
 pytest = ">=6.2.5"
 pyupgrade = ">=2.29.1"
 safety = ">=1.10.3"
diff --git a/sparrow-py/pysrc/sparrow_py/__init__.py b/sparrow-py/pysrc/sparrow_py/__init__.py
index b6500c005..1ac6be591 100644
--- a/sparrow-py/pysrc/sparrow_py/__init__.py
+++ b/sparrow-py/pysrc/sparrow_py/__init__.py
@@ -1,29 +1,20 @@
 """Kaskada query builder and local executon engine."""
-from typing import Dict
-from typing import List
-from typing import Union
-
 from . import sources
 from ._execution import ExecutionOptions
-from ._expr import Expr
 from ._result import Result
 from ._session import init_session
+from ._timestream import Arg
+from ._timestream import Timestream
+from ._timestream import record
 from ._windows import SinceWindow
 from ._windows import SlidingWindow
 from ._windows import Window
 
 
-def record(fields: Dict[str, Expr]) -> Expr:
-    """Create a record from the given keyword arguments."""
-    import itertools
-
-    args: List[Union[str, "Expr"]] = list(itertools.chain(*fields.items()))
-    return Expr.call("record", *args)
-
-
 __all__ = [
+    "Arg",
     "ExecutionOptions",
-    "Expr",
+    "Timestream",
     "init_session",
     "record",
     "Result",
diff --git a/sparrow-py/pysrc/sparrow_py/_execution.py b/sparrow-py/pysrc/sparrow_py/_execution.py
index 5f1f5f3ee..981cd881e 100644
--- a/sparrow-py/pysrc/sparrow_py/_execution.py
+++ b/sparrow-py/pysrc/sparrow_py/_execution.py
@@ -19,4 +19,3 @@ class ExecutionOptions:
 
     row_limit: Optional[int] = None
     max_batch_size: Optional[int] = None
-
diff --git a/sparrow-py/pysrc/sparrow_py/_expr.py b/sparrow-py/pysrc/sparrow_py/_expr.py
deleted file mode 100644
index b0efa6c14..000000000
--- a/sparrow-py/pysrc/sparrow_py/_expr.py
+++ /dev/null
@@ -1,375 +0,0 @@
-"""Defines classes representing Kaskada expressions."""
-
-import sys
-from typing import Callable
-from typing import Dict
-from typing import Optional
-from typing import Sequence
-from typing import Tuple
-from typing import Union
-from typing import final
-
-import pandas as pd
-import pyarrow as pa
-import sparrow_py as kt
-import sparrow_py._ffi as _ffi
-
-from ._execution import ExecutionOptions
-from ._result import Result
-
-
-#: The type of arguments to expressions.
-Arg = Union["Expr", int, str, float, None]
-
-
-def _augment_error(args: Sequence[Arg], e: Exception) -> Exception:
-    """Augment an error with information about the arguments."""
-    if sys.version_info >= (3, 11):
-        # If we can add notes to the exception, indicate the types.
-        # This works in Python >=3.11
-        for n, arg in enumerate(args):
-            if isinstance(arg, Expr):
-                e.add_note(f"Arg[{n}]: Expr of type {arg.data_type}")
-            else:
-                e.add_note(f"Arg[{n}]: Literal {arg} ({type(arg)})")
-    return e
-
-
-class Expr(object):
-    """A Kaskada expression."""
-
-    _ffi_expr: _ffi.Expr
-
-    def __init__(self, ffi: _ffi.Expr) -> None:
-        """Create a new expression."""
-        self._ffi_expr = ffi
-
-    @staticmethod
-    def call(name: str, *args: Arg) -> "Expr":
-        """
-        Construct a new expression calling the given name.
-
-        Parameters
-        ----------
-        name : str
-            Name of the operation to apply.
-        args : list[Expr]
-            List of arguments to the expression.
-
-        Returns
-        -------
-        Expression representing the given operation applied to the arguments.
-
-        Raises
-        ------
-        # noqa: DAR401 _augment_error
-        TypeError
-            If the argument types are invalid for the given function.
-        ValueError
-            If the argument values are invalid for the given function.
-        """
-        ffi_args = [arg._ffi_expr if isinstance(arg, Expr) else arg for arg in args]
-        session = next(arg._ffi_expr.session() for arg in args if isinstance(arg, Expr))
-        try:
-            return Expr(_ffi.Expr(session=session, operation=name, args=ffi_args))
-        except TypeError as e:
-            # noqa: DAR401
-            raise _augment_error(args, TypeError(str(e))) from e
-        except ValueError as e:
-            raise _augment_error(args, ValueError(str(e))) from e
-
-    @property
-    def data_type(self) -> pa.DataType:
-        """Return the data type produced by this expression."""
-        return self._ffi_expr.data_type()
-
-    def __eq__(self, other: object) -> bool:
-        """Return true if the expressions are equivalent."""
-        if not isinstance(other, Expr):
-            return False
-        return self._ffi_expr.equivalent(other._ffi_expr)
-
-    @final
-    def pipe(
-        self,
-        func: Union[Callable[..., "Expr"], Tuple[Callable[..., "Expr"], str]],
-        *args: Arg,
-        **kwargs: Arg,
-    ) -> "Expr":
-        """
-        Apply chainable functions that expect expressions.
-
-        Parameters
-        ----------
-        func : function
-            Function to apply to this expression.
-            ``args``, and ``kwargs`` are passed into ``func``.
-            Alternatively a ``(callable, keyword)`` tuple where
-            ``keyword`` is a string indicating the keyword of
-            ``callable`` that expects the expression.
-        args : iterable, optional
-            Positional arguments passed into ``func``.
-        kwargs : mapping, optional
-            A dictionary of keyword arguments passed into ``func``.
-
-        Returns
-        -------
-        Expr
-            The result of applying `func` to the arguments.
-
-        Raises
-        ------
-        ValueError
-            When using `self` with a specific `keyword` if the `keyword` also
-            appears on in the `kwargs`.
-
-        Notes
-        -----
-        Use ``.pipe`` when chaining together functions that expect
-        expressions.
-
-        Instead of writing
-
-        >>> func(g(h(df), arg1=a), arg2=b, arg3=c)  # doctest: +SKIP
-
-        You can write
-
-        >>> (df.pipe(h)
-        ...    .pipe(g, arg1=a)
-        ...    .pipe(func, arg2=b, arg3=c)
-        ... )  # doctest: +SKIP
-
-        If you have a function that takes the data as (say) the second
-        argument, pass a tuple indicating which keyword expects the
-        data. For example, suppose ``func`` takes its data as ``arg2``:
-
-        >>> (df.pipe(h)
-        ...    .pipe(g, arg1=a)
-        ...    .pipe((func, 'arg2'), arg1=a, arg3=c)
-        ... )  # doctest: +SKIP
-        """
-        if isinstance(func, tuple):
-            func, target = func
-            if target in kwargs:
-                msg = f"{target} is both the pipe target and a keyword argument"
-                raise ValueError(msg)
-            kwargs[target] = self
-            return func(*args, **kwargs)
-        else:
-            return func(self, *args, **kwargs)
-
-    def __add__(self, rhs: Arg) -> "Expr":
-        """Add two expressions."""
-        return Expr.call("add", self, rhs)
-
-    def __radd__(self, lhs: Arg) -> "Expr":
-        """Add two expressions."""
-        return Expr.call("add", lhs, self)
-
-    def __sub__(self, rhs: Arg) -> "Expr":
-        """Subtract two expressions."""
-        return Expr.call("sub", self, rhs)
-
-    def __mul__(self, rhs: Arg) -> "Expr":
-        """Multiple two expressions."""
-        return Expr.call("mul", self, rhs)
-
-    def __truediv__(self, rhs: Arg) -> "Expr":
-        """Divide two expressions."""
-        return Expr.call("div", self, rhs)
-
-    def __lt__(self, rhs: Arg) -> "Expr":
-        """Less than comparison."""
-        return Expr.call("lt", self, rhs)
-
-    def __le__(self, rhs: Arg) -> "Expr":
-        """Less than or equal comparison."""
-        return Expr.call("le", self, rhs)
-
-    def __gt__(self, rhs: Arg) -> "Expr":
-        """Greater than comparison."""
-        return Expr.call("gt", self, rhs)
-
-    def __ge__(self, rhs: Arg) -> "Expr":
-        """Greater than or equal comparison."""
-        return Expr.call("ge", self, rhs)
-
-    def __and__(self, rhs: Arg) -> "Expr":
-        """Logical and."""
-        return Expr.call("and", self, rhs)
-
-    def __or__(self, rhs: Arg) -> "Expr":
-        """Logical or."""
-        return Expr.call("or", self, rhs)
-
-    def __getitem__(self, key: Arg) -> "Expr":
-        """
-        Index into an expression.
-
-        If the expression is a struct, the key should be a string corresponding
-        to a field.
-
-        If the expression is a list, the key should be an integer index. If the
-        expression is a map, the key should be the same type as the map keys.
-
-        Parameters
-        ----------
-        key : Arg
-            The key to index into the expression.
-
-        Returns
-        -------
-        Expression accessing the given index.
-
-        Raises
-        ------
-        TypeError
-            When the expression is not a struct, list, or map.
-        """
-        data_type = self.data_type
-        if isinstance(data_type, pa.StructType):
-            return Expr.call("fieldref", self, key)
-        elif isinstance(data_type, pa.MapType):
-            return Expr.call("get_map", self, key)
-        elif isinstance(data_type, pa.ListType):
-            return Expr.call("get_list", self, key)
-        else:
-            raise TypeError(f"Cannot index into {data_type}")
-
-    def eq(self, other: "Expr") -> "Expr":
-        """Expression evaluating to true if self and other are equal."""
-        return Expr.call("eq", self, other)
-
-    def ne(self, other: "Expr") -> "Expr":
-        """Expression evaluating to true if self and other are not equal."""
-        return Expr.call("ne", self, other)
-
-    def select(self, invert: bool = False, *args: str) -> "Expr":
-        """
-        Select the given fields from a struct.
-
-        Parameters
-        ----------
-        invert : bool
-            If false (default), select only the fields given.
-            If true, select all fields except those given.
-        args : list[str]
-            List of field names to select (or remove).
-
-        Returns
-        -------
-        Expression selecting (or excluding) the given fields.
-        """
-        if invert:
-            return Expr.call("remove_fields", self, *args)
-        else:
-            return Expr.call("select_fields", self, *args)
-
-    def extend(self, fields: Dict[str, "Expr"]) -> "Expr":
-        """Extend this record with the additoonal fields."""
-        # This argument order is weird, and we shouldn't need to make a record
-        # in order to do the extension.
-        extension = kt.record(fields)
-        return Expr.call("extend_record", extension, self)
-
-    def neg(self) -> "Expr":
-        """Apply logical or numerical negation, depending on the type."""
-        data_type = self.data_type
-        if data_type == pa.bool_():
-            return Expr.call("not", self)
-        else:
-            return Expr.call("neg", self)
-
-    def is_null(self) -> "Expr":
-        """Return a boolean expression indicating if the expression is null."""
-        return self.is_not_null().neg()
-
-    def is_not_null(self) -> "Expr":
-        """Return a boolean expression indicating if the expression is not null."""
-        return Expr.call("is_valid", self)
-
-    def collect(
-        self, max: Optional[int], window: Optional["kt.Window"] = None
-    ) -> "Expr":
-        """Return an expression collecting the last `max` values in the `window`."""
-        return _aggregation("collect", self, window, max)
-
-    def sum(self, window: Optional["kt.Window"] = None) -> "Expr":
-        """Return the sum aggregation of the expression."""
-        return _aggregation("sum", self, window)
-
-    def first(self, window: Optional["kt.Window"] = None) -> "Expr":
-        """Return the first aggregation of the expression."""
-        return _aggregation("first", self, window)
-
-    def last(self, window: Optional["kt.Window"] = None) -> "Expr":
-        """Return the last aggregation of the expression."""
-        return _aggregation("last", self, window)
-
-    def preview(self, limit: int = 100) -> pd.DataFrame:
-        """
-        Return the first N rows of the result as a Pandas DataFrame.
-
-        This makes it easy to preview the results of the expression while
-        developing queries.
-
-        Parameters
-        ----------
-        limit : int, default 100
-            Maximum number of rows to print.
-        """
-        return self.run(row_limit=limit).to_pandas()
-
-    def run(self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None) -> Result:
-        """
-        Run the expression once, until completed.
-
-        Parameters
-        ----------
-        row_limit : Optional[int]
-            The maximum number of rows to return.
-            If not specified (the default), all rows are returned.
-        """
-        options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
-        execution = self._ffi_expr.execute(options)
-        return Result(execution)
-
-
-def _aggregation(
-    op: str, input: Expr, window: Optional["kt.Window"], *args: Optional[Arg]
-) -> Expr:
-    """
-    Create the aggregation `op` with the given `input`, `window` and `args`.
-
-    Parameters
-    ----------
-    op : str
-        The operation to create.
-    input : Expr
-        The input to the expression.
-    window : Optional[Window]
-        The window to use for the aggregation.
-    *args : Optional[Arg]
-        Additional arguments to provide before `input` and the flattened window.
-
-    Returns
-    -------
-    The resulting expression.
-
-    Raises
-    ------
-    NotImplementedError
-        If the window is not a known type.
-    """
-    # Note: things would be easier if we had a more normal order, which
-    # we could do as part of "aligning" Sparrow signatures to the new direction.
-    # However, `collect` currently has `collect(max, input, window)`, requiring
-    # us to add the *args like so.
-    if window is None:
-        return Expr.call(op, *args, input, None, None)
-    elif isinstance(window, kt.SinceWindow):
-        return Expr.call(op, *args, input, window.predicate, None)
-    elif isinstance(window, kt.SlidingWindow):
-        return Expr.call(op, *args, input, window.predicate, window.duration)
-    else:
-        raise NotImplementedError(f"Unknown window type {window!r}")
diff --git a/sparrow-py/pysrc/sparrow_py/_result.py b/sparrow-py/pysrc/sparrow_py/_result.py
index ce753ad33..83022dd5d 100644
--- a/sparrow-py/pysrc/sparrow_py/_result.py
+++ b/sparrow-py/pysrc/sparrow_py/_result.py
@@ -1,8 +1,13 @@
-from . import _ffi
-import pandas as pd
+from __future__ import annotations
+
 from typing import Iterator
+
+import pandas as pd
 import pyarrow as pa
 
+from . import _ffi
+
+
 class Result(object):
     """Result of running a timestream query."""
@@ -14,14 +19,15 @@ def to_pandas(self) -> pd.DataFrame:
         """
         Convert the result to a Pandas DataFrame.
 
-        ```{note}
-        This method will block on the complete results of the query and collect
-        all results into memory. If this is not desired, use `iter_pandas` instead.
-        ```
-
         Returns
         -------
-        The result as a Pandas DataFrame.
+        pd.DataFrame
+            The result as a Pandas DataFrame.
+
+        Warnings
+        --------
+        This method will block on the complete results of the query and collect
+        all results into memory. If this is not desired, use `iter_pandas` instead.
         """
         batches = self._ffi_execution.collect_pyarrow()
         if len(batches) == 0:
@@ -35,9 +41,10 @@ def iter_pandas(self) -> Iterator[pd.DataFrame]:
         """
         Iterate over the results as Pandas DataFrames.
 
-        Returns
-        -------
-        The result as a sequence of Pandas DataFrames.
+        Yields
+        ------
+        pd.DataFrame
+            The next Pandas DataFrame.
         """
         next_batch = self._ffi_execution.next_pyarrow()
         while next_batch is not None:
@@ -46,14 +53,15 @@
     def iter_rows(self) -> Iterator[dict]:
         """
-        Iterate over the results as dictionaries.
+        Iterate over the results as row dictionaries.
 
-        Returns
-        -------
-        The result as a sequence of dictionaries.
+        Yields
+        ------
+        dict
+            The next row as a dictionary.
         """
         next_batch = self._ffi_execution.next_pyarrow()
         while next_batch is not None:
             for row in next_batch.to_pylist():
                 yield row
-            next_batch = self._ffi_execution.next_pyarrow()
\ No newline at end of file
+            next_batch = self._ffi_execution.next_pyarrow()
diff --git a/sparrow-py/pysrc/sparrow_py/_session.py b/sparrow-py/pysrc/sparrow_py/_session.py
index 81a471eb2..8747e47cd 100644
--- a/sparrow-py/pysrc/sparrow_py/_session.py
+++ b/sparrow-py/pysrc/sparrow_py/_session.py
@@ -32,7 +32,8 @@ def _get_session() -> _ffi.Session:
 
     Returns
     -------
-    The FFI session handle.
+    _ffi.Session
+        The FFI session handle.
 
     Raises
     ------
diff --git a/sparrow-py/pysrc/sparrow_py/_timestream.py b/sparrow-py/pysrc/sparrow_py/_timestream.py
new file mode 100644
index 000000000..6d05fa708
--- /dev/null
+++ b/sparrow-py/pysrc/sparrow_py/_timestream.py
@@ -0,0 +1,736 @@
+"""Defines classes representing Kaskada Timestreams."""
+
+from __future__ import annotations
+
+import sys
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Union
+from typing import final
+
+import pandas as pd
+import pyarrow as pa
+import sparrow_py as kt
+import sparrow_py._ffi as _ffi
+
+from ._execution import ExecutionOptions
+from ._result import Result
+
+
+Arg = Union["Timestream", int, str, float, bool, None]
+"""
+The type of arguments to most Timestream functions.
+May be a Timestream, a literal (int, str, float, bool), or null (`None`).
+"""
+
+def _augment_error(args: Sequence[Arg], e: Exception) -> Exception:
+    """Augment an error with information about the arguments."""
+    if sys.version_info >= (3, 11):
+        # If we can add notes to the exception, indicate the types.
+        # This works in Python >=3.11
+        for n, arg in enumerate(args):
+            if isinstance(arg, Timestream):
+                e.add_note(f"Arg[{n}]: Timestream[{arg.data_type}]")
+            else:
+                e.add_note(f"Arg[{n}]: Literal {arg} ({type(arg)})")
+    return e
+
+
+class Timestream(object):
+    """
+    A `Timestream` represents a computation producing a stream of values over time.
+    """
+
+    _ffi_expr: _ffi.Expr
+
+    def __init__(self, ffi: _ffi.Expr) -> None:
+        """Create a new Timestream."""
+        self._ffi_expr = ffi
+
+    @staticmethod
+    def _call(func: str, *args: Arg) -> Timestream:
+        """
+        Construct a new Timestream by calling the given function.
+
+        Parameters
+        ----------
+        func : str
+            Name of the function to apply.
+        *args : Timestream | int | str | float | bool | None
+            List of arguments to the expression.
+
+        Returns
+        -------
+        Timestream
+            Timestream representing the result of the function applied to the arguments.
+
+        Raises
+        ------
+        # noqa: DAR401 _augment_error
+        TypeError
+            If the argument types are invalid for the given function.
+        ValueError
+            If the argument values are invalid for the given function.
+        """
+        ffi_args = [
+            arg._ffi_expr if isinstance(arg, Timestream) else arg for arg in args
+        ]
+        session = next(
+            arg._ffi_expr.session() for arg in args if isinstance(arg, Timestream)
+        )
+        try:
+            return Timestream(_ffi.Expr(session=session, operation=func, args=ffi_args))
+        except TypeError as e:
+            # noqa: DAR401
+            raise _augment_error(args, TypeError(str(e))) from e
+        except ValueError as e:
+            raise _augment_error(args, ValueError(str(e))) from e
+
+    @property
+    def data_type(self) -> pa.DataType:
+        """The PyArrow type of values in this Timestream."""
+        return self._ffi_expr.data_type()
+
+    @final
+    def pipe(
+        self,
+        func: Union[
+            Callable[..., Timestream], Tuple[Callable[..., Timestream], str]
+        ],
+        *args: Arg,
+        **kwargs: Arg,
+    ) -> Timestream:
+        """
+        Apply chainable functions that produce Timestreams.
+
+        Parameters
+        ----------
+        func : Callable[..., Timestream] | Tuple[Callable[..., Timestream], str]
+            Function to apply to this Timestream. Alternatively a `(func,
+            keyword)` tuple where `keyword` is a string indicating the keyword
+            of `func` that expects the Timestream.
+        args : iterable, optional
+            Positional arguments passed into ``func``.
+        kwargs : mapping, optional
+            A dictionary of keyword arguments passed into ``func``.
+
+        Returns
+        -------
+        Timestream
+            The result of applying `func` to the arguments.
+
+        Raises
+        ------
+        ValueError
+            When using `self` with a specific `keyword` if the `keyword` also
+            appears in the `kwargs`.
+
+        Notes
+        -----
+        Use ``.pipe`` when chaining together functions that expect Timestreams.
+
+        Examples
+        --------
+        Instead of writing
+
+        >>> func(g(h(df), arg1=a), arg2=b, arg3=c)  # doctest: +SKIP
+
+        You can write
+
+        >>> (df.pipe(h)
+        ...    .pipe(g, arg1=a)
+        ...    .pipe(func, arg2=b, arg3=c)
+        ... )  # doctest: +SKIP
+
+        If you have a function that takes the data as (say) the second
+        argument, pass a tuple indicating which keyword expects the
+        data. For example, suppose ``func`` takes its data as ``arg2``:
+
+        >>> (df.pipe(h)
+        ...    .pipe(g, arg1=a)
+        ...    .pipe((func, 'arg2'), arg1=a, arg3=c)
+        ... )  # doctest: +SKIP
+        """
+        if isinstance(func, tuple):
+            func, target = func
+            if target in kwargs:
+                msg = f"{target} is both the pipe target and a keyword argument"
+                raise ValueError(msg)
+            kwargs[target] = self
+            return func(*args, **kwargs)
+        else:
+            return func(self, *args, **kwargs)
+
+    def __add__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream adding this and `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to add to this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self + rhs`.
+        """
+        return Timestream._call("add", self, rhs)
+
+    def __radd__(self, lhs: Arg) -> Timestream:
+        """
+        Create a Timestream adding `lhs` and this.
+
+        Parameters
+        ----------
+        lhs : Arg
+            The Timestream or literal value to add to this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `lhs + self`.
+        """
+        return Timestream._call("add", lhs, self)
+
+    def __sub__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream subtracting `rhs` from this.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to subtract from this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self - rhs`.
+        """
+        return Timestream._call("sub", self, rhs)
+
+    def __rsub__(self, lhs: Arg) -> Timestream:
+        """
+        Create a Timestream subtracting this from `lhs`.
+
+        Parameters
+        ----------
+        lhs : Arg
+            The Timestream or literal value to subtract this from.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `lhs - self`.
+        """
+        return Timestream._call("sub", lhs, self)
+
+    def __mul__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream multiplying this and `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to multiply with this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self * rhs`.
+        """
+        return Timestream._call("mul", self, rhs)
+
+    def __rmul__(self, lhs: Arg) -> Timestream:
+        """
+        Create a Timestream multiplying `lhs` and this.
+
+        Parameters
+        ----------
+        lhs : Arg
+            The Timestream or literal value to multiply with this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `lhs * self`.
+        """
+        return Timestream._call("mul", lhs, self)
+
+    def __truediv__(self, divisor: Arg) -> Timestream:
+        """
+        Create a Timestream by dividing this by `divisor`.
+
+        Parameters
+        ----------
+        divisor : Arg
+            The Timestream or literal value to divide this by.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self / divisor`.
+        """
+        return Timestream._call("div", self, divisor)
+
+    def __rtruediv__(self, dividend: Arg) -> Timestream:
+        """
+        Create a Timestream by dividing `dividend` by this.
+
+        Parameters
+        ----------
+        dividend : Arg
+            The Timestream or literal value to divide by this.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `dividend / self`.
+        """
+        return Timestream._call("div", dividend, self)
+
+    def __lt__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is less than `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self < rhs`.
+        """
+        return Timestream._call("lt", self, rhs)
+
+    def __le__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is less than or equal to `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self <= rhs`.
+        """
+        return Timestream._call("lte", self, rhs)
+
+    def __gt__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is greater than `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self > rhs`.
+        """
+        return Timestream._call("gt", self, rhs)
+
+    def __ge__(self, rhs: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is greater than or equal to `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self >= rhs`.
+        """
+        return Timestream._call("gte", self, rhs)
+
+    def __and__(self, rhs: Arg) -> Timestream:
+        """
+        Create the logical conjunction of this Timestream and `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to conjoin with.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self & rhs`.
+        """
+        return Timestream._call("and", self, rhs)
+
+    def __or__(self, rhs: Arg) -> Timestream:
+        """
+        Create the logical disjunction of this Timestream and `rhs`.
+
+        Parameters
+        ----------
+        rhs : Arg
+            The Timestream or literal value to disjoin with.
+
+        Returns
+        -------
+        Timestream
+            The Timestream resulting from `self | rhs`.
+        """
+        return Timestream._call("or", self, rhs)
+
+    def eq(self, other: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is equal to `other`.
+
+        Parameters
+        ----------
+        other : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream indicating whether `self` and `other` are equal.
+        """
+        return Timestream._call("eq", self, other)
+
+    def ne(self, other: Arg) -> Timestream:
+        """
+        Create a Timestream that is true if this is not equal to `other`.
+
+        Parameters
+        ----------
+        other : Arg
+            The Timestream or literal value to compare to.
+
+        Returns
+        -------
+        Timestream
+            The Timestream indicating whether `self` and `other` are not equal.
+        """
+        return Timestream._call("neq", self, other)
+
+    def __getitem__(self, key: Arg) -> Timestream:
+        """
+        Index into the elements of a Timestream.
+
+        If the Timestream contains records, the key should be a string corresponding
+        to a field.
+
+        If the Timestream contains lists, the key should be an integer index.
+
+        If the Timestream contains maps, the key should be the same type as the map keys.
+
+        Parameters
+        ----------
+        key : Arg
+            The key to index into the expression.
+
+        Returns
+        -------
+        Timestream
+            Timestream with the resulting value (or `null` if absent) at each point.
+
+        Raises
+        ------
+        TypeError
+            When the Timestream is not a record, list, or map.
+        """
+        data_type = self.data_type
+        if isinstance(data_type, pa.StructType):
+            return Timestream._call("fieldref", self, key)
+        elif isinstance(data_type, pa.MapType):
+            return Timestream._call("get_map", self, key)
+        elif isinstance(data_type, pa.ListType):
+            return Timestream._call("get_list", self, key)
+        else:
+            raise TypeError(f"Cannot index into {data_type}")
+
+    def select(self, *args: str) -> Timestream:
+        """
+        Select the given fields from a Timestream of records.
+
+        Parameters
+        ----------
+        args : list[str]
+            List of field names to select.
+
+        Returns
+        -------
+        Timestream
+            Timestream with the same records limited to the specified fields.
+        """
+        return Timestream._call("select_fields", self, *args)
+
+    def remove(self, *args: str) -> Timestream:
+        """
+        Remove the given fields from a Timestream of records.
+
+        Parameters
+        ----------
+        args : list[str]
+            List of field names to exclude.
+
+        Returns
+        -------
+        Timestream
+            Timestream with the same records and the given fields excluded.
+        """
+        return Timestream._call("remove_fields", self, *args)
+
+    def extend(self, fields: Dict[str, Timestream]) -> Timestream:
+        """
+        Extend this Timestream of records with additional fields.
+
+        If a field exists in both the base Timestream and `fields`, the value
+        from `fields` will be taken.
+
+        Parameters
+        ----------
+        fields : dict[str, Timestream]
+            Fields to add to each record in the Timestream.
+
+        Returns
+        -------
+        Timestream
+            Timestream with the given fields added.
+        """
+        # This argument order is weird, and we shouldn't need to make a record
+        # in order to do the extension.
+        extension = record(fields)
+        return Timestream._call("extend_record", extension, self)
+
+    def neg(self) -> Timestream:
+        """
+        Create a Timestream from the logical or numeric negation of self.
+
+        Returns
+        -------
+        Timestream
+            Timestream with the logical or numeric negation of self.
+        """
+        data_type = self.data_type
+        if data_type == pa.bool_():
+            return Timestream._call("not", self)
+        else:
+            return Timestream._call("neg", self)
+
+    def is_null(self) -> Timestream:
+        """
+        Create a boolean Timestream containing `true` when self is `null`.
+
+        Returns
+        -------
+        Timestream
+            Timestream with `true` when self is `null` and `false` when it isn't.
+        """
+        return self.is_not_null().neg()
+
+    def is_not_null(self) -> Timestream:
+        """
+        Create a boolean Timestream containing `true` when self is not `null`.
+
+        Returns
+        -------
+        Timestream
+            Timestream with `true` when self is not `null` and `false` when it is.
+        """
+        return Timestream._call("is_valid", self)
+
+    def collect(
+        self, max: Optional[int], window: Optional["kt.Window"] = None
+    ) -> Timestream:
+        """
+        Create a Timestream collecting up to the last `max` values in the `window`.
+
+        Collects the values for each key separately.
+
+        Parameters
+        ----------
+        max : Optional[int]
+            The maximum number of values to collect.
+            If `None` all values are collected.
+        window : Optional[Window]
+            The window to use for the aggregation.
+            If not specified, the entire Timestream is used.
+
+        Returns
+        -------
+        Timestream
+            Timestream containing the collected list at each point.
+        """
+        return _aggregation("collect", self, window, max)
+
+    def sum(self, window: Optional["kt.Window"] = None) -> Timestream:
+        """
+        Create a Timestream summing the values in the `window`.
+
+        Computes the sum for each key separately.
+
+        Parameters
+        ----------
+        window : Optional[Window]
+            The window to use for the aggregation.
+            If not specified, the entire Timestream is used.
+
+        Returns
+        -------
+        Timestream
+            Timestream containing the sum up to and including each point.
+        """
+        return _aggregation("sum", self, window)
+
+    def first(self, window: Optional["kt.Window"] = None) -> Timestream:
+        """
+        Create a Timestream containing the first value in the `window`.
+
+        Computed for each key separately.
+
+        Parameters
+        ----------
+        window : Optional[Window]
+            The window to use for the aggregation.
+            If not specified, the entire Timestream is used.
+
+        Returns
+        -------
+        Timestream
+            Timestream containing the first value for the key in the window for
+            each point.
+        """
+        return _aggregation("first", self, window)
+
+    def last(self, window: Optional["kt.Window"] = None) -> Timestream:
+        """
+        Create a Timestream containing the last value in the `window`.
+
+        Computed for each key separately.
+
+        Parameters
+        ----------
+        window : Optional[Window]
+            The window to use for the aggregation.
+            If not specified, the entire Timestream is used.
+
+        Returns
+        -------
+        Timestream
+            Timestream containing the last value for the key in the window for
+            each point.
+        """
+        return _aggregation("last", self, window)
+
+    def preview(self, limit: int = 100) -> pd.DataFrame:
+        """
+        Return the first N rows of the result as a Pandas DataFrame.
+
+        This makes it easy to preview the content of the Timestream.
+
+        Parameters
+        ----------
+        limit : int
+            Maximum number of rows to print.
+
+        Returns
+        -------
+        pd.DataFrame
+            The Pandas DataFrame containing the first `limit` points.
+        """
+        return self.run(row_limit=limit).to_pandas()
+
+    def run(
+        self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None
+    ) -> Result:
+        """
+        Run the Timestream once.
+
+        Parameters
+        ----------
+        row_limit : Optional[int]
+            The maximum number of rows to return.
+            If not specified, all rows are returned.
+
+        max_batch_size : Optional[int]
+            The maximum number of rows to return in each batch.
+            If not specified, the default is used.
+
+        Returns
+        -------
+        Result
+            The `Result` object to use for accessing the results.
+        """
+        options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
+        execution = self._ffi_expr.execute(options)
+        return Result(execution)
+
+
+def _aggregation(
+    op: str, input: Timestream, window: Optional["kt.Window"], *args: Optional[Arg]
+) -> Timestream:
+    """
+    Create the aggregation `op` with the given `input`, `window` and `args`.
+
+    Parameters
+    ----------
+    op : str
+        The operation to create.
+    input : Timestream
+        The input to the aggregation.
+    window : Optional[Window]
+        The window to use for the aggregation.
+    *args : Optional[Arg]
+        Additional arguments to provide before `input` and the flattened window.
+
+    Returns
+    -------
+    Timestream
+        The resulting Timestream.
+
+    Raises
+    ------
+    NotImplementedError
+        If the window is not a known type.
+    """
+    # Note: things would be easier if we had a more normal order, which
+    # we could do as part of "aligning" Sparrow signatures to the new direction.
+    # However, `collect` currently has `collect(max, input, window)`, requiring
+    # us to add the *args like so.
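+    # For example, `input.collect(10, window=SinceWindow(p))` flattens to the
+    # call `collect(10, input, p, None)`: the extra args first, then the input,
+    # then the window predicate and duration (with `None` for the absent parts).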
+    if window is None:
+        return Timestream._call(op, *args, input, None, None)
+    elif isinstance(window, kt.SinceWindow):
+        return Timestream._call(op, *args, input, window.predicate, None)
+    elif isinstance(window, kt.SlidingWindow):
+        return Timestream._call(op, *args, input, window.predicate, window.duration)
+    else:
+        raise NotImplementedError(f"Unknown window type {window!r}")
+
+
+def record(fields: Dict[str, Timestream]) -> Timestream:
+    """
+    Create a record Timestream from the given fields.
+
+    Parameters
+    ----------
+    fields : dict[str, Timestream]
+        The fields to include in the record.
+
+    Returns
+    -------
+    Timestream
+        Timestream containing records with the given fields.
+    """
+    import itertools
+
+    args: List[Union[str, Timestream]] = list(itertools.chain(*fields.items()))
+    return Timestream._call("record", *args)
diff --git a/sparrow-py/pysrc/sparrow_py/_windows.py b/sparrow-py/pysrc/sparrow_py/_windows.py
index ae27614f0..10365b10e 100644
--- a/sparrow-py/pysrc/sparrow_py/_windows.py
+++ b/sparrow-py/pysrc/sparrow_py/_windows.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 
-from ._expr import Expr
+from ._timestream import Timestream
 
 
 @dataclass(frozen=True)
@@ -18,12 +18,12 @@ class SinceWindow(Window):
 
     Parameters
     ----------
-    predicate : Expr
-        The predicate to use for the window.
+    predicate : Timestream
+        The boolean Timestream to use as predicate for the window.
         Each time the predicate evaluates to true the window will be cleared.
     """
 
-    predicate: Expr
+    predicate: Timestream
 
 
 @dataclass(frozen=True)
@@ -36,10 +36,10 @@ class SlidingWindow(Window):
     duration : int
         The number of sliding intervals to use in the window.
 
-    predicate : Expr
-        The predicate to use for the window.
+    predicate : Timestream
+        The boolean Timestream to use as predicate for the window.
         Each time the predicate evaluates to true the window starts a new interval.
     """
 
     duration: int
-    predicate: Expr
+    predicate: Timestream
diff --git a/sparrow-py/pysrc/sparrow_py/sources/source.py b/sparrow-py/pysrc/sparrow_py/sources/source.py
index e24ad8854..ad5733324 100644
--- a/sparrow-py/pysrc/sparrow_py/sources/source.py
+++ b/sparrow-py/pysrc/sparrow_py/sources/source.py
@@ -4,15 +4,15 @@
 import pyarrow as pa
 import sparrow_py._ffi as _ffi
 
-from .._expr import Expr
 from .._session import _get_session
+from .._timestream import Timestream
 
 
 _TABLE_NUM: int = 0
 
 
-class Source(Expr):
-    """A source expression."""
+class Source(Timestream):
+    """A source (input) Timestream."""
 
     # TODO: Clean-up naming on the FFI side.
     _ffi_table: _ffi.Table
diff --git a/sparrow-py/pytests/conftest.py b/sparrow-py/pytests/conftest.py
index d0e400e6e..ec62eed73 100644
--- a/sparrow-py/pytests/conftest.py
+++ b/sparrow-py/pytests/conftest.py
@@ -4,7 +4,7 @@
 
 import pandas as pd
 import pytest
-import sparrow_py
+import sparrow_py as kt
 from sparrow_py import init_session
 
 
@@ -23,7 +23,7 @@ def golden(request: pytest.FixtureRequest, pytestconfig: pytest.Config):  # noqa
     output = 0
 
     def handler(
-        query: Union[sparrow_py.Expr, pd.DataFrame],
+        query: Union[kt.Timestream, pd.DataFrame],
        format: Union[Literal["csv"], Literal["parquet"], Literal["json"]] = "json",
    ):
        """
@@ -31,7 +31,7 @@ def handler(
 
         Parameters
         ----------
-        query : sparrow_py.Expr to execute or pd.DataFrame
+        query : kt.Timestream | pd.DataFrame
            The query to run (or a result to use).
        format : str, optional
            The format to store the golden file in.
@@ -46,10 +46,12 @@ def handler(
 
         if isinstance(query, pd.DataFrame):
             df = query
-        elif isinstance(query, sparrow_py.Expr):
+        elif isinstance(query, kt.Timestream):
             df = query.run().to_pandas()
         else:
-            raise ValueError(f"query must be an Expr or a DataFrame, was {type(query)}")
+            raise ValueError(
+                f"query must be a Timestream or a DataFrame, was {type(query)}"
+            )
 
         test_name = request.node.name
         module_name = request.node.module.__name__
diff --git a/sparrow-py/pytests/golden/record_test/test_remove_record.json b/sparrow-py/pytests/golden/record_test/test_remove_record.json
new file mode 100644
index 000000000..e55310483
--- /dev/null
+++ b/sparrow-py/pytests/golden/record_test/test_remove_record.json
@@ -0,0 +1,6 @@
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null}
+{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0}
+{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null}
diff --git a/sparrow-py/pytests/golden/record_test/test_select_record.json b/sparrow-py/pytests/golden/record_test/test_select_record.json
index e55310483..f45a716ff 100644
--- a/sparrow-py/pytests/golden/record_test/test_select_record.json
+++ b/sparrow-py/pytests/golden/record_test/test_select_record.json
@@ -1,6 +1,6 @@
-{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0}
-{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0}
-{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0}
-{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null}
-{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0}
-{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null}
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","n":10.0}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","n":3.0}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","n":6.0}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","n":9.0}
+{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","n":null}
+{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","n":null}
diff --git a/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json b/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json
new file mode 100644
index 000000000..34f9540be
--- /dev/null
+++ b/sparrow-py/pytests/golden/timestream_test/test_timestream_preview.json
@@ -0,0 +1,4 @@
+{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"n":10}
+{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"n":3}
+{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"n":6}
+{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null,"n":9}
diff --git a/sparrow-py/pytests/record_test.py b/sparrow-py/pytests/record_test.py
index fca5caf07..9ecce17af 100644
--- a/sparrow-py/pytests/record_test.py
+++ b/sparrow-py/pytests/record_test.py
@@ -39,4 +39,8 @@ def test_extend_record(source, golden) -> None:
 
 
 def test_select_record(source, golden) -> None:
-    golden(source.select("m", "n"))
+    golden(source.select("n"))
+
+
+def test_remove_record(source, golden) -> None:
+    golden(source.remove("n"))
diff --git a/sparrow-py/pytests/result_test.py b/sparrow-py/pytests/result_test.py
index a640be808..a1ac10757 100644
--- a/sparrow-py/pytests/result_test.py
+++ b/sparrow-py/pytests/result_test.py
@@ -27,6 +27,7 @@ def test_iterate_pandas(golden, source_int64) -> None:
     with pytest.raises(StopIteration):
         next(results)
 
+
 def test_iterate_rows(golden, source_int64) -> None:
     results = source_int64.run(row_limit=2).iter_rows()
     assert next(results)["m"] == 5
diff --git a/sparrow-py/pytests/expr_test.py b/sparrow-py/pytests/timestream_test.py
similarity index 58%
rename from sparrow-py/pytests/expr_test.py
rename to sparrow-py/pytests/timestream_test.py
index 94e47b8c6..8d989123b 100644
--- a/sparrow-py/pytests/expr_test.py
+++ b/sparrow-py/pytests/timestream_test.py
@@ -39,33 +39,40 @@ def test_field_ref_not_a_struct(source1) -> None:
         source1["x"]["x"]
 
 
-def test_expr(source1) -> None:
+def test_timestream_math(source1) -> None:
     x = source1["x"]
-    assert x + 1 == x + 1
+    assert (x + 1).data_type == x.data_type
+    assert (1 + x).data_type == x.data_type
+    assert (x - 1).data_type == x.data_type
+    assert (1 - x).data_type == x.data_type
+    assert (x * 1).data_type == x.data_type
+    assert (1 * x).data_type == x.data_type
+    assert (1 / x).data_type == x.data_type
+    assert (x / 1).data_type == x.data_type
 
 
-def test_expr_comparison(source1) -> None:
+def test_timestream_comparison(source1) -> None:
     x = source1["x"]
-    assert (x > 1) == (x > 1)
-    # Python doesn't have a `__rgt__` (reverse gt) dunder method.
-    # Instead, if the LHS doesn't support `gt` with the RHS, it tries
-    # rhs `lt` lhs.
-    assert (1 < x) == (x > 1)
-
-    # We can't overload `__eq__` to do this, so we have to use a method.
-    x.eq(1)
-
-
-# def test_expr_pipe(source1) -> None:
-#     assert source1.x.pipe(math.add, 1) == math.add(source1.x, 1)
-#     assert source1.x.pipe((math.add, "rhs"), 1) == math.add(1, rhs=source1.x)
-
-#     assert source1.x.pipe(math.gt, 1) == math.gt(source1.x, 1)
-#     assert source1.x.pipe((math.gt, "rhs"), 1) == math.gt(1, rhs=source1.x)
-
-
-def test_expr_arithmetic_types(source1) -> None:
+    # Tests the various comparison operators. Even though Python doesn't have a
+    # `__rgt__` (reverse gt) dunder method, if the LHS doesn't support `gt` with
+    # the RHS, Python falls back to trying `rhs lt lhs`.
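+    # For example, `1 > x` first tries `(1).__gt__(x)`, which returns
+    # NotImplemented, so Python falls back to the reflected `x.__lt__(1)`.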
+    assert (x > 1).data_type == pa.bool_()
+    assert (1 > x).data_type == pa.bool_()
+    assert (x < 1).data_type == pa.bool_()
+    assert (1 < x).data_type == pa.bool_()
+    assert (x >= 1).data_type == pa.bool_()
+    assert (1 >= x).data_type == pa.bool_()
+    assert (x <= 1).data_type == pa.bool_()
+    assert (1 <= x).data_type == pa.bool_()
+
+    # For `eq` and `ne` we only support a Timestream on the LHS since it is a method.
+    # We can't overload `__eq__` since that must take any RHS and must return `bool`.
+    assert x.eq(1).data_type == pa.bool_()
+    assert x.ne(1).data_type == pa.bool_()
+
+
+def test_timestream_arithmetic_types(source1) -> None:
     x = source1["x"]
     assert (x.eq(1)).data_type == pa.bool_()
     assert (x + 1).data_type == pa.float64()
@@ -78,11 +85,11 @@
         x.eq(1) + source1["y"]
     assert "Incompatible argument types" in str(e)
     if sys.version_info >= (3, 11):
-        assert "Arg[0]: Expr of type bool" in e.value.__notes__
-        assert "Arg[1]: Expr of type int32" in e.value.__notes__
+        assert "Arg[0]: Timestream[bool]" in e.value.__notes__
+        assert "Arg[1]: Timestream[int32]" in e.value.__notes__
 
 
-def test_expr_preview(source1, golden) -> None:
+def test_timestream_preview(source1, golden) -> None:
     content = "\n".join(
         [
             "time,key,m,n",
@@ -96,4 +103,4 @@
     )
     source = kt.sources.CsvSource("time", "key", content)
 
-    golden(source.preview(limit=4))
\ No newline at end of file
+    golden(source.preview(limit=4))
diff --git a/sparrow-py/src/expr.rs b/sparrow-py/src/expr.rs
index eb068e0c3..5b758309d 100644
--- a/sparrow-py/src/expr.rs
+++ b/sparrow-py/src/expr.rs
@@ -60,14 +60,6 @@ impl Expr {
         Ok(Self { rust_expr, session })
     }
 
-    /// Return true if the two expressions are equivalent.
-    ///
-    /// Pyo3 doesn't currently support `__eq__` and we don't want to do `__richcmp__`.
-    /// This is mostly only used for testing, so it seems ok.
-    fn equivalent(&self, other: &Expr) -> PyResult<bool> {
-        Ok(self.rust_expr.equivalent(&other.rust_expr) && self.session == other.session)
-    }
-
     /// Return the session this expression is in.
     fn session(&self) -> Session {
        self.session.clone()