diff --git a/datafusion-examples/examples/thread_pools.rs b/datafusion-examples/examples/thread_pools.rs index b678770e49136..2690bcbe2bca3 100644 --- a/datafusion-examples/examples/thread_pools.rs +++ b/datafusion-examples/examples/thread_pools.rs @@ -25,9 +25,10 @@ //! due to congestion control and increased latencies for processing network //! messages. use arrow::util::pretty::pretty_format_batches; +use datafusion::common::runtime::dedicated_executor; use datafusion::error::Result; use datafusion::execution::SendableRecordBatchStream; -use datafusion::physical_plan::{dedicated_executor, DedicatedExecutor}; +use datafusion::physical_plan::DedicatedExecutor; use datafusion::prelude::*; use futures::stream::StreamExt; use object_store::http::HttpBuilder; @@ -167,13 +168,13 @@ async fn different_runtime_advanced() -> Result<()> { // ctx.register_object_store(&base_url, http_store); // A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers. - let http_store = DedicatedExecutor::wrap_object_store(http_store); + let http_store = dedicated_executor.wrap_object_store(http_store); // Tell datafusion about processing http:// urls with this wrapped object store ctx.register_object_store(&base_url, http_store); // Plan (and execute) the query on the dedicated runtime - let mut stream = dedicated_executor + let stream = dedicated_executor .spawn(async move { // Plan / execute the query let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv"; diff --git a/datafusion/physical-plan/src/dedicated_executor.rs b/datafusion/physical-plan/src/dedicated_executor.rs index 15f894a59947f..51f14a3513ee6 100644 --- a/datafusion/physical-plan/src/dedicated_executor.rs +++ b/datafusion/physical-plan/src/dedicated_executor.rs @@ -25,7 +25,10 @@ use crate::io_object_store::IoObjectStore; use crate::stream::RecordBatchStreamAdapter; use crate::SendableRecordBatchStream; use datafusion_common::DataFusionError; -use futures::{future::{BoxFuture, Shared}, Future, FutureExt, Stream, TryFutureExt}; +use futures::{ + future::{BoxFuture, Shared}, + Future, FutureExt, Stream, TryFutureExt, +}; use log::{info, warn}; use object_store::ObjectStore; use parking_lot::RwLock; @@ -221,7 +224,10 @@ impl DedicatedExecutor { /// Note that this object store will only work correctly if run on this /// dedicated executor. If you try and use it on another executor, it will /// panic with "no IO runtime registered" type error. - pub fn wrap_object_store(&self, object_store: Arc) -> Arc { + pub fn wrap_object_store( + &self, + object_store: Arc, + ) -> Arc { Arc::new(IoObjectStore::new(self.clone(), object_store)) } diff --git a/datafusion/physical-plan/src/io_object_store.rs b/datafusion/physical-plan/src/io_object_store.rs index 01cd70531352a..d4331f1beb27d 100644 --- a/datafusion/physical-plan/src/io_object_store.rs +++ b/datafusion/physical-plan/src/io_object_store.rs @@ -17,15 +17,14 @@ use std::sync::Arc; +use crate::dedicated_executor::JobError; use crate::DedicatedExecutor; use async_trait::async_trait; use futures::stream::BoxStream; -use futures::StreamExt; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, }; -use crate::dedicated_executor::JobError; /// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying /// methods with [`DedicatedExecutor::spawn_io`] so that they are run on the Tokio Runtime @@ -57,7 +56,7 @@ impl std::fmt::Display for IoObjectStore { fn convert_error(e: JobError) -> object_store::Error { object_store::Error::Generic { store: "IoObjectStore", - source: Box::new(e) + source: Box::new(e), } } @@ -98,7 +97,7 @@ impl ObjectStore for IoObjectStore { inner_stream //self.executor.run_stream(inner_stream, convert_error) - // .boxed() + // .boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -136,6 +135,4 @@ impl ObjectStore for IoObjectStore { }) .await } - - }