perf: greatly improve parquet cloud reading #11479

Merged · 9 commits · Oct 3, 2023
91 changes: 0 additions & 91 deletions crates/nano-arrow/src/io/parquet/read/row_group.rs
@@ -1,7 +1,5 @@
use std::io::{Read, Seek};

use futures::future::{try_join_all, BoxFuture};
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use parquet2::indexes::FilteredPage;
use parquet2::metadata::ColumnChunkMetaData;
use parquet2::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};
@@ -133,48 +131,6 @@ where
Ok((meta, chunk))
}

async fn _read_single_column_async<'b, R, F>(
reader_factory: F,
meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
where
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
{
let mut reader = reader_factory().await?;
let (start, length) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start)).await?;

let mut chunk = vec![];
chunk.try_reserve(length as usize)?;
reader.take(length).read_to_end(&mut chunk).await?;
Result::Ok((meta, chunk))
}

/// Reads all columns that are part of the parquet field `field_name`
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
///
/// It does so asynchronously via a single `join_all` over all the necessary columns for
/// `field_name`.
pub async fn read_columns_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
let futures = get_field_columns(columns, field_name)
.into_iter()
.map(|meta| async { _read_single_column_async(reader_factory.clone(), meta).await });

try_join_all(futures).await
}

type Pages = Box<
dyn Iterator<Item = std::result::Result<parquet2::page::CompressedPage, parquet2::error::Error>>
+ Sync
@@ -290,50 +246,3 @@ pub fn read_columns_many<'a, R: Read + Seek>(
.collect()
}
}

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
/// name matches `fields`'s names.
///
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
/// It does so asynchronously via `join_all`
pub async fn read_columns_many_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
>(
reader_factory: F,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
chunk_size: Option<usize>,
limit: Option<usize>,
pages: Option<Vec<Vec<Vec<FilteredPage>>>>,
) -> Result<Vec<ArrayIter<'a>>> {
let num_rows = row_group.num_rows();
let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);

let futures = fields
.iter()
.map(|field| read_columns_async(reader_factory.clone(), row_group.columns(), &field.name));

let field_columns = try_join_all(futures).await?;

if let Some(pages) = pages {
field_columns
.into_iter()
.zip(fields)
.zip(pages)
.map(|((columns, field), pages)| {
to_deserializer(columns, field, num_rows, chunk_size, Some(pages))
})
.collect()
} else {
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size, None))
.collect()
}
}
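
The entire async read path in nano-arrow's `row_group.rs` is removed: `_read_single_column_async`, `read_columns_async`, and `read_columns_many_async` pulled every column chunk through a generic `AsyncRead + AsyncSeek` reader produced by a factory, seeking to the chunk's byte range and buffering it into a `Vec<u8>`. The replacement in `polars-io` (next file) requests the byte ranges from object storage directly, so these helpers lose their only caller. A minimal sketch of the access pattern being dropped (the reader type and arguments are illustrative, not part of the patch):

```rust
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

/// One column chunk read in the old style: seek to the chunk's start, then a
/// bounded sequential read into a freshly allocated buffer.
async fn read_chunk<R>(mut reader: R, start: u64, length: u64) -> std::io::Result<Vec<u8>>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
{
    reader.seek(std::io::SeekFrom::Start(start)).await?;
    let mut chunk = Vec::with_capacity(length as usize);
    reader.take(length).read_to_end(&mut chunk).await?;
    Ok(chunk)
}
```

Over object storage every such seek/read pair becomes a fresh request funneled through a single reader; the ranged downloads introduced below avoid that round trip per column.
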
136 changes: 76 additions & 60 deletions crates/polars-io/src/parquet/async_impl.rs
@@ -2,19 +2,18 @@
use std::ops::Range;
use std::sync::Arc;

use arrow::io::parquet::read::{
self as parquet2_read, read_columns_async, ColumnChunkMetaData, RowGroupMetaData,
};
use arrow::io::parquet::read::{self as parquet2_read, RowGroupMetaData};
use arrow::io::parquet::write::FileMetaData;
use futures::future::BoxFuture;
use futures::TryFutureExt;
use bytes::Bytes;
use futures::future::try_join_all;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use polars_core::config::verbose;
use polars_core::datatypes::PlHashMap;
use polars_core::error::{to_compute_err, PolarsResult};
use polars_core::prelude::*;
use polars_core::schema::Schema;
use smartstring::alias::String as SmartString;

use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap;
@@ -25,18 +24,22 @@ pub struct ParquetObjectStore {
store: Arc<dyn ObjectStore>,
path: ObjectPath,
length: Option<u64>,
metadata: Option<FileMetaData>,
metadata: Option<Arc<FileMetaData>>,
}

impl ParquetObjectStore {
pub async fn from_uri(uri: &str, options: Option<&CloudOptions>) -> PolarsResult<Self> {
pub async fn from_uri(
uri: &str,
options: Option<&CloudOptions>,
metadata: Option<Arc<FileMetaData>>,
) -> PolarsResult<Self> {
let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options).await?;

Ok(ParquetObjectStore {
store,
path: ObjectPath::from_url_path(prefix).map_err(to_compute_err)?,
length: None,
metadata: None,
metadata,
})
}

@@ -82,86 +85,102 @@ impl ParquetObjectStore {
}

/// Fetch and memoize the metadata of the parquet file.
pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetaData> {
self.initialize_length().await?;
pub async fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
if self.metadata.is_none() {
self.metadata = Some(self.fetch_metadata().await?);
self.initialize_length().await?;
self.metadata = Some(Arc::new(self.fetch_metadata().await?));
}
Ok(self.metadata.as_ref().unwrap())
}
}

/// A vector of downloaded RowGroups.
/// A RowGroup will have 1 or more columns, for each column we store:
/// - a reference to its metadata
/// - the actual content as downloaded from object storage (generally cloud).
type RowGroupChunks<'a> = Vec<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>>;
async fn read_single_column_async(
async_reader: &ParquetObjectStore,
start: usize,
length: usize,
) -> PolarsResult<(u64, Bytes)> {
let chunk = async_reader
.store
.get_range(&async_reader.path, start..start + length)
.await
.map_err(to_compute_err)?;
Ok((start as u64, chunk))
}

async fn read_columns_async2(
async_reader: &ParquetObjectStore,
ranges: &[(u64, u64)],
) -> PolarsResult<Vec<(u64, Bytes)>> {
let futures = ranges.iter().map(|(start, length)| async {
read_single_column_async(async_reader, *start as usize, *length as usize).await
});

try_join_all(futures).await
}

/// Download rowgroups for the column whose indexes are given in `projection`.
/// We concurrently download the columns for each field.
async fn download_projection<'a: 'b, 'b>(
projection: &[usize],
row_groups: &'a [RowGroupMetaData],
schema: &ArrowSchema,
async_reader: &'b ParquetObjectStore,
) -> PolarsResult<RowGroupChunks<'a>> {
let fields = projection
.iter()
.map(|i| schema.fields[*i].name.clone())
.collect::<Vec<_>>();

let reader_factory = || {
let object_store = async_reader.store.clone();
let path = async_reader.path.clone();
Box::pin(futures::future::ready(Ok(CloudReader::new(
async_reader.length,
object_store,
path,
))))
}
as BoxFuture<'static, std::result::Result<CloudReader, std::io::Error>>;

async fn download_projection(
fields: &[SmartString],
row_groups: &[RowGroupMetaData],
async_reader: &Arc<ParquetObjectStore>,
) -> PolarsResult<Vec<Vec<(u64, Bytes)>>> {
// Build the cartesian product of the fields and the row groups.
let product_futures = fields
.into_iter()
.flat_map(|f| row_groups.iter().map(move |r| (f.clone(), r)))
.iter()
.flat_map(|name| row_groups.iter().map(move |r| (name.clone(), r)))
.map(|(name, row_group)| async move {
let columns = row_group.columns();
read_columns_async(reader_factory, columns, name.as_ref())
.map_err(to_compute_err)
.await
let ranges = columns
.iter()
.filter_map(|meta| {
if meta.descriptor().path_in_schema[0] == name.as_str() {
Some(meta.byte_range())
} else {
None
}
})
.collect::<Vec<_>>();
let async_reader = async_reader.clone();
let handle =
tokio::spawn(async move { read_columns_async2(&async_reader, &ranges).await });
handle.await.unwrap()
});

// Download concurrently
futures::future::try_join_all(product_futures).await
}

pub struct FetchRowGroupsFromObjectStore {
reader: ParquetObjectStore,
reader: Arc<ParquetObjectStore>,
row_groups_metadata: Vec<RowGroupMetaData>,
projection: Vec<usize>,
projected_fields: Vec<SmartString>,
logging: bool,
schema: ArrowSchema,
}

impl FetchRowGroupsFromObjectStore {
pub fn new(
reader: ParquetObjectStore,
metadata: &FileMetaData,
projection: &Option<Vec<usize>>,
schema: SchemaRef,
projection: Option<&[usize]>,
) -> PolarsResult<Self> {
let schema = parquet2_read::schema::infer_schema(metadata)?;
let logging = verbose();

let projection = projection
.to_owned()
.unwrap_or_else(|| (0usize..schema.fields.len()).collect::<Vec<_>>());
let projected_fields = projection
.map(|projection| {
projection
.iter()
.map(|i| schema.get_at_index(*i).unwrap().0.clone())
.collect::<Vec<_>>()
})
.unwrap_or_else(|| schema.iter().map(|tpl| tpl.0).cloned().collect());

Ok(FetchRowGroupsFromObjectStore {
reader,
reader: Arc::new(reader),
row_groups_metadata: metadata.row_groups.to_owned(),
projection,
projected_fields,
logging,
schema,
})
}

@@ -182,22 +201,19 @@ impl FetchRowGroupsFromObjectStore {

// Package in the format required by ColumnStore.
let downloaded =
download_projection(&self.projection, row_groups, &self.schema, &self.reader).await?;
download_projection(&self.projected_fields, row_groups, &self.reader).await?;

if self.logging {
eprintln!(
"BatchedParquetReader: fetched {} row_groups for {} fields, yielding {} column chunks.",
row_groups.len(),
self.projection.len(),
self.projected_fields.len(),
downloaded.len(),
);
}
let downloaded_per_filepos = downloaded
.into_iter()
.flat_map(|rg| {
rg.into_iter()
.map(|(meta, data)| (meta.byte_range().0, data))
})
.flat_map(|rg| rg.into_iter())
.collect::<PlHashMap<_, _>>();

if self.logging {
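
The new download path no longer goes through a generic async reader at all. `FetchRowGroupsFromObjectStore` keeps the projected field names, builds the cartesian product of fields and row groups, turns each pair into the `(start, length)` byte ranges of its column chunks, and spawns a tokio task that fetches those ranges with `ObjectStore::get_range`, which hands back `Bytes`. The parquet footer is likewise memoized as an `Arc<FileMetaData>` and can be passed into `from_uri`, so it is not refetched on every scan. A minimal, self-contained sketch of the ranged fan-out, assuming the `object_store` crate; the `store`, `path`, and `ranges` arguments here are hypothetical stand-ins:

```rust
use std::sync::Arc;

use bytes::Bytes;
use futures::future::try_join_all;
use object_store::{path::Path, ObjectStore};

/// Fetch a set of `(start, length)` byte ranges concurrently and key each
/// downloaded chunk by its starting file offset, mirroring `download_projection`.
async fn fetch_ranges(
    store: Arc<dyn ObjectStore>,
    path: Path,
    ranges: Vec<(u64, u64)>,
) -> object_store::Result<Vec<(u64, Bytes)>> {
    let futures = ranges.into_iter().map(|(start, length)| {
        let store = store.clone();
        let path = path.clone();
        async move {
            // One ranged GET per column chunk; the store returns `Bytes` directly.
            let data = store
                .get_range(&path, start as usize..(start + length) as usize)
                .await?;
            Ok::<_, object_store::Error>((start, data))
        }
    });
    // Drive all range requests for this field/row-group pair concurrently.
    try_join_all(futures).await
}
```

In the PR each such per-field download is additionally wrapped in `tokio::spawn`, so the field/row-group pairs are downloaded in parallel on the runtime rather than polled from a single task.
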
5 changes: 3 additions & 2 deletions crates/polars-io/src/parquet/mmap.rs
@@ -3,6 +3,7 @@ use arrow::io::parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};
use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;

@@ -22,7 +23,7 @@ use super::*;
pub enum ColumnStore<'a> {
Local(&'a [u8]),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Vec<u8>>),
Fetched(PlHashMap<u64, Bytes>),
}

/// For local files memory maps all columns that are part of the parquet field `field_name`.
@@ -52,7 +53,7 @@ fn _mmap_single_column<'a>(
"mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
)
});
entry.as_slice()
entry.as_ref()
},
};
(meta, chunk)
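
On the consumption side, `ColumnStore::Fetched` now holds the downloaded chunks as `Bytes` keyed by the column chunk's starting offset in the file, and `_mmap_single_column` borrows a chunk with `as_ref()` instead of `as_slice()`. Because `Bytes` is a reference-counted buffer, the fetched data can be handed around without the copies a `Vec<u8>`-based store would force. A small sketch of that keyed lookup, using a plain `HashMap` where polars uses its `PlHashMap` alias (the type and values below are illustrative):

```rust
use std::collections::HashMap;

use bytes::Bytes;

/// Stand-in for `ColumnStore::Fetched`: downloaded column chunks keyed by the
/// byte offset at which each chunk starts in the parquet file.
struct FetchedColumns(HashMap<u64, Bytes>);

impl FetchedColumns {
    /// Borrow the chunk prefetched for the column chunk starting at `start`.
    fn column(&self, start: u64) -> &[u8] {
        self.0
            .get(&start)
            .unwrap_or_else(|| panic!("column with start {start} must be prefetched"))
            .as_ref()
    }
}

fn main() {
    let mut store = HashMap::new();
    store.insert(4u64, Bytes::from_static(b"column chunk bytes"));
    let columns = FetchedColumns(store);
    // Cloning `Bytes` elsewhere only bumps a reference count; this borrow is zero-copy.
    assert_eq!(columns.column(4), b"column chunk bytes".as_slice());
}
```
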