From d184c69ca6a4e40fda55d729e28c48f7a824a45e Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Thu, 8 Aug 2024 01:22:58 +0800
Subject: [PATCH] feat: introduce opendal `AsyncReader` for parquet integrations (#4972)

---
 integrations/parquet/Cargo.toml               |   4 +
 integrations/parquet/examples/async_reader.rs |  57 +++
 integrations/parquet/src/async_reader.rs      | 347 ++++++++++++++++++
 integrations/parquet/src/async_writer.rs      |   2 +-
 integrations/parquet/src/lib.rs               |  25 +-
 5 files changed, 428 insertions(+), 7 deletions(-)
 create mode 100644 integrations/parquet/examples/async_reader.rs
 create mode 100644 integrations/parquet/src/async_reader.rs

diff --git a/integrations/parquet/Cargo.toml b/integrations/parquet/Cargo.toml
index 1efe19b75a85..bc34698306c1 100644
--- a/integrations/parquet/Cargo.toml
+++ b/integrations/parquet/Cargo.toml
@@ -50,3 +50,7 @@ arrow = { version = "52.0" }
 [[example]]
 name = "async_writer"
 path = "examples/async_writer.rs"
+
+[[example]]
+name = "async_reader"
+path = "examples/async_reader.rs"

diff --git a/integrations/parquet/examples/async_reader.rs b/integrations/parquet/examples/async_reader.rs
new file mode 100644
index 000000000000..9fe7ebe7418c
--- /dev/null
+++ b/integrations/parquet/examples/async_reader.rs
@@ -0,0 +1,57 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
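// This example is a round trip: it writes a small RecordBatch to an
// OpenDAL-backed store through `AsyncWriter`, then streams the same file
// back through `AsyncReader` and asserts the data matches. The S3
// credentials and endpoint used below are placeholders.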
// The ASF licenses this file
    prefetch_footer_size: usize,
            };
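            // At this point `buf` holds exactly the serialized footer metadata
            // (without the trailing 8-byte footer): either a slice of the
            // prefetched tail, or the result of one extra ranged read when the
            // metadata is larger than the prefetch window.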
AsyncArrowWriter::try_new( + writer, + to_write.schema(), + Some( + WriterProperties::builder() + .set_key_value_metadata(Some(vec![KeyValue { + key: "__metadata".to_string(), + value: Some(gen_fixed_string(case.metadata_size)), + }])) + .build(), + ), + ) + .unwrap(); + writer.write(&to_write).await.unwrap(); + writer.close().await.unwrap(); + + let reader = operator.reader(path).await.unwrap(); + let content_len = operator.stat(path).await.unwrap().content_length(); + let mut reader = AsyncReader::new(reader, content_len); + if let Some(footer_size) = case.prefetch { + reader = reader.with_prefetch_footer_size(footer_size); + } + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .build() + .unwrap(); + let read = stream.next().await.unwrap().unwrap(); + assert_eq!(to_write, read); + } + } +} diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs index 027c9214c05d..47266564c393 100644 --- a/integrations/parquet/src/async_writer.rs +++ b/integrations/parquet/src/async_writer.rs @@ -76,7 +76,7 @@ pub struct AsyncWriter { } impl AsyncWriter { - /// Create a [`OpendalAsyncWriter`] by given [`Writer`]. + /// Create a [`AsyncWriter`] by given [`Writer`]. pub fn new(writer: Writer) -> Self { Self { inner: writer } } diff --git a/integrations/parquet/src/lib.rs b/integrations/parquet/src/lib.rs index ded082d82371..ed5d932b10f9 100644 --- a/integrations/parquet/src/lib.rs +++ b/integrations/parquet/src/lib.rs @@ -21,12 +21,12 @@ //! //! ```no_run //! use std::sync::Arc; -//! //! use arrow::array::{ArrayRef, Int64Array, RecordBatch}; //! +//! use futures::StreamExt; //! use opendal::{services::S3Config, Operator}; -//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter}; -//! use parquet_opendal::AsyncWriter; +//! use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; +//! use parquet_opendal::{AsyncReader, AsyncWriter}; //! //! #[tokio::main] //! async fn main() { @@ -57,16 +57,29 @@ //! writer.write(&to_write).await.unwrap(); //! writer.close().await.unwrap(); //! -//! let buffer = operator.read(path).await.unwrap().to_bytes(); -//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer) +//! // gap: Allow the underlying reader to merge small IOs +//! // when the gap between multiple IO ranges is less than the threshold. +//! let reader = operator +//! .reader_with(path) +//! .gap(512 * 1024) +//! .chunk(16 * 1024 * 1024) +//! .concurrent(16) +//! .await +//! .unwrap(); +//! let content_len = operator.stat(path).await.unwrap().content_length(); +//! let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024); +//! let mut stream = ParquetRecordBatchStreamBuilder::new(reader) +//! .await //! .unwrap() //! .build() //! .unwrap(); -//! let read = reader.next().unwrap().unwrap(); +//! let read = stream.next().await.unwrap().unwrap(); //! assert_eq!(to_write, read); //! } //! ``` +mod async_reader; mod async_writer; +pub use async_reader::AsyncReader; pub use async_writer::AsyncWriter;
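                // The writer properties below attach a large `__metadata`
                // key-value entry, so the footer metadata can be large relative
                // to the prefetch window; this exercises both the prefetched
                // slice path and the fallback ranged read in `get_metadata`.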