From 22bc77217797968dee34fb8fe4a6534dc04a2144 Mon Sep 17 00:00:00 2001 From: June <61218022+itsjunetime@users.noreply.github.com> Date: Sat, 2 Nov 2024 05:41:45 -0600 Subject: [PATCH] Add `ParquetObjectReader::with_runtime` (#6612) * Add ParquetObjectReader::with_runtime (#6248) * Add test for ParquetObjectReader::with_runtime and fix clippy complaints * Switch ParquetObjectReader runtime tests to not depend on tokio_unstable anymore * Add doc-comment for test_runtime_thread_id_different Co-authored-by: Andrew Lamb * - Add comment about why we don't use spawn for metadata - Remove outdated comment about target_has_atomic - Add test to verify reader fails when spawned on a shutdown runtime * Avoid use of Infallible and From conversion --------- Co-authored-by: Raphael Taylor-Davies Co-authored-by: Andrew Lamb --- arrow-json/src/writer/encoder.rs | 2 +- arrow-string/src/predicate.rs | 2 +- parquet/Cargo.toml | 2 +- parquet/src/arrow/async_reader/store.rs | 187 ++++++++++++++++++++---- parquet/src/errors.rs | 1 - 5 files changed, 162 insertions(+), 32 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 84ed384cfd80..ed430fe6a1ec 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -454,7 +454,7 @@ impl Encoder for ArrayFormatter<'_> { /// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the value with `"` struct RawArrayFormatter<'a>(ArrayFormatter<'a>); -impl<'a> Encoder for RawArrayFormatter<'a> { +impl Encoder for RawArrayFormatter<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { let _ = write!(out, "{}", self.0.value(idx)); } diff --git a/arrow-string/src/predicate.rs b/arrow-string/src/predicate.rs index f559088e6c96..408d9d45cc75 100644 --- a/arrow-string/src/predicate.rs +++ b/arrow-string/src/predicate.rs @@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool { } fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool { - 
n.to_ascii_lowercase() == h.to_ascii_lowercase() + n.eq_ignore_ascii_case(h) } /// Transforms a like `pattern` to a regex compatible pattern. To achieve that, it does: diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 32bc13b62a53..133b5b212b36 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -81,7 +81,7 @@ lz4_flex = { version = "0.11", default-features = false, features = ["std", "fra zstd = { version = "0.13", default-features = false } serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } -tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } object_store = { version = "0.11.0", default-features = false, features = ["azure"] } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index e6b47856ebe8..fd0397b5e1fc 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -15,17 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::ops::Range; -use std::sync::Arc; +use std::{ops::Range, sync::Arc}; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{FutureExt, TryFutureExt}; - -use object_store::{ObjectMeta, ObjectStore}; +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use object_store::{path::Path, ObjectMeta, ObjectStore}; +use tokio::runtime::Handle; use crate::arrow::async_reader::AsyncFileReader; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; /// Reads Parquet files in object storage using [`ObjectStore`]. 
@@ -59,6 +57,7 @@ pub struct ParquetObjectReader { metadata_size_hint: Option, preload_column_index: bool, preload_offset_index: bool, + runtime: Option, } impl ParquetObjectReader { @@ -72,6 +71,7 @@ impl ParquetObjectReader { metadata_size_hint: None, preload_column_index: false, preload_offset_index: false, + runtime: None, } } @@ -99,29 +99,70 @@ impl ParquetObjectReader { ..self } } + + /// Perform IO on the provided tokio runtime + /// + /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner + /// to service IO. Therefore, running IO and CPU-bound tasks, such as parquet decoding, + /// on the same tokio runtime can lead to degraded throughput, dropped connections and + /// other issues. For more information see [here]. + /// + /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/ + pub fn with_runtime(self, handle: Handle) -> Self { + Self { + runtime: Some(handle), + ..self + } + } + + fn spawn(&self, f: F) -> BoxFuture<'_, Result> + where + F: for<'a> FnOnce(&'a Arc, &'a Path) -> BoxFuture<'a, Result> + + Send + + 'static, + O: Send + 'static, + E: Into + Send + 'static, + { + match &self.runtime { + Some(handle) => { + let path = self.meta.location.clone(); + let store = Arc::clone(&self.store); + handle + .spawn(async move { f(&store, &path).await }) + .map_ok_or_else( + |e| match e.try_into_panic() { + Err(e) => Err(ParquetError::External(Box::new(e))), + Ok(p) => std::panic::resume_unwind(p), + }, + |res| res.map_err(|e| e.into()), + ) + .boxed() + } + None => f(&self.store, &self.meta.location) + .map_err(|e| e.into()) + .boxed(), + } + } } impl AsyncFileReader for ParquetObjectReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.store - .get_range(&self.meta.location, range) - .map_err(|e| e.into()) - .boxed() + self.spawn(|store, path| store.get_range(path, range)) } fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> where 
Self: Send, { - async move { - self.store - .get_ranges(&self.meta.location, &ranges) - .await - .map_err(|e| e.into()) - } - .boxed() + self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) } + // This method doesn't directly call `self.spawn` because all of the IO that is done down the + // line due to this method call is done through `self.get_bytes` and/or `self.get_byte_ranges`. + // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it treats it as + // an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of + // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to + // `Self::get_bytes`. fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { let file_size = self.meta.size; @@ -138,30 +179,40 @@ impl AsyncFileReader for ParquetObjectReader { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; use futures::TryStreamExt; use arrow::util::test_util::parquet_test_data; + use futures::FutureExt; use object_store::local::LocalFileSystem; use object_store::path::Path; - use object_store::ObjectStore; + use object_store::{ObjectMeta, ObjectStore}; - use crate::arrow::async_reader::ParquetObjectReader; + use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use crate::arrow::ParquetRecordBatchStreamBuilder; + use crate::errors::ParquetError; - #[tokio::test] - async fn test_simple() { + async fn get_meta_store() -> (ObjectMeta, Arc) { let res = parquet_test_data(); let store = LocalFileSystem::new_with_prefix(res).unwrap(); - let mut meta = store + let meta = store .head(&Path::from("alltypes_plain.parquet")) .await .unwrap(); - let store = Arc::new(store) as Arc; - let object_reader = ParquetObjectReader::new(Arc::clone(&store), meta.clone()); + (meta, Arc::new(store) as Arc) + } + + #[tokio::test] + async fn test_simple() { + let (meta, store) = 
get_meta_store().await; + let object_reader = ParquetObjectReader::new(store, meta); + let builder = ParquetRecordBatchStreamBuilder::new(object_reader) .await .unwrap(); @@ -169,7 +220,11 @@ mod tests { assert_eq!(batches.len(), 1); assert_eq!(batches[0].num_rows(), 8); + } + #[tokio::test] + async fn test_not_found() { + let (mut meta, store) = get_meta_store().await; meta.location = Path::from("I don't exist.parquet"); let object_reader = ParquetObjectReader::new(store, meta); @@ -180,10 +235,86 @@ mod tests { let err = e.to_string(); assert!( err.contains("not found: No such file or directory (os error 2)"), - "{}", - err + "{err}", ); } } } + + #[tokio::test] + async fn test_runtime_is_used() { + let num_actions = Arc::new(AtomicUsize::new(0)); + + let (a1, a2) = (num_actions.clone(), num_actions.clone()); + let rt = tokio::runtime::Builder::new_multi_thread() + .on_thread_park(move || { + a1.fetch_add(1, Ordering::Relaxed); + }) + .on_thread_unpark(move || { + a2.fetch_add(1, Ordering::Relaxed); + }) + .build() + .unwrap(); + + let (meta, store) = get_meta_store().await; + + let initial_actions = num_actions.load(Ordering::Relaxed); + + let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone()); + + let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); + let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap(); + + // Just copied these assert_eqs from the `test_simple` above + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 8); + + assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0); + + // Runtimes have to be dropped in blocking contexts, so we need to move this one to a new + // blocking thread to drop it. 
+ tokio::runtime::Handle::current().spawn_blocking(move || drop(rt)); + } + + /// Unit test that `ParquetObjectReader::spawn` spawns on the provided runtime + #[tokio::test] + async fn test_runtime_thread_id_different() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + let (meta, store) = get_meta_store().await; + + let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone()); + + let current_id = std::thread::current().id(); + + let other_id = reader + .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed()) + .await + .unwrap(); + + assert_ne!(current_id, other_id); + + tokio::runtime::Handle::current().spawn_blocking(move || drop(rt)); + } + + #[tokio::test] + async fn io_fails_on_shutdown_runtime() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + let (meta, store) = get_meta_store().await; + + let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone()); + + rt.shutdown_background(); + + let err = reader.get_bytes(0..1).await.unwrap_err().to_string(); + + assert!(err.to_string().contains("was cancelled")); + } } diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs index bb4d2543c7b4..6adbffa2a2e5 100644 --- a/parquet/src/errors.rs +++ b/parquet/src/errors.rs @@ -106,7 +106,6 @@ impl From for ParquetError { ParquetError::External(Box::new(e)) } } - #[cfg(feature = "arrow")] impl From for ParquetError { fn from(e: ArrowError) -> ParquetError {