From 8392737cb757cbeaeb8ce3c992059a5ce30f2cef Mon Sep 17 00:00:00 2001
From: Jan Kaul
Date: Thu, 3 Oct 2024 08:27:33 +0200
Subject: [PATCH] avoid collecting the streams of partition streams

---
 iceberg-rust/src/arrow/partition.rs | 15 ++++++++-------
 iceberg-rust/src/arrow/write.rs     |  4 ++--
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/iceberg-rust/src/arrow/partition.rs b/iceberg-rust/src/arrow/partition.rs
index 3fbfe779..ded526db 100644
--- a/iceberg-rust/src/arrow/partition.rs
+++ b/iceberg-rust/src/arrow/partition.rs
@@ -42,10 +42,12 @@ pub async fn partition_record_batches(
     partition_spec: &PartitionSpec,
     schema: &Schema,
 ) -> Result<
-    Vec<(
-        Vec<Value>,
-        impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
-    )>,
+    impl Stream<
+        Item = (
+            Vec<Value>,
+            impl Stream<Item = Result<RecordBatch, ArrowError>> + Send,
+        ),
+    >,
     ArrowError,
 > {
     #[allow(clippy::type_complexity)]
@@ -166,8 +168,7 @@ pub async fn partition_record_batches(
         .into_values()
         .for_each(|sender| sender.close_channel());
     partition_sender.close_channel();
-    let receivers = partition_receiver.collect().await;
-    Ok(receivers)
+    Ok(partition_receiver)
 }
 
 fn distinct_values(array: ArrayRef) -> Result {
@@ -310,7 +311,7 @@ mod tests {
         let streams = partition_record_batches(record_batches, &partition_spec, &schema)
             .await
             .unwrap();
-        let output = stream::iter(streams.into_iter())
+        let output = streams
             .then(|s| async move { s.1.collect::<Vec<_>>().await })
             .collect::<Vec<_>>()
             .await;
diff --git a/iceberg-rust/src/arrow/write.rs b/iceberg-rust/src/arrow/write.rs
index ee2ac307..2ee6c09c 100644
--- a/iceberg-rust/src/arrow/write.rs
+++ b/iceberg-rust/src/arrow/write.rs
@@ -5,7 +5,7 @@
 use futures::{
     channel::mpsc::{channel, unbounded, Receiver, Sender},
     lock::Mutex,
-    stream, SinkExt, StreamExt, TryStreamExt,
+    SinkExt, StreamExt, TryStreamExt,
 };
 use object_store::{buffered::BufWriter, ObjectStore};
 use std::sync::{
@@ -66,7 +66,7 @@ pub async fn write_parquet_partitioned(
     } else {
         let streams = partition_record_batches(batches, partition_spec, schema).await?;
 
-        stream::iter(streams.into_iter())
+        streams
             .map(Ok::<_, ArrowError>)
             .try_for_each_concurrent(None, |(partition_values, batches)| {
                 let arrow_schema = arrow_schema.clone();