diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs index cfe29d6fc..b516fe6ab 100644 --- a/crates/sparrow-runtime/src/execute.rs +++ b/crates/sparrow-runtime/src/execute.rs @@ -81,6 +81,11 @@ pub struct ExecutionOptions { pub stop_signal_rx: Option>, /// Maximum rows to emit in a single batch. pub max_batch_size: Option, + /// If true, the execution is a materialization. + /// + /// It will subscribe to the input stream and continue running as new data + /// arrives. It won't send final ticks. + pub materialize: bool, } impl ExecutionOptions { @@ -236,6 +241,7 @@ pub async fn execute_new( progress_updates_tx, output_at_time, bounded_lateness_ns: options.bounded_lateness_ns, + materialize: options.materialize, }; // Start executing the query. We pass the response channel to the diff --git a/crates/sparrow-runtime/src/execute/operation.rs b/crates/sparrow-runtime/src/execute/operation.rs index 8414b858d..b93dd0690 100644 --- a/crates/sparrow-runtime/src/execute/operation.rs +++ b/crates/sparrow-runtime/src/execute/operation.rs @@ -96,6 +96,13 @@ pub(crate) struct OperationContext { /// /// If not set, defaults to the [BOUNDED_LATENESS_NS] const. pub bounded_lateness_ns: Option, + /// If true, the execution is a materialization. + /// + /// It will subscribe to the input stream and continue running as new data + /// arrives. It won't send final ticks. + /// + /// Derived from the ExecutionOptions, + pub materialize: bool, } impl OperationContext { diff --git a/crates/sparrow-runtime/src/execute/operation/scan.rs b/crates/sparrow-runtime/src/execute/operation/scan.rs index c906347a2..1026b586b 100644 --- a/crates/sparrow-runtime/src/execute/operation/scan.rs +++ b/crates/sparrow-runtime/src/execute/operation/scan.rs @@ -163,15 +163,46 @@ impl ScanOperation { // in-memory batch as the "hot-store" for history+stream hybrid // queries. assert!(requested_slice.is_none()); - let batch = in_memory.clone(); - return Ok(Box::new(Self { - projected_schema, - input_stream: futures::stream::once(async move { - Batch::try_new_from_batch(batch.current().clone()) + // TODO: Consider stoppable batch scans (and queries). + let input_stream = if context.materialize { + in_memory + .subscribe() + .map_err(|e| e.change_context(Error::internal_msg("invalid input"))) + .and_then(|batch| async move { + Batch::try_new_from_batch(batch) + .into_report() + .change_context(Error::internal_msg("invalid input")) + }) + // TODO: Share this code / unify it with other scans. + .take_until(async move { + let mut stop_signal_rx = + stop_signal_rx.expect("stop signal for use with materialization"); + while !*stop_signal_rx.borrow() { + match stop_signal_rx.changed().await { + Ok(_) => (), + Err(e) => { + tracing::error!( + "stop signal receiver dropped unexpectedly: {:?}", + e + ); + break; + } + } + } + }) + .boxed() + } else { + let batch = in_memory.current().clone(); + futures::stream::once(async move { + Batch::try_new_from_batch(batch) .into_report() .change_context(Error::internal_msg("invalid input")) }) - .boxed(), + .boxed() + }; + return Ok(Box::new(Self { + projected_schema, + input_stream, key_hash_index: KeyHashIndex::default(), progress_updates_tx: context.progress_updates_tx.clone(), })); @@ -477,6 +508,7 @@ mod tests { progress_updates_tx, output_at_time: None, bounded_lateness_ns: None, + materialize: false, }; executor diff --git a/crates/sparrow-runtime/src/execute/operation/testing.rs b/crates/sparrow-runtime/src/execute/operation/testing.rs index 33e3e27bd..f1a131623 100644 --- a/crates/sparrow-runtime/src/execute/operation/testing.rs +++ b/crates/sparrow-runtime/src/execute/operation/testing.rs @@ -189,6 +189,7 @@ pub(super) async fn run_operation( progress_updates_tx, output_at_time: None, bounded_lateness_ns: None, + materialize: false, }; executor .execute( @@ -246,6 +247,7 @@ pub(super) async fn run_operation_json( progress_updates_tx, output_at_time: None, bounded_lateness_ns: None, + materialize: false, }; executor .execute( diff --git a/crates/sparrow-session/src/execution.rs b/crates/sparrow-session/src/execution.rs index fb7af3dd6..0933d64ee 100644 --- a/crates/sparrow-session/src/execution.rs +++ b/crates/sparrow-session/src/execution.rs @@ -13,6 +13,8 @@ pub struct Execution { output: tokio_stream::wrappers::ReceiverStream, /// Future which resolves to the first error or None. status: Status, + /// Stop signal. Send `true` to stop execution. + stop_signal_rx: tokio::sync::watch::Sender, } enum Status { @@ -26,6 +28,7 @@ impl Execution { rt: tokio::runtime::Runtime, output_rx: tokio::sync::mpsc::Receiver, progress: BoxStream<'static, error_stack::Result>, + stop_signal_rx: tokio::sync::watch::Sender, ) -> Self { let output = tokio_stream::wrappers::ReceiverStream::new(output_rx); let status = Status::Running(Box::pin(async move { @@ -34,7 +37,12 @@ impl Execution { Ok(()) })); - Self { rt, output, status } + Self { + rt, + output, + status, + stop_signal_rx, + } } /// Check the status future. @@ -70,6 +78,16 @@ impl Execution { } } + /// Send the stop signal. + /// + /// This method does *not* wait for all batches to be processed. + pub fn stop(&mut self) { + self.stop_signal_rx.send_if_modified(|stop| { + *stop = true; + true + }); + } + pub async fn next(&mut self) -> error_stack::Result, Error> { self.is_done()?; Ok(self.output.next().await) diff --git a/crates/sparrow-session/src/session.rs b/crates/sparrow-session/src/session.rs index ce0760dc1..200c0e168 100644 --- a/crates/sparrow-session/src/session.rs +++ b/crates/sparrow-session/src/session.rs @@ -28,6 +28,8 @@ pub struct ExecutionOptions { pub row_limit: Option, /// The maximum number of rows to return in a single batch. pub max_batch_size: Option, + /// Whether to run execute as a materialization or not. + pub materialize: bool, } /// Adds a table to the session. @@ -302,10 +304,12 @@ impl Session { let destination = Destination::Channel(output_tx); let data_context = self.data_context.clone(); - let options = options.to_sparrow_options(); + let (stop_signal_tx, stop_signal_rx) = tokio::sync::watch::channel(false); + let mut options = options.to_sparrow_options(); + options.stop_signal_rx = Some(stop_signal_rx); // Hacky. Use the existing execution logic. This weird things with downloading checkpoints, etc. - let result = rt + let progress = rt .block_on(sparrow_runtime::execute::execute_new( plan, destination, @@ -316,7 +320,7 @@ impl Session { .map_err(|e| e.change_context(Error::Execute)) .boxed(); - Ok(Execution::new(rt, output_rx, result)) + Ok(Execution::new(rt, output_rx, progress, stop_signal_tx)) } } @@ -342,6 +346,7 @@ impl ExecutionOptions { fn to_sparrow_options(&self) -> sparrow_runtime::execute::ExecutionOptions { let mut options = sparrow_runtime::execute::ExecutionOptions { max_batch_size: self.max_batch_size, + materialize: self.materialize, ..Default::default() }; diff --git a/sparrow-py/pysrc/sparrow_py/_execution.py b/sparrow-py/pysrc/sparrow_py/_execution.py index 981cd881e..1f4025ad7 100644 --- a/sparrow-py/pysrc/sparrow_py/_execution.py +++ b/sparrow-py/pysrc/sparrow_py/_execution.py @@ -15,7 +15,11 @@ class ExecutionOptions: max_batch_size : Optional[int] The maximum batch size to use when returning results. If not specified, the default batch size will be used. + + materialize : bool + If true, the query will be a continuous materialization. """ row_limit: Optional[int] = None max_batch_size: Optional[int] = None + materialize: bool = None \ No newline at end of file diff --git a/sparrow-py/pysrc/sparrow_py/_ffi.pyi b/sparrow-py/pysrc/sparrow_py/_ffi.pyi index 886c3b217..9968b0814 100644 --- a/sparrow-py/pysrc/sparrow_py/_ffi.pyi +++ b/sparrow-py/pysrc/sparrow_py/_ffi.pyi @@ -13,6 +13,8 @@ class Session: class Execution(object): def collect_pyarrow(self) -> List[pa.RecordBatch]: ... def next_pyarrow(self) -> Optional[pa.RecordBatch]: ... + def stop(self) -> None: ... + async def next_pyarrow_async(self) -> Optional[pa.RecordBatch]: ... class Expr: diff --git a/sparrow-py/pysrc/sparrow_py/_result.py b/sparrow-py/pysrc/sparrow_py/_result.py index 025c43a01..aa0e2c7e1 100644 --- a/sparrow-py/pysrc/sparrow_py/_result.py +++ b/sparrow-py/pysrc/sparrow_py/_result.py @@ -94,3 +94,7 @@ async def iter_rows_async(self) -> AsyncIterator[dict]: for row in next_batch.to_pylist(): yield row next_batch = await self._ffi_execution.next_pyarrow_async() + + def stop(self) -> None: + """Stop the underlying execution.""" + self._ffi_execution.stop() \ No newline at end of file diff --git a/sparrow-py/pysrc/sparrow_py/_timestream.py b/sparrow-py/pysrc/sparrow_py/_timestream.py index 54aa921fb..8bac68a41 100644 --- a/sparrow-py/pysrc/sparrow_py/_timestream.py +++ b/sparrow-py/pysrc/sparrow_py/_timestream.py @@ -836,7 +836,10 @@ def preview(self, limit: int = 100) -> pd.DataFrame: return self.run(row_limit=limit).to_pandas() def run( - self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None + self, + row_limit: Optional[int] = None, + max_batch_size: Optional[int] = None, + materialize: bool = False, ) -> Result: """ Run the Timestream once. @@ -851,6 +854,9 @@ def run( The maximum number of rows to return in each batch. If not specified the default is used. + materialize : bool + If true, the execution will be a continuous materialization. + Returns ------- Result @@ -860,7 +866,7 @@ def run( if not pa.types.is_struct(self.data_type): # The execution engine requires a struct, so wrap this in a record. expr = record({"result": self}) - options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size) + options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size, materialize=materialize) execution = expr._ffi_expr.execute(options) return Result(execution) diff --git a/sparrow-py/pysrc/sparrow_py/sources/arrow.py b/sparrow-py/pysrc/sparrow_py/sources/arrow.py index 8fdc519f9..a576c18b2 100644 --- a/sparrow-py/pysrc/sparrow_py/sources/arrow.py +++ b/sparrow-py/pysrc/sparrow_py/sources/arrow.py @@ -65,3 +65,8 @@ def __init__( ) -> None: content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs) super().__init__(time_column_name, key_column_name, content) + + def add_csv_string(self, csv_string: str, **kwargs) -> None: + """Add data to the source.""" + content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs) + self.add(content) \ No newline at end of file diff --git a/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize.jsonl b/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize.jsonl new file mode 100644 index 000000000..96419c55d --- /dev/null +++ b/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize.jsonl @@ -0,0 +1,6 @@ +{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"n":10.0} +{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"n":3.0} +{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"n":6.0} +{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null,"n":9.0} +{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0,"n":null} +{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null,"n":null} diff --git a/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize_1.jsonl b/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize_1.jsonl new file mode 100644 index 000000000..79b23c2c3 --- /dev/null +++ b/sparrow-py/pytests/golden/result_test/test_iter_pandas_async_materialize_1.jsonl @@ -0,0 +1,6 @@ +{"_time":851128797000000000,"_subsort":6,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:39:57-08:00","key":"A","m":5.0,"n":10.0} +{"_time":851128798000000000,"_subsort":7,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-20T16:39:58-08:00","key":"B","m":24.0,"n":3.0} +{"_time":851128799000000000,"_subsort":8,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:39:59-08:00","key":"A","m":17.0,"n":6.0} +{"_time":851128800000000000,"_subsort":9,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:00-08:00","key":"A","m":null,"n":9.0} +{"_time":851128801000000000,"_subsort":10,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:01-08:00","key":"A","m":12.0,"n":null} +{"_time":851128802000000000,"_subsort":11,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:02-08:00","key":"A","m":null,"n":null} diff --git a/sparrow-py/pytests/result_test.py b/sparrow-py/pytests/result_test.py index e83ac8925..3064bb9d3 100644 --- a/sparrow-py/pytests/result_test.py +++ b/sparrow-py/pytests/result_test.py @@ -49,3 +49,31 @@ async def test_iter_pandas_async(golden, source_int64) -> None: golden.jsonl(await batches.__anext__()) with pytest.raises(StopAsyncIteration): await batches.__anext__() + +@pytest.mark.asyncio +async def test_iter_pandas_async_materialize(golden, source_int64) -> None: + data2 = "\n".join( + [ + "time,key,m,n", + "1996-12-20T16:39:57-08:00,A,5,10", + "1996-12-20T16:39:58-08:00,B,24,3", + "1996-12-20T16:39:59-08:00,A,17,6", + "1996-12-20T16:40:00-08:00,A,,9", + "1996-12-20T16:40:01-08:00,A,12,", + "1996-12-20T16:40:02-08:00,A,,", + ] + ) + + execution = source_int64.run(materialize=True) + batches = execution.iter_pandas_async() + + # Await the first batch. + golden.jsonl(await batches.__anext__()) + + # Add data and await the second batch. + source_int64.add_csv_string(data2) + golden.jsonl(await batches.__anext__()) + + execution.stop() + with pytest.raises(StopAsyncIteration): + print(await batches.__anext__()) \ No newline at end of file diff --git a/sparrow-py/src/execution.rs b/sparrow-py/src/execution.rs index b11062e2b..51b663289 100644 --- a/sparrow-py/src/execution.rs +++ b/sparrow-py/src/execution.rs @@ -94,4 +94,9 @@ impl Execution { }) }) } + + fn stop(&mut self) -> Result<()> { + self.execution()?.as_mut().unwrap().stop(); + Ok(()) + } } diff --git a/sparrow-py/src/expr.rs b/sparrow-py/src/expr.rs index 5acd62afc..641833db8 100644 --- a/sparrow-py/src/expr.rs +++ b/sparrow-py/src/expr.rs @@ -124,11 +124,14 @@ fn extract_options(options: Option<&PyAny>) -> Result Ok(sparrow_session::ExecutionOptions::default()), Some(options) => { let py = options.py(); + let row_limit = pyo3::intern!(py, "row_limit"); + let max_batch_size = pyo3::intern!(py, "max_batch_size"); + let materialize = pyo3::intern!(py, "materialize"); + Ok(sparrow_session::ExecutionOptions { - row_limit: options.getattr(pyo3::intern!(py, "row_limit"))?.extract()?, - max_batch_size: options - .getattr(pyo3::intern!(py, "max_batch_size"))? - .extract()?, + row_limit: options.getattr(row_limit)?.extract()?, + max_batch_size: options.getattr(max_batch_size)?.extract()?, + materialize: options.getattr(materialize)?.extract()?, }) } }