@CTTY CTTY commented Oct 10, 2025

Which issue does this PR close?

What changes are included in this PR?

New:

  • Added a new partitioning module with a PartitioningWriter trait
  • ClusteredWriter: Optimized for pre-sorted data, requires writing in partition order
  • FanoutWriter: Flexible writer that can handle data from any partition at any time
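The fanout behaviour described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `ToyFanoutWriter` is a hypothetical type, and plain strings stand in for `PartitionKey` and `RecordBatch`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a fanout writer: one open buffer per partition.
#[derive(Default)]
struct ToyFanoutWriter {
    // Partition key -> rows buffered for that partition.
    open: HashMap<String, Vec<String>>,
}

impl ToyFanoutWriter {
    /// Accepts a row for any partition at any time, opening a buffer on first use.
    fn write(&mut self, partition: &str, row: &str) {
        self.open
            .entry(partition.to_string())
            .or_default()
            .push(row.to_string());
    }

    /// Closes every open partition and returns (partition, row count) pairs,
    /// sorted by partition key so the result is deterministic.
    fn close(self) -> Vec<(String, usize)> {
        let mut out: Vec<(String, usize)> = self
            .open
            .into_iter()
            .map(|(key, rows)| (key, rows.len()))
            .collect();
        out.sort();
        out
    }
}
```

The trade-off is that every partition seen so far stays open until `close`, which is what a clustered writer avoids by requiring partition-ordered input.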

Modification:

  • (BREAKING) Modified DataFileWriterBuilder to support dynamic partition assignment
  • Updated DataFusion integration to use the new writer API

Are these changes tested?

Added unit tests

/// Build the iceberg writer.
async fn build(self) -> Result<Self::R>;
/// Build the iceberg writer for an optional partition key.
async fn build_with_partition(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;

Contributor Author

@CTTY CTTY Oct 10, 2025

This is a breaking change. I believe this is necessary because:

  1. IcebergWriter is supposed to generate DataFiles that always hold a partition value, according to the Iceberg spec.

  2. The existing code stores the partition value directly in the builder, which makes builder.clone() useless:

let builder = IcebergWriterBuilder::new(partition_A);
// build() consumes the builder, so clone it to keep it available for reuse
let writer_A = builder.clone().build();
... // write to partition A

// Done with partition A; now we need to write to partition B.
// This is wrong because partition value A is still stored in the builder.
let writer_B = builder.clone().build();

An alternative is to add a new method, clone_with_partition(), but that would also be a breaking change, and it is less clean than build_with_partition().
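A minimal sketch of the idea behind `build_with_partition()`, under simplifying assumptions: the types `ToyWriterBuilder` and `ToyWriter` are hypothetical, the function is synchronous rather than `async`, and a string stands in for `PartitionKey`. The builder holds only partition-independent settings, so a clone carries no stale partition value.

```rust
/// Hypothetical builder that holds only partition-independent settings.
#[derive(Clone)]
struct ToyWriterBuilder {
    file_prefix: String,
}

/// Hypothetical writer bound to exactly one (optional) partition.
struct ToyWriter {
    path: String,
}

impl ToyWriterBuilder {
    /// Consumes the builder, mirroring `build_with_partition(self, ...)`,
    /// so the caller clones the builder once per partition.
    fn build_with_partition(self, partition_key: Option<&str>) -> ToyWriter {
        // Assumption for illustration: the partition only affects the output path.
        let dir = partition_key.unwrap_or("unpartitioned");
        ToyWriter {
            path: format!("{}/{}.parquet", dir, self.file_prefix),
        }
    }
}
```

Because the partition is supplied at build time, `builder.clone().build_with_partition(Some("p=B"))` yields a writer for partition B regardless of what was built before.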

Contributor

I'm fine with this change, but I'd like one further change, as follows:

async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>

If the builder can be reused to create the actual IcebergWriter, I'd like to avoid cloning.
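A sketch of what borrowing the builder buys, again with hypothetical types (`ReusableBuilder`, `PartitionWriter`), a synchronous function, and a string in place of `PartitionKey`. The builder is deliberately not `Clone` here, to show that `build(&self, ...)` does not need it.

```rust
/// Hypothetical builder; deliberately not `Clone`.
struct ReusableBuilder {
    file_prefix: String,
}

/// Hypothetical writer that remembers which partition it was built for.
struct PartitionWriter {
    partition: Option<String>,
    path: String,
}

impl ReusableBuilder {
    /// Borrows the builder, so one builder can produce any number of writers.
    fn build(&self, partition_key: Option<&str>) -> PartitionWriter {
        let dir = partition_key.unwrap_or("unpartitioned");
        PartitionWriter {
            partition: partition_key.map(String::from),
            path: format!("{}/{}.parquet", dir, self.file_prefix),
        }
    }
}
```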

Contributor

@liurenjie1024 liurenjie1024 left a comment

Thanks @CTTY for this PR! I just finished the first round of review, and I think we are on the right track!

/// # Returns
///
/// `Ok(())` on success, or an error if the write operation fails.
async fn write(&mut self, partition_key: Option<PartitionKey>, input: I) -> Result<()>;

Contributor

Suggested change
- async fn write(&mut self, partition_key: Option<PartitionKey>, input: I) -> Result<()>;
+ async fn write(&mut self, partition_key: PartitionKey, input: I) -> Result<()>;

For a partitioning writer, shouldn't the input always be partitioned?

Contributor Author

@CTTY CTTY Oct 14, 2025

I was planning to have the partitioning writer take care of unpartitioned data as well, but I think you are right: we can have an explicit unpartitioned writer (basically a wrapper around the Iceberg writer) and let TaskWriter decide which one to use.

Will fix this.
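One possible shape for that split, as a sketch only: all three types below are hypothetical, the code is synchronous, and strings stand in for `PartitionKey` and the input batch. The task-level writer picks a variant once, and the partitioned variant always requires a key.

```rust
use std::collections::HashMap;

/// Hypothetical wrapper around a single underlying writer (no partition key).
#[derive(Default)]
struct ToyUnpartitionedWriter {
    rows: Vec<String>,
}

/// Hypothetical partitioning writer: a key is always required.
#[derive(Default)]
struct ToyPartitionedWriter {
    rows: HashMap<String, Vec<String>>,
}

/// Hypothetical task-level writer that picks one variant up front.
enum ToyTaskWriter {
    Unpartitioned(ToyUnpartitionedWriter),
    Partitioned(ToyPartitionedWriter),
}

impl ToyTaskWriter {
    fn new(table_is_partitioned: bool) -> Self {
        if table_is_partitioned {
            ToyTaskWriter::Partitioned(ToyPartitionedWriter::default())
        } else {
            ToyTaskWriter::Unpartitioned(ToyUnpartitionedWriter::default())
        }
    }

    /// Routes a row to the chosen variant; the unpartitioned variant ignores the key.
    fn write(&mut self, partition_key: Option<&str>, row: &str) -> Result<(), String> {
        match (self, partition_key) {
            (ToyTaskWriter::Unpartitioned(w), _) => {
                w.rows.push(row.to_string());
                Ok(())
            }
            (ToyTaskWriter::Partitioned(w), Some(key)) => {
                w.rows.entry(key.to_string()).or_default().push(row.to_string());
                Ok(())
            }
            (ToyTaskWriter::Partitioned(_), None) => {
                Err("partitioned table requires a partition key".to_string())
            }
        }
    }

    /// Total number of rows accepted so far.
    fn row_count(&self) -> usize {
        match self {
            ToyTaskWriter::Unpartitioned(w) => w.rows.len(),
            ToyTaskWriter::Partitioned(w) => w.rows.values().map(Vec::len).sum(),
        }
    }
}
```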

let partition_value = partition_key.data();

// Check if this partition has been closed already
if self.closed_partitions.contains(partition_value) {

Contributor

It's odd to add the check here. It's the caller's responsibility to ensure that the inputs are sorted, but if they are not, we should not throw an error.

Contributor Author

I was referring to Java's behavior when writing this.

Looking at the original Java PR, I don't see any explicit explanation, but I think this forces users to be aware of whether their data source is sorted and helps identify hidden performance issues.

Contributor

I'm fine with following Java's logic.
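The check under discussion can be sketched like this. `ToyClusteredWriter` is a hypothetical, synchronous stand-in with string partition keys; only the `closed_partitions` name comes from the snippet above.

```rust
use std::collections::HashSet;

/// Hypothetical clustered writer: at most one partition is open at a time.
#[derive(Default)]
struct ToyClusteredWriter {
    current: Option<String>,
    closed_partitions: HashSet<String>,
    rows_written: usize,
}

impl ToyClusteredWriter {
    /// Rejects rows for a partition that was already closed, which signals
    /// that the input is not clustered by partition.
    fn write(&mut self, partition: &str, _row: &str) -> Result<(), String> {
        if self.closed_partitions.contains(partition) {
            return Err(format!(
                "partition {} was already closed; input is not clustered",
                partition
            ));
        }
        if self.current.as_deref() != Some(partition) {
            // Moving to a new partition: close the previous one for good.
            if let Some(prev) = self.current.replace(partition.to_string()) {
                self.closed_partitions.insert(prev);
            }
        }
        self.rows_written += 1;
        Ok(())
    }
}
```

Writing `a, a, b` succeeds, while a later write to `a` fails, so unsorted input surfaces as an error instead of silently reopening a partition.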

/// * `B` - The inner writer builder type
/// * `I` - Input type (defaults to `RecordBatch`)
/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
#[derive(Clone)]

Contributor

Why do we need this? Cloning a FanoutWriter is ambiguous, since it holds state such as open writers and data files.
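One way to see the ambiguity, as a sketch with a hypothetical `ToyStatefulWriter` (not the PR's `FanoutWriter`): if a writer that already tracks finished files is cloned, both copies report those files when closed.

```rust
/// Hypothetical writer that carries state: the files it has already finished.
#[derive(Clone, Default)]
struct ToyStatefulWriter {
    finished_files: Vec<String>,
}

impl ToyStatefulWriter {
    /// Records a finished data file.
    fn finish_file(&mut self, name: &str) {
        self.finished_files.push(name.to_string());
    }

    /// Consumes the writer and returns every file it tracked.
    fn close(self) -> Vec<String> {
        self.finished_files
    }
}
```

If `original` finishes `a.parquet` and is then cloned, closing both `original` and the clone yields `a.parquet` twice, and it is unclear which copy owns the file.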

Development

Successfully merging this pull request may close these issues.

Implement fanout partitioned data writer.

2 participants