diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 77c00e91a3aa..3c9a951cb072 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -22,11 +22,12 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; -use object_store::{ObjectMeta, ObjectStore}; - use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::file::metadata::ParquetMetaData; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use tokio::runtime::Handle; /// Reads Parquet files in object storage using [`ObjectStore`]. /// @@ -59,6 +60,7 @@ pub struct ParquetObjectReader { metadata_size_hint: Option, preload_column_index: bool, preload_offset_index: bool, + runtime: Option, } impl ParquetObjectReader { @@ -72,6 +74,7 @@ impl ParquetObjectReader { metadata_size_hint: None, preload_column_index: false, preload_offset_index: false, + runtime: None, } } @@ -99,27 +102,57 @@ impl ParquetObjectReader { ..self } } + + /// Perform IO on the provided tokio runtime + /// + /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner + /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding, + /// on the same tokio runtime can lead to degraded throughput, dropped connections and + /// other issues. For more information see [here]. + /// + /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ + pub fn with_runtime(self, handle: Handle) -> Self { + Self { + runtime: Some(handle), + ..self + } + } + + fn spawn(&self, f: F) -> BoxFuture<'_, Result> + where + F: for<'a> FnOnce(&'a Arc, &'a Path) -> BoxFuture<'a, Result> + + Send + + 'static, + O: Send + 'static, + { + match &self.runtime { + Some(handle) => { + let path = self.meta.location.clone(); + let store = Arc::clone(&self.store); + let fut = handle.spawn(async move { f(&store, &path).await }); + fut.unwrap_or_else(|e| match e.try_into_panic() { + Ok(p) => std::panic::resume_unwind(p), + Err(e) => Err(ParquetError::External(Box::new(e))), + }) + .boxed() + } + None => f(&self.store, &self.meta.location), + } + } } impl AsyncFileReader for ParquetObjectReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.store - .get_range(&self.meta.location, range) - .map_err(|e| e.into()) - .boxed() + self.spawn(|store, path| store.get_range(path, range).map_err(|e| e.into()).boxed()) } fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> where Self: Send, { - async move { - self.store - .get_ranges(&self.meta.location, &ranges) - .await - .map_err(|e| e.into()) - } - .boxed() + self.spawn(move |store, path| { + async move { store.get_ranges(path, &ranges).await.map_err(|e| e.into()) }.boxed() + }) } fn get_metadata(&mut self) -> BoxFuture<'_, Result>> {