Commit

update
alamb committed Nov 20, 2024
1 parent 2e9d733 commit cccecf1
Showing 1 changed file with 11 additions and 24 deletions.
35 changes: 11 additions & 24 deletions datafusion-examples/examples/thread_pools.rs
@@ -62,6 +62,10 @@ async fn main() -> Result<()> {
// `await` here, so the `async` function still runs on the current runtime.
// We use the `DedicatedExecutor` to run the query on a different runtime.
different_runtime_basic(ctx, sql).await?;

// Run the same query on a different runtime including remote IO
different_runtime_advanced(ctx, sql).await?;

Ok(())
}

@@ -129,35 +133,18 @@ async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()>
// and the `await` simply notifies the caller when the work is done
.await??;

// When done with a DedicatedExecutor, it should be shut down cleanly to give
// any outstanding tasks a chance to clean up
dedicated_executor.join().await;

Ok(())
}

/// Demonstrates how to run queries on a **different** runtime than the current one but
/// run IO operations on the current runtime
/// This function is run on a new, separate runtime.
async fn different_runtime_inner(ctx: SessionContext, sql: String) -> Result<()> {
// Setup execution as before
let df = ctx.sql(&sql).await?;

let mut stream = df.execute_stream().await?;

// XXX Note: at this point, calling `next()` runs on our new thread pool. However,
// the catalog and object store requests are also spawned on that same thread pool.

// While this means we don't interfere with the handling of other network requests,
// the network requests made as part of query processing still run on the same
// thread pool.

// TODO show this working

// To avoid this, all IO access, both catalog and data (e.g. object_store), must be
// spawned onto its own runtime, like this:
// TODO....

//
// Care is required to avoid calling `next()` (aka polling) from the default IO thread
// (even if planning / execution is run on that other thread).
// Best practice is to do all DataFusion planning / execution on a separate pool. Note
// that some care is required for remote catalogs such as Iceberg that
// themselves do network IO.
// TODO figure out how to cause an error due to the IO thread

while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}

//
// You can use the DataFusion "DedicatedExecutor" in this case to handle most of this automatically for you.
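
The pattern the comments in this diff describe (hand work to a separate pool, have the caller only wait to be notified of the result, then join the pool for a clean shutdown) can be sketched with the standard library alone. This is a simplified analogy, not the `DedicatedExecutor` from the example and not a Tokio runtime: `MiniExecutor`, its `spawn` and `join` methods, and `sum_on_worker` are all hypothetical names invented for illustration, and a single OS thread stands in for the separate runtime.

```rust
use std::sync::mpsc;
use std::thread;

/// A unit of work that can be shipped to the worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for a dedicated executor: a single worker
/// thread that runs queued jobs in order (NOT the DataFusion type).
struct MiniExecutor {
    sender: Option<mpsc::Sender<Job>>,
    worker: Option<thread::JoinHandle<()>>,
}

impl MiniExecutor {
    fn new(name: &str) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let worker = thread::Builder::new()
            .name(name.to_string())
            .spawn(move || {
                // The loop ends once every `Sender` has been dropped.
                for job in receiver {
                    job();
                }
            })
            .expect("failed to spawn worker thread");
        Self {
            sender: Some(sender),
            worker: Some(worker),
        }
    }

    /// Queue `work` on the worker thread and return a receiver that
    /// yields its result once the work has finished.
    fn spawn<T, F>(&self, work: F) -> mpsc::Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore the send error if the caller dropped the receiver.
            let _ = tx.send(work());
        });
        self.sender
            .as_ref()
            .expect("executor already joined")
            .send(job)
            .expect("worker thread exited");
        rx
    }

    /// Stop accepting new work, then wait for queued jobs to finish.
    fn join(&mut self) {
        drop(self.sender.take());
        if let Some(worker) = self.worker.take() {
            worker.join().expect("worker thread panicked");
        }
    }
}

/// Sums `values` on the worker and reports whether the closure ran
/// on a thread other than the caller's.
fn sum_on_worker(values: Vec<u64>) -> (u64, bool) {
    let caller = thread::current().id();
    let mut executor = MiniExecutor::new("cpu-worker");
    let result_rx = executor.spawn(move || {
        let ran_elsewhere = thread::current().id() != caller;
        (values.iter().sum::<u64>(), ran_elsewhere)
    });
    // The caller does no work here; it only waits for the result.
    let result = result_rx.recv().expect("worker dropped the result");
    // Shut down cleanly so queued jobs get a chance to finish.
    executor.join();
    result
}
```

Calling `sum_on_worker(vec![1, 2, 3])` performs the sum on the worker thread while the calling thread only blocks on the result channel, which mirrors how the `.await` in the example waits for work done elsewhere. A real async setup would await a future instead of blocking, and would additionally need to route catalog and object store IO back to an IO runtime, which this sketch does not attempt.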
