Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom iceberg metadata cleaner #621

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

vamsikarnika
Copy link

Important Read

  • Please ensure the GitHub issue is mentioned at the beginning of the PR

What is the purpose of the pull request

(For example: This pull request implements the sync for delta format.)

Brief change log

(for example:)

  • Fixed JSON parsing error when persisting state
  • Added unit tests for schema evolution

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added TestConversionController to verify the change.
  • Manually verified the change by running a job locally.

@vamsikarnika vamsikarnika marked this pull request as draft January 8, 2025 16:30
.expireOlderThan(
Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli())
.cleanExpiredFiles(!useInternalIcebergCleaner); // is internal cleaner is enabled, disable iceberg cleaner
List<Snapshot> removedSnapshots = expireSnapshots.apply();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expiredSnapshots.apply() will apply the changes and returns list of snapshots that will be deleted based on provided parameters without committing them.

// only remove files that were deleted in an ancestor of the current table state to avoid
// physically deleting files that were logically deleted in a commit that was rolled back.
Set<Long> ancestorIds =
Sets.newHashSet(SnapshotUtil.ancestorIds(latest, snapshotLookup(removedSnapshots, table)));
Copy link
Author

@vamsikarnika vamsikarnika Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fetch the ancestors of table before expiring snapshots, we assume total table snapshots before committing = total table snapshots after committing + removed snapshots

transaction.commitTransaction();
// after commit is complete, clean up the manifest files
if (useInternalIcebergCleaner) {
cleanExpiredSnapshots(removedSnapshots);
Copy link
Author

@vamsikarnika vamsikarnika Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clean up expired snapshots, we're relying on table's current state and list of removed snapshots.

Another approach would be compare metadata before expire snapshots is committed and after. This would give us the exact changes expire snapshots commit has made (Iceberg uses this approach).

But the problem with this approach is how to get the intermediate TableMetadata before commit is made, since Transaction class doesn't expose intermediate state before transaction is committed. One way is casting Transaction to BaseTransaction which exposes TransactionTableOperations which provides intermediate TableMetadata

 TableMetadata beforeExpiration = ((BaseTransaction) transaction).underlyingOps().refresh();
 transaction
        .expireSnapshots()
        .expireOlderThan(
            Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli())
        .deleteWith(this::safeDelete) // ensures that only metadata files are deleted
        .cleanExpiredFiles(true)
        .commit();
  TableMetadata afterExpiration = ((BaseTransaction) transaction).underlyingOps().refresh();
  // do metadata clean based on before and after expiration metadata
  cleanUp(beforeExpiration, afterExpiration)  

Comment on lines +247 to +250
private void cleanExpiredSnapshots(List<Snapshot> removedSnapshots) {
IcebergMetadataCleanupStrategy cleanupStrategy = new IcebergMetadataFileCleaner(transaction.table().io(), deleteExecutorService, planExecutorService, this::safeDelete);
cleanupStrategy.cleanFiles(table, removedSnapshots);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be run in async to further improve the performance

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant