Skip to content

Commit

Permalink
- Add comment about why we don't use spawn for metadata
Browse files Browse the repository at this point in the history
- Remove outdated comment about target_has_atomic
- Add test to verify reader fails when spawned on a shutdown runtime
  • Loading branch information
itsjunetime committed Oct 29, 2024
1 parent ff4437d commit e2270b0
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ impl AsyncFileReader for ParquetObjectReader {
self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
}

// This method doesn't directly call `self.spawn` because all of the IO that is done down the
// line due to this method call is done through `self.get_bytes` and/or `self.get_byte_ranges`.
// When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it treats it as
// an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
Expand Down Expand Up @@ -189,7 +195,7 @@ mod tests {
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::ParquetObjectReader;
use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::ParquetRecordBatchStreamBuilder;

async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
Expand Down Expand Up @@ -238,8 +244,6 @@ mod tests {
}

#[tokio::test]
// We need to mark this with the `target_has_atomic` because the spawned_tasks_count() fn is
// only available for that cfg
async fn test_runtime_is_used() {
let num_actions = Arc::new(AtomicUsize::new(0));

Expand All @@ -263,7 +267,7 @@ mod tests {
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

// Just copied these assert_eqs from the `test_timple` above
// Just copied these assert_eqs from the `test_simple` above
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 8);

Expand Down Expand Up @@ -297,4 +301,22 @@ mod tests {

tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
}

#[tokio::test]
async fn io_fails_on_shutdown_runtime() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();

let (meta, store) = get_meta_store().await;

let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

rt.shutdown_background();

let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

assert!(err.to_string().contains("was cancelled"));
}
}

0 comments on commit e2270b0

Please sign in to comment.