Skip to content

Commit

Permalink
Add more example
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 20, 2024
1 parent 9a4055e commit 2e9d733
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
59 changes: 32 additions & 27 deletions datafusion-examples/examples/thread_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use datafusion::execution::SendableRecordBatchStream;
//! This example shows how to use a separate thread pool (tokio [`Runtime`])) to
//! run the CPU intensive parts of DataFusion plans.
//!
Expand All @@ -30,6 +25,13 @@ use datafusion::execution::SendableRecordBatchStream;
//! due to congestion control and increased latencies for processing network
//! messages.
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::dedicated_executor::DedicatedExecutor;
use datafusion::common::runtime::DedicatedExecutorBuilder;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;

/// Normally, you don't need to worry about the details of the tokio runtime,
/// but for this example it is important to understand how the [`Runtime`]s work.
Expand Down Expand Up @@ -59,7 +61,7 @@ async fn main() -> Result<()> {
// Run the same query on a different runtime. Note that we are still calling
// `await` here, so the the `async` function still runs on the current runtime.
// We use the `DedicatedExecutor` to run the query on a different runtime.
different_runtime(ctx, sql).await?;
different_runtime_basic(ctx, sql).await?;
Ok(())
}

Expand Down Expand Up @@ -100,31 +102,34 @@ async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {

/// Demonstrates how to run queries on a **different** runtime than the current one
///
/// This is typically how you should run DataFusion queries from a network
/// server or when processing data from a remote object store.
async fn different_runtime(ctx: SessionContext, sql: String) -> Result<()> {
/// See [`different_runtime_advanced`] to see how you should run DataFusion
/// queries from a network server or when processing data from a remote object
/// store.
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
// First, we need a new runtime, which we can create with the tokio builder
// however, since we are already in the context of another runtime
// (installed by #[tokio::main]) we create a new thread for the runtime
tokio::task::spawn_blocking(move || {
std::thread::spawn(move || thread_entry(ctx, sql.to_string()))
.join()
.expect("thread did not panic")
})
.await
.expect("task did not panic")
}

/// This is the entry point of thread that we started our second runtime on
fn thread_entry(ctx: SessionContext, sql: String) -> Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
// only enable the time driver (not the I/O driver), meaning this
// runtime will not be able to perform network I/O
.enable_time()
.build()?;
let dedicated_executor = DedicatedExecutorBuilder::new().build();

// Now, we can simply run the query on the new runtime
dedicated_executor
.spawn(async move {
// this runs on the different threadpool
let df = ctx.sql(&sql).await?;
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;

// Calling `next()` to drive the plan on the different threadpool
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}
Ok(()) as Result<()>
})
// even though we are `await`ing here on the "current" pool, internally
// the DedicatedExecutor runs the work on the separate threadpool pool
// and the `await` simply notifies when the work is done that the work is done
.await??;

// Now we can run the actual code we want on a different runtime
runtime.block_on(async move { different_runtime_inner(ctx, sql).await })
Ok(())
}

/// this is run on a new, separate runtime
Expand Down
1 change: 1 addition & 0 deletions datafusion/common-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ log = { workspace = true }
tokio = { workspace = true }
parking_lot = { workspace = true }
futures = { workspace = true }
datafusion-common = { workspace = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time", "net"] }
14 changes: 11 additions & 3 deletions datafusion/common-runtime/src/dedicated_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! Originally from [InfluxDB 3.0]
//!
//! [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
use datafusion_common::DataFusionError;
use futures::{
future::{BoxFuture, Shared},
Future, FutureExt, TryFutureExt,
Expand Down Expand Up @@ -341,11 +342,18 @@ pub struct DedicatedExecutorBuilder {
runtime_builder: Builder,
}

impl From<JobError> for DataFusionError {
fn from(value: JobError) -> Self {
DataFusionError::External(Box::new(value))
.context("JobError from DedicatedExecutor")
}
}

impl DedicatedExecutorBuilder {
/// Create a new `DedicatedExecutorBuilder` with default values
///
/// Note that by default this `D`edicatedExecutor` will not be able to perform
/// I/O.
/// Note that by default this `DedicatedExecutor` will not be able to
/// perform network I/O.
pub fn new() -> Self {
Self {
name: String::from("DedicatedExecutor"),
Expand Down Expand Up @@ -389,7 +397,7 @@ impl DedicatedExecutorBuilder {
/// for I/O.
///
/// See the documentation on [`DedicatedExecutor`] for more details.
fn build(self) -> DedicatedExecutor {
pub fn build(self) -> DedicatedExecutor {
let Self {
name,
runtime_builder,
Expand Down
1 change: 1 addition & 0 deletions datafusion/common-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub mod common;
pub mod dedicated_executor;

pub use common::SpawnedTask;
pub use dedicated_executor::{DedicatedExecutor, DedicatedExecutorBuilder};

0 comments on commit 2e9d733

Please sign in to comment.