Skip to content

Commit

Permalink
feat: introduce opendal AsyncReader for parquet integrations (#4972)
Browse files — browse the repository at this point in the history
  • Loading branch information
WenyXu authored Aug 7, 2024
1 parent 1f4c119 commit d184c69
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 7 deletions.
4 changes: 4 additions & 0 deletions integrations/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,7 @@ arrow = { version = "52.0" }
# Example showing the OpenDAL-backed AsyncWriter for parquet output.
[[example]]
name = "async_writer"
path = "examples/async_writer.rs"

# Example showing the OpenDAL-backed AsyncReader for parquet input.
[[example]]
name = "async_reader"
path = "examples/async_reader.rs"
57 changes: 57 additions & 0 deletions integrations/parquet/examples/async_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};

use futures::StreamExt;
use opendal::{services::S3Config, Operator};
use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet_opendal::{AsyncReader, AsyncWriter};

#[tokio::main]
async fn main() {
    // Configure an S3 backend (placeholder credentials for the example).
    let mut config = S3Config::default();
    config.access_key_id = Some("my_access_key".to_string());
    config.secret_access_key = Some("my_secret_key".to_string());
    config.endpoint = Some("my_endpoint".to_string());
    config.region = Some("my_region".to_string());
    config.bucket = "my_bucket".to_string();

    // Build the operator used for all storage access below.
    let op = Operator::from_config(config).unwrap().finish();
    let path = "/path/to/file.parquet";

    // Wrap an OpenDAL writer so AsyncArrowWriter can stream parquet into it.
    let opendal_writer = op
        .writer_with(path)
        .chunk(32 * 1024 * 1024)
        .concurrent(8)
        .await
        .unwrap();
    let sink = AsyncWriter::new(opendal_writer);

    // Write a single record batch containing one Int64 column.
    let column = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", column)]).unwrap();
    let mut arrow_writer = AsyncArrowWriter::try_new(sink, batch.schema(), None).unwrap();
    arrow_writer.write(&batch).await.unwrap();
    arrow_writer.close().await.unwrap();

    // gap: Allow the underlying reader to merge small IOs
    // when the gap between multiple IO ranges is less than the threshold.
    let opendal_reader = op
        .reader_with(path)
        .gap(512 * 1024)
        .chunk(16 * 1024 * 1024)
        .concurrent(16)
        .await
        .unwrap();
    let total_len = op.stat(path).await.unwrap().content_length();
    let reader = AsyncReader::new(opendal_reader, total_len).with_prefetch_footer_size(512 * 1024);

    // Read the batch back and verify the round trip.
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .build()
        .unwrap();
    let batch_back = stream.next().await.unwrap().unwrap();
    assert_eq!(batch, batch_back);
}
Loading

0 comments on commit d184c69

Please sign in to comment.