Skip to content

Commit

Permalink
Merge pull request #42 from JanKaul/partition-record-batches
Browse files Browse the repository at this point in the history
Avoid eagerly collecting the stream of partition streams into a Vec; return the lazy stream instead.
  • Loading branch information
JanKaul authored Oct 3, 2024
2 parents 4f58316 + 8392737 commit f8d852c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
15 changes: 8 additions & 7 deletions iceberg-rust/src/arrow/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ pub async fn partition_record_batches(
partition_spec: &PartitionSpec,
schema: &Schema,
) -> Result<
Vec<(
Vec<Value>,
impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
)>,
impl Stream<
Item = (
Vec<Value>,
impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
),
>,
ArrowError,
> {
#[allow(clippy::type_complexity)]
Expand Down Expand Up @@ -166,8 +168,7 @@ pub async fn partition_record_batches(
.into_values()
.for_each(|sender| sender.close_channel());
partition_sender.close_channel();
let receivers = partition_receiver.collect().await;
Ok(receivers)
Ok(partition_receiver)
}

fn distinct_values(array: ArrayRef) -> Result<DistinctValues, ArrowError> {
Expand Down Expand Up @@ -310,7 +311,7 @@ mod tests {
let streams = partition_record_batches(record_batches, &partition_spec, &schema)
.await
.unwrap();
let output = stream::iter(streams.into_iter())
let output = streams
.then(|s| async move { s.1.collect::<Vec<_>>().await })
.collect::<Vec<_>>()
.await;
Expand Down
4 changes: 2 additions & 2 deletions iceberg-rust/src/arrow/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use futures::{
channel::mpsc::{channel, unbounded, Receiver, Sender},
lock::Mutex,
stream, SinkExt, StreamExt, TryStreamExt,
SinkExt, StreamExt, TryStreamExt,
};
use object_store::{buffered::BufWriter, ObjectStore};
use std::sync::{
Expand Down Expand Up @@ -66,7 +66,7 @@ pub async fn write_parquet_partitioned(
} else {
let streams = partition_record_batches(batches, partition_spec, schema).await?;

stream::iter(streams.into_iter())
streams
.map(Ok::<_, ArrowError>)
.try_for_each_concurrent(None, |(partition_values, batches)| {
let arrow_schema = arrow_schema.clone();
Expand Down

0 comments on commit f8d852c

Please sign in to comment.