
Commit 14eed90
feat: Asynchronous materializations (#617)
bjchambers authored Aug 8, 2023
1 parent 7c33a43 commit 14eed90
Showing 16 changed files with 155 additions and 16 deletions.
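
Taken together, the changes below add a `materialize` flag that turns a query into a long-running materialization, plus a stop signal to end it. A minimal end-to-end sketch of the resulting Python API (the `CsvString` source name and import path are assumptions inferred from the tests in this commit; `run(materialize=True)`, `iter_pandas_async`, `add_csv_string`, and `stop` are the APIs added here):

```python
# Hypothetical usage sketch of asynchronous materialization, pieced together
# from the tests in this commit. The `CsvString` class name is an assumption.
import asyncio
from sparrow_py.sources import CsvString  # assumed class/module name

async def main() -> None:
    source = CsvString(
        "time,key,m,n\n1996-12-19T16:39:57-08:00,A,5,10\n",
        time_column_name="time",
        key_column_name="key",
    )

    # materialize=True keeps the query running as new data arrives.
    result = source.run(materialize=True)
    batches = result.iter_pandas_async()
    print(await batches.__anext__())  # first batch: the initial rows

    # Feed more rows into the running materialization.
    source.add_csv_string("time,key,m,n\n1996-12-20T16:39:58-08:00,B,24,3\n")
    print(await batches.__anext__())  # second batch: the appended rows

    # Send the stop signal; the iterator then raises StopAsyncIteration.
    result.stop()

asyncio.run(main())
```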
6 changes: 6 additions & 0 deletions crates/sparrow-runtime/src/execute.rs
@@ -81,6 +81,11 @@ pub struct ExecutionOptions {
pub stop_signal_rx: Option<tokio::sync::watch::Receiver<bool>>,
/// Maximum rows to emit in a single batch.
pub max_batch_size: Option<usize>,
/// If true, the execution is a materialization.
///
/// It will subscribe to the input stream and continue running as new data
/// arrives. It won't send final ticks.
pub materialize: bool,
}

impl ExecutionOptions {
@@ -236,6 +241,7 @@ pub async fn execute_new(
progress_updates_tx,
output_at_time,
bounded_lateness_ns: options.bounded_lateness_ns,
materialize: options.materialize,
};

// Start executing the query. We pass the response channel to the
7 changes: 7 additions & 0 deletions crates/sparrow-runtime/src/execute/operation.rs
@@ -96,6 +96,13 @@ pub(crate) struct OperationContext {
///
/// If not set, defaults to the [BOUNDED_LATENESS_NS] const.
pub bounded_lateness_ns: Option<i64>,
/// If true, the execution is a materialization.
///
/// It will subscribe to the input stream and continue running as new data
/// arrives. It won't send final ticks.
///
/// Derived from the [ExecutionOptions].
pub materialize: bool,
}

impl OperationContext {
44 changes: 38 additions & 6 deletions crates/sparrow-runtime/src/execute/operation/scan.rs
@@ -163,15 +163,46 @@ impl ScanOperation {
// in-memory batch as the "hot-store" for history+stream hybrid
// queries.
assert!(requested_slice.is_none());
let batch = in_memory.clone();
return Ok(Box::new(Self {
projected_schema,
input_stream: futures::stream::once(async move {
Batch::try_new_from_batch(batch.current().clone())
// TODO: Consider stoppable batch scans (and queries).
let input_stream = if context.materialize {
in_memory
.subscribe()
.map_err(|e| e.change_context(Error::internal_msg("invalid input")))
.and_then(|batch| async move {
Batch::try_new_from_batch(batch)
.into_report()
.change_context(Error::internal_msg("invalid input"))
})
// TODO: Share this code / unify it with other scans.
.take_until(async move {
let mut stop_signal_rx =
stop_signal_rx.expect("stop signal for use with materialization");
while !*stop_signal_rx.borrow() {
match stop_signal_rx.changed().await {
Ok(_) => (),
Err(e) => {
tracing::error!(
"stop signal receiver dropped unexpectedly: {:?}",
e
);
break;
}
}
}
})
.boxed()
} else {
let batch = in_memory.current().clone();
futures::stream::once(async move {
Batch::try_new_from_batch(batch)
.into_report()
.change_context(Error::internal_msg("invalid input"))
})
.boxed(),
.boxed()
};
return Ok(Box::new(Self {
projected_schema,
input_stream,
key_hash_index: KeyHashIndex::default(),
progress_updates_tx: context.progress_updates_tx.clone(),
}));
@@ -477,6 +508,7 @@ mod tests {
progress_updates_tx,
output_at_time: None,
bounded_lateness_ns: None,
materialize: false,
};

executor
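
The scan.rs change above is the core of the feature: when `context.materialize` is set, the scan subscribes to the in-memory source and consumes the subscription with `take_until` on the stop signal, instead of emitting a single fixed batch. A minimal asyncio sketch of that take-until-stop shape (illustrative stand-ins only, not the sparrow implementation):

```python
# Minimal asyncio sketch of the take-until-stop pattern used in scan.rs;
# asyncio.Event and asyncio.Queue stand in for tokio's watch channel and the
# in-memory subscription. Names here are illustrative, not sparrow APIs.
import asyncio
from typing import Any, AsyncIterator

async def scan(queue: "asyncio.Queue[Any]", stop: asyncio.Event) -> AsyncIterator[Any]:
    """Yield batches from the subscription until the stop signal fires."""
    while not stop.is_set():
        batch_task = asyncio.create_task(queue.get())
        stop_task = asyncio.create_task(stop.wait())
        done, pending = await asyncio.wait(
            {batch_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if batch_task in done:
            yield batch_task.result()
```

In the Rust code, the `watch::Receiver` loop passed to `take_until` plays the role of `stop_task` and the subscription stream plays the role of the queue; the `else` branch keeps the old single-batch behavior.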
2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/execute/operation/testing.rs
@@ -189,6 +189,7 @@ pub(super) async fn run_operation(
progress_updates_tx,
output_at_time: None,
bounded_lateness_ns: None,
materialize: false,
};
executor
.execute(
@@ -246,6 +247,7 @@ pub(super) async fn run_operation_json(
progress_updates_tx,
output_at_time: None,
bounded_lateness_ns: None,
materialize: false,
};
executor
.execute(
20 changes: 19 additions & 1 deletion crates/sparrow-session/src/execution.rs
@@ -13,6 +13,8 @@ pub struct Execution {
output: tokio_stream::wrappers::ReceiverStream<RecordBatch>,
/// Future which resolves to the first error or None.
status: Status,
/// Stop signal. Send `true` to stop execution.
stop_signal_tx: tokio::sync::watch::Sender<bool>,
}

enum Status {
@@ -26,6 +28,7 @@
rt: tokio::runtime::Runtime,
output_rx: tokio::sync::mpsc::Receiver<RecordBatch>,
progress: BoxStream<'static, error_stack::Result<ExecuteResponse, Error>>,
stop_signal_tx: tokio::sync::watch::Sender<bool>,
) -> Self {
let output = tokio_stream::wrappers::ReceiverStream::new(output_rx);
let status = Status::Running(Box::pin(async move {
@@ -34,7 +37,12 @@
Ok(())
}));

Self { rt, output, status }
Self {
rt,
output,
status,
stop_signal_tx,
}
}

/// Check the status future.
@@ -70,6 +78,16 @@ impl Execution {
}
}

/// Send the stop signal.
///
/// This method does *not* wait for all batches to be processed.
pub fn stop(&mut self) {
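// `send_if_modified` notifies receivers only when the closure returns `true`;
// returning `true` unconditionally forces a wake-up even if the flag was already set.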
self.stop_signal_tx.send_if_modified(|stop| {
*stop = true;
true
});
}

pub async fn next(&mut self) -> error_stack::Result<Option<RecordBatch>, Error> {
self.is_done()?;
Ok(self.output.next().await)
11 changes: 8 additions & 3 deletions crates/sparrow-session/src/session.rs
@@ -28,6 +28,8 @@ pub struct ExecutionOptions {
pub row_limit: Option<usize>,
/// The maximum number of rows to return in a single batch.
pub max_batch_size: Option<usize>,
/// Whether to run execute as a materialization or not.
pub materialize: bool,
}

/// Adds a table to the session.
@@ -302,10 +304,12 @@ impl Session {
let destination = Destination::Channel(output_tx);
let data_context = self.data_context.clone();

let options = options.to_sparrow_options();
let (stop_signal_tx, stop_signal_rx) = tokio::sync::watch::channel(false);
let mut options = options.to_sparrow_options();
options.stop_signal_rx = Some(stop_signal_rx);

// Hacky. Use the existing execution logic. This does weird things with downloading checkpoints, etc.
let result = rt
let progress = rt
.block_on(sparrow_runtime::execute::execute_new(
plan,
destination,
@@ -316,7 +320,7 @@
.map_err(|e| e.change_context(Error::Execute))
.boxed();

Ok(Execution::new(rt, output_rx, result))
Ok(Execution::new(rt, output_rx, progress, stop_signal_tx))
}
}

@@ -342,6 +346,7 @@ impl ExecutionOptions {
fn to_sparrow_options(&self) -> sparrow_runtime::execute::ExecutionOptions {
let mut options = sparrow_runtime::execute::ExecutionOptions {
max_batch_size: self.max_batch_size,
materialize: self.materialize,
..Default::default()
};

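
The wiring in `Session::execute` above splits one `watch` channel two ways: the receiver goes into the runtime's `ExecutionOptions` (where the scan's `take_until` watches it) and the sender goes into the returned `Execution`. A self-contained sketch of that split, with `asyncio.Event` again standing in for the watch channel (illustrative names only):

```python
# Sketch of the stop-signal wiring in Session::execute: one half goes to the
# runtime loop, the other to the Execution handle. Illustrative only.
import asyncio

class ExecutionHandle:
    """Holds the sending half; stop() flips the shared flag."""

    def __init__(self, stop: asyncio.Event) -> None:
        self._stop = stop

    def stop(self) -> None:
        self._stop.set()

async def runtime_loop(stop: asyncio.Event) -> None:
    # Stands in for the materializing query: runs until stopped.
    while not stop.is_set():
        await asyncio.sleep(0.01)  # poll input, emit batches, ...

async def main() -> None:
    stop = asyncio.Event()                          # watch::channel(false)
    task = asyncio.create_task(runtime_loop(stop))  # receiver -> runtime
    handle = ExecutionHandle(stop)                  # sender -> Execution
    handle.stop()                                   # Execution::stop()
    await task                                      # loop exits promptly

asyncio.run(main())
```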
4 changes: 4 additions & 0 deletions sparrow-py/pysrc/sparrow_py/_execution.py
@@ -15,7 +15,11 @@ class ExecutionOptions:
max_batch_size : Optional[int]
The maximum batch size to use when returning results.
If not specified, the default batch size will be used.
materialize : bool
If true, the query will be a continuous materialization.
"""

row_limit: Optional[int] = None
max_batch_size: Optional[int] = None
materialize: bool = False
2 changes: 2 additions & 0 deletions sparrow-py/pysrc/sparrow_py/_ffi.pyi
@@ -13,6 +13,8 @@ class Session:
class Execution(object):
def collect_pyarrow(self) -> List[pa.RecordBatch]: ...
def next_pyarrow(self) -> Optional[pa.RecordBatch]: ...
def stop(self) -> None: ...

async def next_pyarrow_async(self) -> Optional[pa.RecordBatch]: ...

class Expr:
4 changes: 4 additions & 0 deletions sparrow-py/pysrc/sparrow_py/_result.py
@@ -94,3 +94,7 @@ async def iter_rows_async(self) -> AsyncIterator[dict]:
for row in next_batch.to_pylist():
yield row
next_batch = await self._ffi_execution.next_pyarrow_async()

def stop(self) -> None:
"""Stop the underlying execution."""
self._ffi_execution.stop()
10 changes: 8 additions & 2 deletions sparrow-py/pysrc/sparrow_py/_timestream.py
@@ -836,7 +836,10 @@ def preview(self, limit: int = 100) -> pd.DataFrame:
return self.run(row_limit=limit).to_pandas()

def run(
self, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None
self,
row_limit: Optional[int] = None,
max_batch_size: Optional[int] = None,
materialize: bool = False,
) -> Result:
"""
Run the Timestream once.
@@ -851,6 +854,9 @@
The maximum number of rows to return in each batch.
If not specified, the default is used.
materialize : bool
If true, the execution will be a continuous materialization.

Returns
-------
Result
@@ -860,7 +866,7 @@
if not pa.types.is_struct(self.data_type):
# The execution engine requires a struct, so wrap this in a record.
expr = record({"result": self})
options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size, materialize=materialize)
execution = expr._ffi_expr.execute(options)
return Result(execution)

5 changes: 5 additions & 0 deletions sparrow-py/pysrc/sparrow_py/sources/arrow.py
@@ -65,3 +65,8 @@ def __init__(
) -> None:
content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs)
super().__init__(time_column_name, key_column_name, content)

def add_csv_string(self, csv_string: str, **kwargs) -> None:
"""Add data to the source."""
content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs)
self.add(content)
@@ -0,0 +1,6 @@
{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:57-08:00","key":"A","m":5.0,"n":10.0}
{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-19T16:39:58-08:00","key":"B","m":24.0,"n":3.0}
{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:39:59-08:00","key":"A","m":17.0,"n":6.0}
{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:00-08:00","key":"A","m":null,"n":9.0}
{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:01-08:00","key":"A","m":12.0,"n":null}
{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-19T16:40:02-08:00","key":"A","m":null,"n":null}
@@ -0,0 +1,6 @@
{"_time":851128797000000000,"_subsort":6,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:39:57-08:00","key":"A","m":5.0,"n":10.0}
{"_time":851128798000000000,"_subsort":7,"_key_hash":2867199309159137213,"_key":"B","time":"1996-12-20T16:39:58-08:00","key":"B","m":24.0,"n":3.0}
{"_time":851128799000000000,"_subsort":8,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:39:59-08:00","key":"A","m":17.0,"n":6.0}
{"_time":851128800000000000,"_subsort":9,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:00-08:00","key":"A","m":null,"n":9.0}
{"_time":851128801000000000,"_subsort":10,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:01-08:00","key":"A","m":12.0,"n":null}
{"_time":851128802000000000,"_subsort":11,"_key_hash":12960666915911099378,"_key":"A","time":"1996-12-20T16:40:02-08:00","key":"A","m":null,"n":null}
28 changes: 28 additions & 0 deletions sparrow-py/pytests/result_test.py
@@ -49,3 +49,31 @@ async def test_iter_pandas_async(golden, source_int64) -> None:
golden.jsonl(await batches.__anext__())
with pytest.raises(StopAsyncIteration):
await batches.__anext__()

@pytest.mark.asyncio
async def test_iter_pandas_async_materialize(golden, source_int64) -> None:
data2 = "\n".join(
[
"time,key,m,n",
"1996-12-20T16:39:57-08:00,A,5,10",
"1996-12-20T16:39:58-08:00,B,24,3",
"1996-12-20T16:39:59-08:00,A,17,6",
"1996-12-20T16:40:00-08:00,A,,9",
"1996-12-20T16:40:01-08:00,A,12,",
"1996-12-20T16:40:02-08:00,A,,",
]
)

execution = source_int64.run(materialize=True)
batches = execution.iter_pandas_async()

# Await the first batch.
golden.jsonl(await batches.__anext__())

# Add data and await the second batch.
source_int64.add_csv_string(data2)
golden.jsonl(await batches.__anext__())

execution.stop()
with pytest.raises(StopAsyncIteration):
await batches.__anext__()
5 changes: 5 additions & 0 deletions sparrow-py/src/execution.rs
@@ -94,4 +94,9 @@ impl Execution {
})
})
}

fn stop(&mut self) -> Result<()> {
self.execution()?.as_mut().unwrap().stop();
Ok(())
}
}
11 changes: 7 additions & 4 deletions sparrow-py/src/expr.rs
@@ -124,11 +124,14 @@ fn extract_options(options: Option<&PyAny>) -> Result<sparrow_session::Execution
None => Ok(sparrow_session::ExecutionOptions::default()),
Some(options) => {
let py = options.py();
let row_limit = pyo3::intern!(py, "row_limit");
let max_batch_size = pyo3::intern!(py, "max_batch_size");
let materialize = pyo3::intern!(py, "materialize");

Ok(sparrow_session::ExecutionOptions {
row_limit: options.getattr(pyo3::intern!(py, "row_limit"))?.extract()?,
max_batch_size: options
.getattr(pyo3::intern!(py, "max_batch_size"))?
.extract()?,
row_limit: options.getattr(row_limit)?.extract()?,
max_batch_size: options.getattr(max_batch_size)?.extract()?,
materialize: options.getattr(materialize)?.extract()?,
})
}
}
