perf: greatly improve parquet cloud reading (#11479)
ritchie46 authored Oct 3, 2023
1 parent d8cc0cf commit 5882ab7
Showing 12 changed files with 186 additions and 198 deletions.
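At its core, the change swaps the generic `AsyncRead + AsyncSeek` download path (seek to each column chunk, then read it through a reader factory) for direct, concurrent byte-range requests against the object store. A minimal sketch of that fetch pattern, assuming the `object_store` crate; `fetch_ranges` and its exact signature are illustrative, not a polars API:

use std::sync::Arc;

use bytes::Bytes;
use futures::future::try_join_all;
use object_store::path::Path;
use object_store::ObjectStore;

async fn fetch_ranges(
    store: Arc<dyn ObjectStore>,
    path: Path,
    ranges: Vec<(u64, u64)>, // (start, length) per column chunk
) -> object_store::Result<Vec<(u64, Bytes)>> {
    let futures = ranges.into_iter().map(|(start, length)| {
        let store = store.clone();
        let path = path.clone();
        async move {
            // One ranged GET per column chunk instead of seek + read.
            let data = store
                .get_range(&path, start as usize..(start + length) as usize)
                .await?;
            Ok((start, data))
        }
    });
    // All range requests are issued concurrently and joined in order.
    try_join_all(futures).await
}

The rest of the diff is plumbing for this idea: byte ranges come straight from the parquet metadata, results are keyed by file offset, and downstream code reads them as `Bytes`.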
91 changes: 0 additions & 91 deletions crates/nano-arrow/src/io/parquet/read/row_group.rs
@@ -1,7 +1,5 @@
use std::io::{Read, Seek};

use futures::future::{try_join_all, BoxFuture};
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use parquet2::indexes::FilteredPage;
use parquet2::metadata::ColumnChunkMetaData;
use parquet2::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};
@@ -133,48 +131,6 @@ where
Ok((meta, chunk))
}

async fn _read_single_column_async<'b, R, F>(
reader_factory: F,
meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
where
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
{
let mut reader = reader_factory().await?;
let (start, length) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start)).await?;

let mut chunk = vec![];
chunk.try_reserve(length as usize)?;
reader.take(length).read_to_end(&mut chunk).await?;
Result::Ok((meta, chunk))
}

/// Reads all columns that are part of the parquet field `field_name`
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
///
/// It does so asynchronously via a single `join_all` over all the necessary columns for
/// `field_name`.
pub async fn read_columns_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
let futures = get_field_columns(columns, field_name)
.into_iter()
.map(|meta| async { _read_single_column_async(reader_factory.clone(), meta).await });

try_join_all(futures).await
}

type Pages = Box<
dyn Iterator<Item = std::result::Result<parquet2::page::CompressedPage, parquet2::error::Error>>
+ Sync
@@ -290,50 +246,3 @@ pub fn read_columns_many<'a, R: Read + Seek>(
.collect()
}
}

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
/// name matches `fields`'s names.
///
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
/// It does so asynchronously via `join_all`
pub async fn read_columns_many_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
chunk_size: Option<usize>,
limit: Option<usize>,
pages: Option<Vec<Vec<Vec<FilteredPage>>>>,
) -> Result<Vec<ArrayIter<'a>>> {
let num_rows = row_group.num_rows();
let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);

let futures = fields
.iter()
.map(|field| read_columns_async(reader_factory.clone(), row_group.columns(), &field.name));

let field_columns = try_join_all(futures).await?;

if let Some(pages) = pages {
field_columns
.into_iter()
.zip(fields)
.zip(pages)
.map(|((columns, field), pages)| {
to_deserializer(columns, field, num_rows, chunk_size, Some(pages))
})
.collect()
} else {
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None))
.collect()
}
}
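The helpers deleted above (`_read_single_column_async`, `read_columns_async`, `read_columns_many_async`) seeked a fresh reader to each column chunk and read it with `take(length).read_to_end`. Their replacement in `polars-io` (next file) needs no reader at all for the download step: it derives each field's byte ranges from the column-chunk metadata and passes them straight to the object store. A metadata-only sketch of that step, mirroring the filter in `download_projection` below (the helper name is hypothetical):

use parquet2::metadata::ColumnChunkMetaData;

/// Collect the (start, length) byte ranges of every column chunk that
/// belongs to the top-level field `name`. Pure metadata, no IO.
fn field_byte_ranges(columns: &[ColumnChunkMetaData], name: &str) -> Vec<(u64, u64)> {
    columns
        .iter()
        .filter(|meta| meta.descriptor().path_in_schema[0] == name)
        .map(|meta| meta.byte_range())
        .collect()
}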
136 changes: 76 additions & 60 deletions crates/polars-io/src/parquet/async_impl.rs
@@ -2,19 +2,18 @@
use std::ops::Range;
use std::sync::Arc;

use arrow::io::parquet::read::{
self as parquet2_read, read_columns_async, ColumnChunkMetaData, RowGroupMetaData,
};
use arrow::io::parquet::read::{self as parquet2_read, RowGroupMetaData};
use arrow::io::parquet::write::FileMetaData;
use futures::future::BoxFuture;
use futures::TryFutureExt;
use bytes::Bytes;
use futures::future::try_join_all;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use polars_core::config::verbose;
use polars_core::datatypes::PlHashMap;
use polars_core::error::{to_compute_err, PolarsResult};
use polars_core::prelude::*;
use polars_core::schema::Schema;
use smartstring::alias::String as SmartString;

use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap;
@@ -25,18 +24,22 @@ pub struct ParquetObjectStore {
store: Arc<dyn ObjectStore>,
path: ObjectPath,
length: Option<u64>,
metadata: Option<FileMetaData>,
metadata: Option<Arc<FileMetaData>>,
}

impl ParquetObjectStore {
pub async fn from_uri(uri: &str, options: Option<&CloudOptions>) -> PolarsResult<Self> {
pub async fn from_uri(
uri: &str,
options: Option<&CloudOptions>,
metadata: Option<Arc<FileMetaData>>,
) -> PolarsResult<Self> {
let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options).await?;

Ok(ParquetObjectStore {
store,
path: ObjectPath::from_url_path(prefix).map_err(to_compute_err)?,
length: None,
metadata: None,
metadata,
})
}

@@ -82,86 +85,102 @@ impl ParquetObjectStore {
}

/// Fetch and memoize the metadata of the parquet file.
pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetaData> {
self.initialize_length().await?;
pub async fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
if self.metadata.is_none() {
self.metadata = Some(self.fetch_metadata().await?);
self.initialize_length().await?;
self.metadata = Some(Arc::new(self.fetch_metadata().await?));
}
Ok(self.metadata.as_ref().unwrap())
}
}
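
// Sketch: with the metadata memoized as `Arc<FileMetaData>`, it can be fetched
// once and shared across readers of the same file. The uri and the second
// constructor call below are illustrative, not polars internals.
async fn reuse_metadata_example() -> PolarsResult<()> {
    let mut reader =
        ParquetObjectStore::from_uri("s3://bucket/data.parquet", None, None).await?;
    let metadata: Arc<FileMetaData> = reader.get_metadata().await?.clone();
    // A second reader over the same object skips the footer fetch entirely.
    let _cached =
        ParquetObjectStore::from_uri("s3://bucket/data.parquet", None, Some(metadata)).await?;
    Ok(())
}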

/// A vector of downloaded RowGroups.
/// A RowGroup will have 1 or more columns, for each column we store:
/// - a reference to its metadata
/// - the actual content as downloaded from object storage (generally cloud).
type RowGroupChunks<'a> = Vec<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>>;
async fn read_single_column_async(
async_reader: &ParquetObjectStore,
start: usize,
length: usize,
) -> PolarsResult<(u64, Bytes)> {
let chunk = async_reader
.store
.get_range(&async_reader.path, start..start + length)
.await
.map_err(to_compute_err)?;
Ok((start as u64, chunk))
}

async fn read_columns_async2(
async_reader: &ParquetObjectStore,
ranges: &[(u64, u64)],
) -> PolarsResult<Vec<(u64, Bytes)>> {
let futures = ranges.iter().map(|(start, length)| async {
read_single_column_async(async_reader, *start as usize, *length as usize).await
});

try_join_all(futures).await
}

/// Download rowgroups for the column whose indexes are given in `projection`.
/// We concurrently download the columns for each field.
async fn download_projection<'a: 'b, 'b>(
projection: &[usize],
row_groups: &'a [RowGroupMetaData],
schema: &ArrowSchema,
async_reader: &'b ParquetObjectStore,
) -> PolarsResult<RowGroupChunks<'a>> {
let fields = projection
.iter()
.map(|i| schema.fields[*i].name.clone())
.collect::<Vec<_>>();

let reader_factory = || {
let object_store = async_reader.store.clone();
let path = async_reader.path.clone();
Box::pin(futures::future::ready(Ok(CloudReader::new(
async_reader.length,
object_store,
path,
))))
}
as BoxFuture<'static, std::result::Result<CloudReader, std::io::Error>>;

async fn download_projection(
fields: &[SmartString],
row_groups: &[RowGroupMetaData],
async_reader: &Arc<ParquetObjectStore>,
) -> PolarsResult<Vec<Vec<(u64, Bytes)>>> {
// Build the cartesian product of the fields and the row groups.
let product_futures = fields
.into_iter()
.flat_map(|f| row_groups.iter().map(move |r| (f.clone(), r)))
.iter()
.flat_map(|name| row_groups.iter().map(move |r| (name.clone(), r)))
.map(|(name, row_group)| async move {
let columns = row_group.columns();
read_columns_async(reader_factory, columns, name.as_ref())
.map_err(to_compute_err)
.await
let ranges = columns
.iter()
.filter_map(|meta| {
if meta.descriptor().path_in_schema[0] == name.as_str() {
Some(meta.byte_range())
} else {
None
}
})
.collect::<Vec<_>>();
let async_reader = async_reader.clone();
let handle =
tokio::spawn(async move { read_columns_async2(&async_reader, &ranges).await });
handle.await.unwrap()
});

// Download concurrently
futures::future::try_join_all(product_futures).await
}
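
// A condensed view of the concurrency introduced above, assuming a running
// tokio runtime: each (field, row group) pair becomes its own spawned task,
// so ranged downloads overlap across fields and across row groups. This
// helper is a sketch, not part of the crate.
async fn spawn_all(
    jobs: Vec<Vec<(u64, u64)>>, // one Vec of byte ranges per (field, row group)
    reader: Arc<ParquetObjectStore>,
) -> PolarsResult<Vec<Vec<(u64, Bytes)>>> {
    // Spawn eagerly so every task starts before we await any of them.
    let handles: Vec<_> = jobs
        .into_iter()
        .map(|ranges| {
            let reader = reader.clone();
            tokio::spawn(async move { read_columns_async2(&reader, &ranges).await })
        })
        .collect();
    let mut out = Vec::with_capacity(handles.len());
    for handle in handles {
        // A JoinError (panic/cancellation) is unwrapped, as in the code above.
        out.push(handle.await.unwrap()?);
    }
    Ok(out)
}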

pub struct FetchRowGroupsFromObjectStore {
reader: ParquetObjectStore,
reader: Arc<ParquetObjectStore>,
row_groups_metadata: Vec<RowGroupMetaData>,
projection: Vec<usize>,
projected_fields: Vec<SmartString>,
logging: bool,
schema: ArrowSchema,
}

impl FetchRowGroupsFromObjectStore {
pub fn new(
reader: ParquetObjectStore,
metadata: &FileMetaData,
projection: &Option<Vec<usize>>,
schema: SchemaRef,
projection: Option<&[usize]>,
) -> PolarsResult<Self> {
let schema = parquet2_read::schema::infer_schema(metadata)?;
let logging = verbose();

let projection = projection
.to_owned()
.unwrap_or_else(|| (0usize..schema.fields.len()).collect::<Vec<_>>());
let projected_fields = projection
.map(|projection| {
projection
.iter()
.map(|i| schema.get_at_index(*i).unwrap().0.clone())
.collect::<Vec<_>>()
})
.unwrap_or_else(|| schema.iter().map(|tpl| tpl.0).cloned().collect());

Ok(FetchRowGroupsFromObjectStore {
reader,
reader: Arc::new(reader),
row_groups_metadata: metadata.row_groups.to_owned(),
projection,
projected_fields,
logging,
schema,
})
}

@@ -182,22 +201,19 @@ impl FetchRowGroupsFromObjectStore {

// Package in the format required by ColumnStore.
let downloaded =
download_projection(&self.projection, row_groups, &self.schema, &self.reader).await?;
download_projection(&self.projected_fields, row_groups, &self.reader).await?;

if self.logging {
eprintln!(
"BatchedParquetReader: fetched {} row_groups for {} fields, yielding {} column chunks.",
row_groups.len(),
self.projection.len(),
self.projected_fields.len(),
downloaded.len(),
);
}
let downloaded_per_filepos = downloaded
.into_iter()
.flat_map(|rg| {
rg.into_iter()
.map(|(meta, data)| (meta.byte_range().0, data))
})
.flat_map(|rg| rg.into_iter())
.collect::<PlHashMap<_, _>>();
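
// Note: the fetched chunks already carry their file offsets as keys, so they
// drop straight into the offset-keyed map that `mmap.rs` consults below.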

if self.logging {
5 changes: 3 additions & 2 deletions crates/polars-io/src/parquet/mmap.rs
@@ -3,6 +3,7 @@ use arrow::io::parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};
use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;

@@ -22,7 +23,7 @@ use super::*;
pub enum ColumnStore<'a> {
Local(&'a [u8]),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Vec<u8>>),
Fetched(PlHashMap<u64, Bytes>),
}

/// For local files memory maps all columns that are part of the parquet field `field_name`.
@@ -52,7 +53,7 @@ fn _mmap_single_column<'a>(
"mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
)
});
entry.as_slice()
entry.as_ref()
},
};
(meta, chunk)
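
// Why `Bytes` here: the buffer downloaded by the async path is reference-
// counted, so the column store can hand out `&[u8]` views without copying.
// A minimal sketch of the lookup performed above, assuming a store populated
// by the fetch path (chunks keyed by their start offset in the file):
fn fetched_column<'a>(store: &'a PlHashMap<u64, Bytes>, start: u64) -> Option<&'a [u8]> {
    store.get(&start).map(|bytes| bytes.as_ref())
}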
