From f6eb9934c8a935928e6995e88261d0b8325d1631 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Thu, 21 Nov 2024 09:56:57 +0800
Subject: [PATCH] [CHORE] connect: Optimize plans in connect (#3378)

1. Optimize logical plans before executing them in connect.
2. Run the native executor on a spawned task on the connect runtime
   instead of a dedicated thread. The native executor already spawns its
   own thread, and it also returns an awaitable receiver, so we should
   be able to just receive from the connect runtime.

---------

Co-authored-by: Colin Ho
---
 Cargo.lock                              |   2 -
 src/daft-connect/Cargo.toml             |   4 +-
 src/daft-connect/src/op/execute/root.rs |  36 ++---
 src/daft-local-execution/src/channel.rs |   4 +
 src/daft-local-execution/src/lib.rs     |   6 +-
 src/daft-local-execution/src/run.rs     | 205 +++++++++++++++++-------
 src/daft-logical-plan/src/builder.rs    |  74 +++++----
 7 files changed, 215 insertions(+), 116 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 36e3598748..b980db2ba8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1935,7 +1935,6 @@ dependencies = [
  "async-stream",
  "common-daft-config",
  "daft-local-execution",
- "daft-local-plan",
  "daft-logical-plan",
  "daft-scan",
  "daft-table",
@@ -1945,7 +1944,6 @@ dependencies = [
  "pyo3",
  "spark-connect",
  "tokio",
- "tokio-util",
  "tonic",
  "tracing",
  "uuid 1.10.0",
diff --git a/src/daft-connect/Cargo.toml b/src/daft-connect/Cargo.toml
index f94cb284be..362ad1bc08 100644
--- a/src/daft-connect/Cargo.toml
+++ b/src/daft-connect/Cargo.toml
@@ -3,7 +3,6 @@ arrow2 = {workspace = true}
 async-stream = "0.3.6"
 common-daft-config = {workspace = true}
 daft-local-execution = {workspace = true}
-daft-local-plan = {workspace = true}
 daft-logical-plan = {workspace = true}
 daft-scan = {workspace = true}
 daft-table = {workspace = true}
@@ -13,13 +12,12 @@ futures = "0.3.31"
 pyo3 = {workspace = true, optional = true}
 spark-connect = {workspace = true}
 tokio = {version = "1.40.0", features = ["full"]}
-tokio-util = {workspace = true}
 tonic = "0.12.3"
 tracing = {workspace = true}
 uuid = {version = "1.10.0", features = ["v4"]}
 
 [features]
-python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-local-plan/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python"]
+python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python"]
 
 [lints]
 workspace = true
diff --git a/src/daft-connect/src/op/execute/root.rs b/src/daft-connect/src/op/execute/root.rs
index 66fc1f7ab8..1e1fac147b 100644
--- a/src/daft-connect/src/op/execute/root.rs
+++ b/src/daft-connect/src/op/execute/root.rs
@@ -1,9 +1,9 @@
 use std::{collections::HashMap, future::ready};
 
 use common_daft_config::DaftExecutionConfig;
+use daft_local_execution::NativeExecutor;
 use futures::stream;
 use spark_connect::{ExecutePlanResponse, Relation};
-use tokio_util::sync::CancellationToken;
 use tonic::{codegen::tokio_stream::wrappers::ReceiverStream, Status};
 
 use crate::{
@@ -28,36 +28,32 @@ impl Session {
 
         let finished = context.finished();
 
-        let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(16);
-        std::thread::spawn(move || {
-            let result = (|| -> eyre::Result<()> {
+        let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
+        tokio::spawn(async move {
+            let execution_fut = async {
                 let plan = translation::to_logical_plan(command)?;
-                let logical_plan = plan.build();
-                let physical_plan = daft_local_plan::translate(&logical_plan)?;
-
+                let optimized_plan = plan.optimize()?;
                 let cfg = DaftExecutionConfig::default();
-                let results = daft_local_execution::run_local(
-                    &physical_plan,
-                    HashMap::new(),
-                    cfg.into(),
-                    None,
-                    CancellationToken::new(), // todo: maybe implement cancelling
-                )?;
+                let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
+                let mut result_stream = native_executor
+                    .run(HashMap::new(), cfg.into(), None)?
+                    .into_stream();
 
-                for result in results {
+                while let Some(result) = result_stream.next().await {
                     let result = result?;
                     let tables = result.get_tables()?;
-
                     for table in tables.as_slice() {
                         let response = context.gen_response(table)?;
-                        tx.blocking_send(Ok(response)).unwrap();
+                        if tx.send(Ok(response)).await.is_err() {
+                            return Ok(());
+                        }
                     }
                 }
                 Ok(())
-            })();
+            };
 
-            if let Err(e) = result {
-                tx.blocking_send(Err(e)).unwrap();
+            if let Err(e) = execution_fut.await {
+                let _ = tx.send(Err(e)).await;
             }
         });
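The root.rs rewrite above is the standard bounded producer/consumer shape for tonic response streams: a spawned task drives execution and pushes each response into a tokio mpsc channel, and the receiver half is wrapped in a ReceiverStream for the client. Note that the channel buffer also shrinks from 16 to 1, so the producer can run at most one response ahead of the consumer. Below is a minimal sketch of that shape, not part of the patch: produce_responses is a hypothetical stand-in for the executor loop, String stands in for the real response and error types, and ReceiverStream comes from the tokio-stream crate (re-exported through tonic in the code above).

use tokio_stream::wrappers::ReceiverStream;

// Hypothetical stand-in for the executor loop in root.rs.
async fn produce_responses(tx: tokio::sync::mpsc::Sender<Result<String, String>>) {
    for i in 0..3 {
        // A failed send means the receiver side was dropped (the client
        // disconnected), so stop producing instead of unwrapping.
        if tx.send(Ok(format!("response {i}"))).await.is_err() {
            return;
        }
    }
}

fn response_stream() -> ReceiverStream<Result<String, String>> {
    // A buffer of 1 keeps backpressure tight: the producer parks until the
    // previous response has been pulled off the channel.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    tokio::spawn(produce_responses(tx));
    ReceiverStream::new(rx)
}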
diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs
index 7a58e79ade..8adaae0616 100644
--- a/src/daft-local-execution/src/channel.rs
+++ b/src/daft-local-execution/src/channel.rs
@@ -16,6 +16,10 @@ impl<T> Receiver<T> {
     pub(crate) fn blocking_recv(&self) -> Option<T> {
         self.0.recv().ok()
     }
+
+    pub(crate) fn into_inner(self) -> loole::Receiver<T> {
+        self.0
+    }
 }
 
 pub(crate) fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs
index bda9bfcd09..df22857519 100644
--- a/src/daft-local-execution/src/lib.rs
+++ b/src/daft-local-execution/src/lib.rs
@@ -18,7 +18,7 @@ use std::{
 use common_error::{DaftError, DaftResult};
 use common_runtime::RuntimeTask;
 use lazy_static::lazy_static;
-pub use run::{run_local, NativeExecutor};
+pub use run::{run_local, ExecutionEngineResult, NativeExecutor};
 use snafu::{futures::TryFutureExt, ResultExt, Snafu};
 
 lazy_static! {
@@ -200,6 +200,8 @@ type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[cfg(feature = "python")]
 pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
-    parent.add_class::<NativeExecutor>()?;
+    use run::PyNativeExecutor;
+
+    parent.add_class::<PyNativeExecutor>()?;
     Ok(())
 }
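The into_inner accessor added to channel.rs is what makes the async path in run.rs possible: the crate-private Receiver wrapper only exposes blocking_recv, while the underlying loole::Receiver also provides recv_async, an owned future that a runtime can poll. A rough sketch of the two consumption modes, assuming only the loole crate (the i32 payload is illustrative):

// Blocking mode: drain the channel from a dedicated thread.
fn collect_blocking(rx: loole::Receiver<i32>) -> Vec<i32> {
    // `recv` parks the calling thread; `Err` means all senders are dropped.
    std::iter::from_fn(|| rx.recv().ok()).collect()
}

// Async mode: the same receiver type can be awaited on a runtime.
async fn collect_async(rx: loole::Receiver<i32>) -> Vec<i32> {
    let mut out = Vec::new();
    while let Ok(v) = rx.recv_async().await {
        out.push(v);
    }
    out
}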
diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs
index 039df990cd..caf69f156d 100644
--- a/src/daft-local-execution/src/run.rs
+++ b/src/daft-local-execution/src/run.rs
@@ -10,7 +10,10 @@ use common_daft_config::DaftExecutionConfig;
 use common_error::DaftResult;
 use common_tracing::refresh_chrome_trace;
 use daft_local_plan::{translate, LocalPhysicalPlan};
+use daft_logical_plan::LogicalPlanBuilder;
 use daft_micropartition::MicroPartition;
+use futures::{FutureExt, Stream};
+use loole::RecvFuture;
 use tokio_util::sync::CancellationToken;
 #[cfg(feature = "python")]
 use {
@@ -44,32 +47,25 @@ impl LocalPartitionIterator {
     }
 }
 
-#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
-pub struct NativeExecutor {
-    local_physical_plan: Arc<LocalPhysicalPlan>,
-    cancel: CancellationToken,
-}
-
-impl Drop for NativeExecutor {
-    fn drop(&mut self) {
-        self.cancel.cancel();
-    }
+#[cfg_attr(
+    feature = "python",
+    pyclass(module = "daft.daft", name = "NativeExecutor")
+)]
+pub struct PyNativeExecutor {
+    executor: NativeExecutor,
 }
 
 #[cfg(feature = "python")]
 #[pymethods]
-impl NativeExecutor {
+impl PyNativeExecutor {
     #[staticmethod]
     pub fn from_logical_plan_builder(
         logical_plan_builder: &PyLogicalPlanBuilder,
         py: Python,
     ) -> PyResult<Self> {
         py.allow_threads(|| {
-            let logical_plan = logical_plan_builder.builder.build();
-            let local_physical_plan = translate(&logical_plan)?;
             Ok(Self {
-                local_physical_plan,
-                cancel: CancellationToken::new(),
+                executor: NativeExecutor::from_logical_plan_builder(&logical_plan_builder.builder)?,
             })
         })
     }
@@ -94,13 +90,9 @@
             })
             .collect();
         let out = py.allow_threads(|| {
-            run_local(
-                &self.local_physical_plan,
-                native_psets,
-                cfg.config,
-                results_buffer_size,
-                self.cancel.clone(),
-            )
+            self.executor
+                .run(native_psets, cfg.config, results_buffer_size)
+                .map(|res| res.into_iter())
         })?;
         let iter = Box::new(out.map(|part| {
             part.map(|p| pyo3::Python::with_gil(|py| PyMicroPartition::from(p).into_py(py)))
@@ -110,6 +102,45 @@
     }
 }
 
+pub struct NativeExecutor {
+    local_physical_plan: Arc<LocalPhysicalPlan>,
+    cancel: CancellationToken,
+}
+
+impl NativeExecutor {
+    pub fn from_logical_plan_builder(
+        logical_plan_builder: &LogicalPlanBuilder,
+    ) -> DaftResult<Self> {
+        let logical_plan = logical_plan_builder.build();
+        let local_physical_plan = translate(&logical_plan)?;
+        Ok(Self {
+            local_physical_plan,
+            cancel: CancellationToken::new(),
+        })
+    }
+
+    pub fn run(
+        &self,
+        psets: HashMap<String, Vec<Arc<MicroPartition>>>,
+        cfg: Arc<DaftExecutionConfig>,
+        results_buffer_size: Option<usize>,
+    ) -> DaftResult<ExecutionEngineResult> {
+        run_local(
+            &self.local_physical_plan,
+            psets,
+            cfg,
+            results_buffer_size,
+            self.cancel.clone(),
+        )
+    }
+}
+
+impl Drop for NativeExecutor {
+    fn drop(&mut self) {
+        self.cancel.cancel();
+    }
+}
+
 fn should_enable_explain_analyze() -> bool {
     let explain_var_name = "DAFT_DEV_ENABLE_EXPLAIN_ANALYZE";
     if let Ok(val) = std::env::var(explain_var_name)
@@ -121,13 +152,105 @@
     }
 }
 
+pub struct ExecutionEngineReceiverIterator {
+    receiver: Receiver<Arc<MicroPartition>>,
+    handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
+}
+
+impl Iterator for ExecutionEngineReceiverIterator {
+    type Item = DaftResult<Arc<MicroPartition>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.receiver.blocking_recv() {
+            Some(part) => Some(Ok(part)),
+            None => {
+                if self.handle.is_some() {
+                    let join_result = self
+                        .handle
+                        .take()
+                        .unwrap()
+                        .join()
+                        .expect("Execution engine thread panicked");
+                    match join_result {
+                        Ok(()) => None,
+                        Err(e) => Some(Err(e)),
+                    }
+                } else {
+                    None
+                }
+            }
+        }
+    }
+}
+
+pub struct ExecutionEngineReceiverStream {
+    receive_fut: RecvFuture<Arc<MicroPartition>>,
+    handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
+}
+
+impl Stream for ExecutionEngineReceiverStream {
+    type Item = DaftResult<Arc<MicroPartition>>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        match self.receive_fut.poll_unpin(cx) {
+            std::task::Poll::Ready(Ok(part)) => std::task::Poll::Ready(Some(Ok(part))),
+            std::task::Poll::Ready(Err(_)) => {
+                if self.handle.is_some() {
+                    let join_result = self
+                        .handle
+                        .take()
+                        .unwrap()
+                        .join()
+                        .expect("Execution engine thread panicked");
+                    match join_result {
+                        Ok(()) => std::task::Poll::Ready(None),
+                        Err(e) => std::task::Poll::Ready(Some(Err(e))),
+                    }
+                } else {
+                    std::task::Poll::Ready(None)
+                }
+            }
+            std::task::Poll::Pending => std::task::Poll::Pending,
+        }
+    }
+}
+
+pub struct ExecutionEngineResult {
+    handle: std::thread::JoinHandle<DaftResult<()>>,
+    receiver: Receiver<Arc<MicroPartition>>,
+}
+
+impl ExecutionEngineResult {
+    pub fn into_stream(self) -> impl Stream<Item = DaftResult<Arc<MicroPartition>>> {
+        ExecutionEngineReceiverStream {
+            receive_fut: self.receiver.into_inner().recv_async(),
+            handle: Some(self.handle),
+        }
+    }
+}
+
+impl IntoIterator for ExecutionEngineResult {
+    type Item = DaftResult<Arc<MicroPartition>>;
+    type IntoIter = ExecutionEngineReceiverIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        ExecutionEngineReceiverIterator {
+            receiver: self.receiver,
+            handle: Some(self.handle),
+        }
+    }
+}
+
 pub fn run_local(
     physical_plan: &LocalPhysicalPlan,
     psets: HashMap<String, Vec<Arc<MicroPartition>>>,
     cfg: Arc<DaftExecutionConfig>,
     results_buffer_size: Option<usize>,
     cancel: CancellationToken,
-) -> DaftResult<Box<dyn Iterator<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
+) -> DaftResult<ExecutionEngineResult> {
     refresh_chrome_trace();
     let pipeline = physical_plan_to_pipeline(physical_plan, &psets, &cfg)?;
     let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1));
@@ -188,38 +311,8 @@ pub fn run_local(
         })
     });
 
-    struct ReceiverIterator {
-        receiver: Receiver<Arc<MicroPartition>>,
-        handle: Option<std::thread::JoinHandle<DaftResult<()>>>,
-    }
-
-    impl Iterator for ReceiverIterator {
-        type Item = DaftResult<Arc<MicroPartition>>;
-
-        fn next(&mut self) -> Option<Self::Item> {
-            match self.receiver.blocking_recv() {
-                Some(part) => Some(Ok(part)),
-                None => {
-                    if self.handle.is_some() {
-                        let join_result = self
-                            .handle
-                            .take()
-                            .unwrap()
-                            .join()
-                            .expect("Execution engine thread panicked");
-                        match join_result {
-                            Ok(()) => None,
-                            Err(e) => Some(Err(e)),
-                        }
-                    } else {
-                        None
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(Box::new(ReceiverIterator {
+    Ok(ExecutionEngineResult {
+        handle,
         receiver: rx,
-        handle: Some(handle),
-    }))
+    })
 }
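With the run.rs changes in place, run_local hands back an ExecutionEngineResult that each caller consumes in whichever mode fits: daft-connect awaits it as a Stream on its runtime, while the Python bindings keep their blocking iterator through IntoIterator. An illustrative pair of call sites (not part of the patch; it assumes only the crates already imported by the code above):

use std::{collections::HashMap, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_local_execution::NativeExecutor;
use daft_logical_plan::LogicalPlanBuilder;
use futures::StreamExt;

// Async mode: poll partitions from a runtime without parking a thread.
async fn count_partitions_async(builder: &LogicalPlanBuilder) -> DaftResult<usize> {
    let executor = NativeExecutor::from_logical_plan_builder(builder)?;
    let cfg = Arc::new(DaftExecutionConfig::default());
    let mut stream = executor.run(HashMap::new(), cfg, None)?.into_stream();
    let mut n = 0;
    while let Some(partition) = stream.next().await {
        let _partition = partition?;
        n += 1;
    }
    Ok(n)
}

// Blocking mode: ExecutionEngineResult also implements IntoIterator, which
// is the path the Python bindings take.
fn count_partitions_blocking(builder: &LogicalPlanBuilder) -> DaftResult<usize> {
    let executor = NativeExecutor::from_logical_plan_builder(builder)?;
    let cfg = Arc::new(DaftExecutionConfig::default());
    let mut n = 0;
    for partition in executor.run(HashMap::new(), cfg, None)? {
        let _partition = partition?;
        n += 1;
    }
    Ok(n)
}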
diff --git a/src/daft-logical-plan/src/builder.rs b/src/daft-logical-plan/src/builder.rs
index c251537ff2..c945c80203 100644
--- a/src/daft-logical-plan/src/builder.rs
+++ b/src/daft-logical-plan/src/builder.rs
@@ -589,6 +589,46 @@
         Ok(self.with_new_plan(logical_plan))
     }
 
+    pub fn optimize(&self) -> DaftResult<Self> {
+        let default_optimizer_config: OptimizerConfig = Default::default();
+        let optimizer_config = OptimizerConfig {
+            enable_actor_pool_projections: self
+                .config
+                .as_ref()
+                .map(|planning_cfg| planning_cfg.enable_actor_pool_projections)
+                .unwrap_or(default_optimizer_config.enable_actor_pool_projections),
+            ..default_optimizer_config
+        };
+        let optimizer = Optimizer::new(optimizer_config);
+
+        // Run LogicalPlan optimizations
+        let unoptimized_plan = self.build();
+        let optimized_plan = optimizer.optimize(
+            unoptimized_plan,
+            |new_plan, rule_batch, pass, transformed, seen| {
+                if transformed {
+                    log::debug!(
+                        "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}",
+                        rule_batch,
+                        pass,
+                        if seen { "an already seen" } else { "a new" },
+                        new_plan.repr_ascii(true),
+                    );
+                } else {
+                    log::debug!(
+                        "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}",
+                        rule_batch,
+                        pass,
+                        new_plan.repr_ascii(true),
+                    );
+                }
+            },
+        )?;
+
+        let builder = Self::new(optimized_plan, self.config.clone());
+        Ok(builder)
+    }
+
     pub fn build(&self) -> Arc<LogicalPlan> {
         self.plan.clone()
     }
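The new builder-level optimize returns a fresh LogicalPlanBuilder that wraps the optimized plan and carries the original planning config (including enable_actor_pool_projections) forward, which is what lets daft-connect call plan.optimize()? right before execution. A small illustrative wrapper, assuming the crate exports named here:

use std::sync::Arc;

use common_error::DaftResult;
use daft_logical_plan::{LogicalPlan, LogicalPlanBuilder};

// `optimize` leaves the original builder untouched and returns a new one
// holding the optimized plan, so it composes naturally before execution.
fn optimized_plan(builder: &LogicalPlanBuilder) -> DaftResult<Arc<LogicalPlan>> {
    let optimized = builder.optimize()?;
    Ok(optimized.build())
}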
@@ -918,39 +958,7 @@ impl PyLogicalPlanBuilder {
 
     /// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan.
     pub fn optimize(&self, py: Python) -> PyResult<Self> {
-        py.allow_threads(|| {
-            // Create optimizer
-            let default_optimizer_config: OptimizerConfig = Default::default();
-            let optimizer_config = OptimizerConfig { enable_actor_pool_projections: self.builder.config.as_ref().map(|planning_cfg| planning_cfg.enable_actor_pool_projections).unwrap_or(default_optimizer_config.enable_actor_pool_projections), ..default_optimizer_config };
-            let optimizer = Optimizer::new(optimizer_config);
-
-            // Run LogicalPlan optimizations
-            let unoptimized_plan = self.builder.build();
-            let optimized_plan = optimizer.optimize(
-                unoptimized_plan,
-                |new_plan, rule_batch, pass, transformed, seen| {
-                    if transformed {
-                        log::debug!(
-                            "Rule batch {:?} transformed plan on pass {}, and produced {} plan:\n{}",
-                            rule_batch,
-                            pass,
-                            if seen { "an already seen" } else { "a new" },
-                            new_plan.repr_ascii(true),
-                        );
-                    } else {
-                        log::debug!(
-                            "Rule batch {:?} did NOT transform plan on pass {} for plan:\n{}",
-                            rule_batch,
-                            pass,
-                            new_plan.repr_ascii(true),
-                        );
-                    }
-                },
-            )?;
-
-            let builder = LogicalPlanBuilder::new(optimized_plan, self.builder.config.clone());
-            Ok(builder.into())
-        })
+        py.allow_threads(|| Ok(self.builder.optimize()?.into()))
     }
 
     pub fn repr_ascii(&self, simple: bool) -> PyResult<String> {
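The Python-facing optimize now simply delegates to the builder method while keeping the established allow_threads pattern, so the GIL stays released while the optimizer runs. A self-contained sketch of that pattern (heavy_sum is a hypothetical function; pyo3 assumed):

use pyo3::prelude::*;

// Release the GIL around CPU-bound Rust work so other Python threads can
// make progress while it runs.
#[pyfunction]
fn heavy_sum(py: Python<'_>, n: u64) -> u64 {
    py.allow_threads(|| (0..n).sum())
}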