Skip to content

Commit

Permalink
more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 20, 2024
1 parent e4aacd8 commit 19d8916
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
68 changes: 44 additions & 24 deletions datafusion-examples/examples/thread_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,39 +149,59 @@ async fn different_runtime_advanced() -> Result<()> {
// In this example, we will configure access to a remote object store
// over the network during the plan

let ctx = SessionContext::new()
.enable_url_table();
let ctx = SessionContext::new().enable_url_table();

// setup http object store
let base_url = Url::parse("https://github.com").unwrap();
let http_store = HttpBuilder::new()
.with_url(base_url.clone())
.build()
.unwrap();
let http_store = HttpBuilder::new().with_url(base_url.clone()).build()?;

// By default, the object store will use the current runtime for IO operations
// if we use a dedicated executor, object store requests will also use the
// dedicated executor's runtime
// Thus, wrap this object store in XXX that uses the "IO" runtime to do the IO
// (if we don't do this the example fails with an error like
//
// ctx.register_object_store(&base_url, Arc::new(http_store));
// A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.)
ctx.register_object_store(&base_url, Arc::new(http_store));

// register csv file with the execution context
ctx.register_csv(
"aggregate_test_100",
"https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv",
CsvReadOptions::new(),
)
.await?;
let dedicated_executor = DedicatedExecutorBuilder::new().build();

// execute the query
let df = ctx
.sql("SELECT c1,c2,c3 FROM aggregate_test_100 LIMIT 5")
.await?;
// Plan (and execute) the query on the dedicated runtime
let mut stream = dedicated_executor
.spawn(async move {
// Plan / execute the query
let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
let df = ctx
.sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
.await?;
let stream: SendableRecordBatchStream = df.execute_stream().await?;

// print the results
df.show().await?;
Ok(stream) as Result<SendableRecordBatchStream>
}).await??;

// Note that special care must be taken when running DataFusion plans that do async IO to transfer the work to their own thread pools.
// We have now planned the query on the dedicated runtime, but we still need to
// drive the stream (aka call `next()`) to get the results.

// Using a separate Runtime will avoid other requests being messed up but it won't really help requests made from DataSources such as
// reading parquet files from object_store.
//
// Thus this runtime also disables IO so that we are sure there is no IO work being done on it.

// as mentioned above, calling `next()` (including indirectly by using
// FlightDataEncoder to convert the results to flight to send it over the
// network), will *still* result in the CPU work (and a bunch of spawned
// tasks) being done on the runtime calling next() (aka the current runtime)
// and not on the dedicated runtime.

// to drive the stream on the dedicated runtime, we need to wrap it using a XXX stream function

while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}

Ok(())
}





// TODO move this into the common runtime crate

4 changes: 4 additions & 0 deletions datafusion/common-runtime/src/dedicated_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ impl From<Builder> for DedicatedExecutorBuilder {
///
/// TODO add note about `io_thread`
///
/// TODO: things we use in InfluxData
/// 1. Testing mode (so we can make a bunch of DedicatedExecutors) -- maybe we can wrap DedicatedExecutors like IOxDedicatedExecutors
/// 2. Some sort of hook to install tokio metrics
///
/// When [`DedicatedExecutorBuilder::build`] is called, the "current" tokio
/// runtime will be marked for io, via [`register_io_runtime`] by all threads
/// spawned by the executor. Any I/O done by threads in this
Expand Down

0 comments on commit 19d8916

Please sign in to comment.