feat: Extract FileRead and FileWrite trait (#364)
* feat: Extract FileRead and FileWrite trait

Signed-off-by: Xuanwo <[email protected]>

* Enable s3 services for tests

Signed-off-by: Xuanwo <[email protected]>

* Fix sort

Signed-off-by: Xuanwo <[email protected]>

* Add comment for io trait

Signed-off-by: Xuanwo <[email protected]>

* Fix test for rest

Signed-off-by: Xuanwo <[email protected]>

* Use try join

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Xuanwo authored May 14, 2024
1 parent ffb691d commit 3b8121e
Showing 17 changed files with 315 additions and 149 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
@@ -18,11 +18,11 @@
[workspace]
resolver = "2"
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
]

[workspace.package]
@@ -64,7 +64,7 @@ log = "^0.4"
mockito = "^1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.45"
opendal = "0.46"
ordered-float = "4.0.0"
parquet = "51"
pilota = "0.11.0"
1 change: 1 addition & 0 deletions crates/catalog/glue/Cargo.toml
@@ -42,4 +42,5 @@ uuid = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
15 changes: 5 additions & 10 deletions crates/catalog/glue/src/catalog.rs
@@ -25,7 +25,6 @@ use iceberg::{
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use typed_builder::TypedBuilder;

@@ -358,13 +357,10 @@ impl Catalog for GlueCatalog {
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let mut file = self
.file_io
self.file_io
.new_output(&metadata_location)?
.writer()
.write(serde_json::to_vec(&metadata)?.into())
.await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;

let glue_table = convert_to_glue_table(
&table_name,
@@ -431,10 +427,9 @@ impl Catalog for GlueCatalog {
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
let input_file = self.file_io.new_input(&metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
.file_io(self.file_io())
1 change: 1 addition & 0 deletions crates/catalog/hms/Cargo.toml
@@ -44,4 +44,5 @@ volo-thrift = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
opendal = { workspace = true, features = ["services-s3"] }
port_scanner = { workspace = true }
15 changes: 4 additions & 11 deletions crates/catalog/hms/src/catalog.rs
@@ -35,8 +35,6 @@ use iceberg::{
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;

@@ -349,13 +347,10 @@ impl Catalog for HmsCatalog {
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata_location = create_metadata_location(&location, 0)?;

let mut file = self
.file_io
self.file_io
.new_output(&metadata_location)?
.writer()
.write(serde_json::to_vec(&metadata)?.into())
.await?;
file.write_all(&serde_json::to_vec(&metadata)?).await?;
file.shutdown().await?;

let hive_table = convert_to_hive_table(
db_name.clone(),
@@ -406,10 +401,8 @@ impl Catalog for HmsCatalog {

let metadata_location = get_metadata_location(&hive_table.parameters)?;

let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
let mut metadata_str = String::new();
reader.read_to_string(&mut metadata_str).await?;
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
.file_io(self.file_io())
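
Note: the Glue and HMS catalogs above now share the same metadata I/O pattern against the new one-shot `OutputFile::write` and `InputFile::read` APIs. Below is a minimal, self-contained sketch of that pattern; it assumes a `FileIO` instance and a metadata location are supplied by the caller, and the helper function name is illustrative only, not part of this commit.

use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::Result;

/// Hypothetical helper: round-trips table metadata through the one-shot
/// write and read paths introduced by this commit.
async fn write_then_read_metadata(
    file_io: &FileIO,
    metadata_location: &str,
    metadata: &TableMetadata,
) -> Result<TableMetadata> {
    // One-shot write: serialize once and hand the whole buffer over as `Bytes`.
    file_io
        .new_output(metadata_location)?
        .write(serde_json::to_vec(metadata)?.into())
        .await?;

    // One-shot read: fetch the whole object and deserialize from the slice.
    let content = file_io.new_input(metadata_location)?.read().await?;
    Ok(serde_json::from_slice::<TableMetadata>(&content)?)
}
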
1 change: 1 addition & 0 deletions crates/catalog/rest/Cargo.toml
@@ -46,5 +46,6 @@ uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
opendal = { workspace = true, features = ["services-fs"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
59 changes: 53 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -19,14 +19,21 @@
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{try_join, TryFutureExt};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;

use crate::arrow::arrow_schema_to_schema;
use crate::io::FileIO;
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind};
@@ -91,12 +98,12 @@ impl ArrowReader {

Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_reader = file_io
.new_input(task.data().data_file().file_path())?
.reader()
.await?;
let parquet_file = file_io
.new_input(task.data().data_file().file_path())?;
let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
@@ -187,3 +194,43 @@ impl ArrowReader {
}
}
}

/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.
///
/// # TODO
///
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) contains the following hints to speed up metadata loading, we can consider adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
struct ArrowFileReader<R: FileRead> {
meta: FileMetadata,
r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
/// Create a new ArrowFileReader
fn new(meta: FileMetadata, r: R) -> Self {
Self { meta, r }
}
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
.read(range.start as _..range.end as _)
.map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
)
}

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
loader.load_page_index(false, false).await?;
Ok(Arc::new(loader.finish()))
})
}
}
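
Note: the point of extracting `FileRead` is that `ArrowFileReader` above depends only on the trait, not on a concrete opendal reader. As a rough illustration, a hypothetical in-memory backend could plug into the same wrapper. The `InMemoryFile` type below is an assumption made for this sketch and does not appear in the commit; the impl mirrors the trait definition and therefore also needs the `async-trait` crate.

use std::ops::Range;

use bytes::Bytes;
use iceberg::io::FileRead;
use iceberg::Result;

/// Hypothetical `FileRead` backed by an in-memory buffer.
struct InMemoryFile {
    data: Bytes,
}

#[async_trait::async_trait]
impl FileRead for InMemoryFile {
    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
        // Serve the requested byte range straight from the buffer; a real
        // implementation would bounds-check instead of letting `slice` panic.
        Ok(self.data.slice(range.start as usize..range.end as usize))
    }
}
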
119 changes: 100 additions & 19 deletions crates/iceberg/src/io.rs
@@ -48,14 +48,13 @@
//! - `new_input`: Create input file for reading.
//! - `new_output`: Create output file for writing.
use bytes::Bytes;
use std::ops::Range;
use std::{collections::HashMap, sync::Arc};

use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
use tokio::io::AsyncWrite as TokioAsyncWrite;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;

/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -206,6 +205,35 @@ impl FileIO {
}
}

/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
pub struct FileMetadata {
/// The size of the file.
pub size: u64,
}

/// Trait for reading file.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Unpin + 'static {
/// Read file content with given range.
///
/// TODO: we can support reading non-contiguous bytes in the future.
async fn read(&self, range: Range<u64>) -> Result<Bytes>;
}

#[async_trait::async_trait]
impl FileRead for opendal::Reader {
async fn read(&self, range: Range<u64>) -> Result<Bytes> {
Ok(opendal::Reader::read(self, range).await?.to_bytes())
}
}

/// Input file is used for reading from files.
#[derive(Debug)]
pub struct InputFile {
@@ -216,14 +244,6 @@ pub struct InputFile {
relative_path_pos: usize,
}

/// Trait for reading file.
pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}

impl<T> FileRead for T where
T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
{
}

impl InputFile {
/// Absolute path to root uri.
pub fn location(&self) -> &str {
@@ -238,16 +258,63 @@ impl InputFile {
.await?)
}

/// Creates [`InputStream`] for reading.
/// Fetches and returns the metadata of the file.
pub async fn metadata(&self) -> Result<FileMetadata> {
let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;

Ok(FileMetadata {
size: meta.content_length(),
})
}

/// Reads and returns the whole content of the file.
///
/// For streaming reads, use [`Self::reader`] instead.
pub async fn read(&self) -> Result<Bytes> {
Ok(self
.op
.read(&self.path[self.relative_path_pos..])
.await?
.to_bytes())
}

/// Creates a [`FileRead`] for streaming reads.
///
/// For one-time reading, use [`Self::read`] instead.
pub async fn reader(&self) -> Result<impl FileRead> {
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
}
}

/// Trait for writing file.
pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
/// Write bytes to file.
///
/// TODO: we can support writing non-contiguous bytes in the future.
async fn write(&mut self, bs: Bytes) -> Result<()>;

impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
/// Close file.
///
/// Calling `close` on a closed file will generate an error.
async fn close(&mut self) -> Result<()>;
}

#[async_trait::async_trait]
impl FileWrite for opendal::Writer {
async fn write(&mut self, bs: Bytes) -> Result<()> {
Ok(opendal::Writer::write(self, bs).await?)
}

async fn close(&mut self) -> Result<()> {
Ok(opendal::Writer::close(self).await?)
}
}

/// Output file is used for writing to files.
#[derive(Debug)]
@@ -282,7 +349,23 @@ impl OutputFile {
}
}

/// Creates output file for writing.
/// Create a new output file with given bytes.
///
/// # Notes
///
/// Calling `write` will overwrite the file if it exists.
/// For streaming writes, use [`Self::writer`].
pub async fn write(&self, bs: Bytes) -> Result<()> {
let mut writer = self.writer().await?;
writer.write(bs).await?;
writer.close().await
}

/// Creates an output file for streaming writes.
///
/// # Notes
///
/// For one-time writing, use [`Self::write`] instead.
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
@@ -398,7 +481,7 @@ mod tests {
use std::{fs::File, path::Path};

use futures::io::AllowStdIo;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::AsyncReadExt;

use tempfile::TempDir;

@@ -483,9 +566,7 @@

assert!(!output_file.exists().await.unwrap());
{
let mut writer = output_file.writer().await.unwrap();
writer.write_all(content.as_bytes()).await.unwrap();
writer.close().await.unwrap();
output_file.write(content.into()).await.unwrap();
}

assert_eq!(&full_path, output_file.location());
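
Note: the new `InputFile` / `OutputFile` surface splits one-shot calls (`read`, `write`) from streaming ones (`reader`, `writer`). A minimal sketch of the streaming path follows, assuming a `FileIO` instance and source/destination paths are provided by the caller; the helper function name is illustrative only.

use bytes::Bytes;
use iceberg::io::{FileIO, FileRead, FileWrite};
use iceberg::Result;

/// Hypothetical helper: copies at most the first kilobyte of `src` into `dst`
/// using the streaming `FileRead` / `FileWrite` traits from this commit.
async fn copy_first_kilobyte(file_io: &FileIO, src: &str, dst: &str) -> Result<()> {
    let input = file_io.new_input(src)?;

    // Stat first so the ranged read never runs past the end of the file.
    let meta = input.metadata().await?;
    let end = meta.size.min(1024);

    // Streaming read: fetch only the requested byte range.
    let reader = input.reader().await?;
    let head: Bytes = reader.read(0..end).await?;

    // Streaming write: push bytes through a `FileWrite`, then close explicitly.
    let mut writer = file_io.new_output(dst)?.writer().await?;
    writer.write(head).await?;
    writer.close().await?;

    Ok(())
}
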