Add custom Iceberg metadata cleaner #621

Draft · wants to merge 1 commit into base: main
@@ -30,6 +30,7 @@
@EqualsAndHashCode(callSuper = true)
public class TargetTable extends ExternalTable {
private final Duration metadataRetention;
private final boolean useInternalMetadataCleaner;

@Builder(toBuilder = true)
public TargetTable(
@@ -39,9 +40,11 @@ public TargetTable(
String[] namespace,
CatalogConfig catalogConfig,
Duration metadataRetention,
boolean useInternalMetadataCleaner,
Properties additionalProperties) {
super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
this.metadataRetention =
metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
this.useInternalMetadataCleaner = useInternalMetadataCleaner;
}
}
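
For reference, a minimal sketch of how the new flag would be set through the Lombok builder generated by the constructor above (the table name, path, format, and retention values are placeholders, not from the PR):

  // illustrative only: builder field names come from the @Builder constructor above
  TargetTable target =
      TargetTable.builder()
          .name("orders")
          .formatName("ICEBERG")
          .basePath("s3://bucket/warehouse/orders")
          .metadataRetention(Duration.ofDays(3))
          .useInternalMetadataCleaner(true) // opt in to the internal metadata cleaner
          .build();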
@@ -15,19 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.iceberg;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.log4j.Log4j2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -39,6 +42,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;

import org.apache.iceberg.util.ThreadPools;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
@@ -51,6 +55,10 @@

@Log4j2
public class IcebergConversionTarget implements ConversionTarget {

private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE =
MoreExecutors.newDirectExecutorService();

private static final String METADATA_DIR_PATH = "/metadata/";
private IcebergSchemaExtractor schemaExtractor;
private IcebergSchemaSync schemaSync;
@@ -66,8 +74,10 @@ public class IcebergConversionTarget implements ConversionTarget {
private Transaction transaction;
private Table table;
private InternalTable internalTableState;
private boolean useInternalMetadataCleaner;

public IcebergConversionTarget() {}
private final ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
private final ExecutorService planExecutorService = ThreadPools.getWorkerPool();

IcebergConversionTarget(
TargetTable targetTable,
@@ -107,6 +117,7 @@ private void _init(
this.basePath = targetTable.getBasePath();
this.configuration = configuration;
this.snapshotRetentionInHours = (int) targetTable.getMetadataRetention().toHours();
this.useInternalMetadataCleaner = targetTable.isUseInternalMetadataCleaner();
String[] namespace = targetTable.getNamespace();
this.tableIdentifier =
namespace == null
@@ -211,18 +222,34 @@ public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {

@Override
public void completeSync() {
transaction
.expireSnapshots()
.expireOlderThan(
Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli())
.deleteWith(this::safeDelete) // ensures that only metadata files are deleted
.cleanExpiredFiles(true)
.commit();
boolean useInternalIcebergCleaner = useInternalCleaner();
ExpireSnapshots expireSnapshots =
transaction
.expireSnapshots()
.expireOlderThan(
Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli())
.cleanExpiredFiles(!useInternalIcebergCleaner); // if the internal cleaner is enabled, disable Iceberg's cleaner
List<Snapshot> removedSnapshots = expireSnapshots.apply();
Author: expireSnapshots.apply() applies the pending changes and returns the list of snapshots that will be deleted based on the provided parameters, without committing them.
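
A minimal sketch of that apply/commit split, for illustration (cutoffMillis is a placeholder, not a variable from the PR):

  ExpireSnapshots expire =
      transaction.expireSnapshots().expireOlderThan(cutoffMillis);
  // apply() only computes the outcome: the snapshots that would be removed
  List<Snapshot> toRemove = expire.apply();
  // nothing changes until commit() adds the operation to the transaction
  expire.commit();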

Contributor: Does expireSnapshots.apply() do a listing of the metadata folder for this? I was trying to understand how expensive this could be, for example if we retained 2 days of manifests assuming we have a commit every 5 minutes.

Author: No, this only operates on TableMetadata to compute the snapshots that need to be retained based on the provided retention time. Metadata file listing happens only as part of cleaning.
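
For a rough sense of scale: at one commit every 5 minutes, a 2-day retention window spans 2 × 24 × 12 = 576 snapshots, so apply() amounts to a walk over that list in the already-loaded TableMetadata; any per-file I/O is deferred to the cleanup phase.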

expireSnapshots.commit();
transaction.commitTransaction();
// after commit is complete, clean up the manifest files
if (useInternalIcebergCleaner) {
cleanExpiredSnapshots(removedSnapshots);
Author (@vamsikarnika, Jan 8, 2025): To clean up expired snapshots, we're relying on the table's current state and the list of removed snapshots.

Another approach would be to compare the metadata before and after the expire-snapshots commit; this would give us the exact changes the commit made (Iceberg itself uses this approach).

The problem with that approach is how to get the intermediate TableMetadata before the commit is made, since the Transaction class doesn't expose intermediate state before the transaction is committed. One way is casting Transaction to BaseTransaction, which exposes TransactionTableOperations and provides the intermediate TableMetadata:

 TableMetadata beforeExpiration = ((BaseTransaction) transaction).underlyingOps().refresh();
 transaction
        .expireSnapshots()
        .expireOlderThan(
            Instant.now().minus(snapshotRetentionInHours, ChronoUnit.HOURS).toEpochMilli())
        .deleteWith(this::safeDelete) // ensures that only metadata files are deleted
        .cleanExpiredFiles(true)
        .commit();
  TableMetadata afterExpiration = ((BaseTransaction) transaction).underlyingOps().refresh();
  // do metadata clean based on before and after expiration metadata
  cleanUp(beforeExpiration, afterExpiration);

}
transaction = null;
internalTableState = null;
}

private boolean useInternalCleaner() {
// use the internal cleaner only when the table has a single ref (main);
// with other branches or tags, expired snapshots could still be referenced elsewhere
return useInternalMetadataCleaner && table.refs().size() == 1;
}

private void cleanExpiredSnapshots(List<Snapshot> removedSnapshots) {
IcebergMetadataCleanupStrategy cleanupStrategy =
    new IcebergMetadataFileCleaner(
        transaction.table().io(), deleteExecutorService, planExecutorService, this::safeDelete);
cleanupStrategy.cleanFiles(table, removedSnapshots);
}
Comment on lines +247 to +250
Author: This can be run asynchronously to further improve performance.
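
A hypothetical sketch of that async variant (cleanupExecutor is an assumed executor, not part of the PR; the cleaner is built before the transaction reference is cleared at the end of completeSync()):

  // build the cleaner up front, because completeSync() sets transaction to null,
  // then run the file deletion off the commit path
  IcebergMetadataCleanupStrategy cleaner =
      new IcebergMetadataFileCleaner(
          transaction.table().io(), deleteExecutorService, planExecutorService, this::safeDelete);
  CompletableFuture.runAsync(() -> cleaner.cleanFiles(table, removedSnapshots), cleanupExecutor)
      .exceptionally(
          t -> {
            // cleanup is best-effort; a later sync cycle can pick up leftovers
            log.warn("Async metadata cleanup failed", t);
            return null;
          });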



private void safeDelete(String file) {
if (file.startsWith(new Path(basePath) + METADATA_DIR_PATH)) {
table.io().deleteFile(file);
}
}
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.iceberg;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

import lombok.extern.log4j.Log4j2;

import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.Tasks;

@Log4j2
abstract class IcebergMetadataCleanupStrategy {

protected final FileIO fileIO;
protected final ExecutorService planExecutorService;
private final Consumer<String> deleteFunc;
private final ExecutorService deleteExecutorService;

protected IcebergMetadataCleanupStrategy(
FileIO fileIO,
ExecutorService deleteExecutorService,
ExecutorService planExecutorService,
Consumer<String> deleteFunc) {
this.fileIO = fileIO;
this.deleteExecutorService = deleteExecutorService;
this.planExecutorService = planExecutorService;
this.deleteFunc = deleteFunc;
}

public abstract void cleanFiles(Table table, List<Snapshot> removedSnapshots);

private static final Schema MANIFEST_PROJECTION =
ManifestFile.schema()
.select(
"manifest_path",
"manifest_length",
"partition_spec_id",
"added_snapshot_id",
"deleted_data_files_count");

// read the snapshot's manifests from its manifest list file when present;
// fall back to the manifests tracked in table metadata for legacy snapshots
protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
if (snapshot.manifestListLocation() != null) {
return Avro.read(fileIO.newInputFile(snapshot.manifestListLocation()))
.rename("manifest_file", GenericManifestFile.class.getName())
.classLoader(GenericManifestFile.class.getClassLoader())
.project(MANIFEST_PROJECTION)
.reuseContainers(true)
.build();
} else {
return CloseableIterable.withNoopClose(snapshot.allManifests(fileIO));
}
}

protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
// best-effort deletion: retry up to 3 times, stop retrying for files that are
// already gone, and log (rather than propagate) any remaining failures
Tasks.foreach(pathsToDelete)
    .executeWith(deleteExecutorService)
    .retry(3)
    .stopRetryOn(NotFoundException.class)
    .suppressFailureWhenFinished()
    .onFailure(
        (file, thrown) -> log.warn("Delete failed for {} file: {}", fileType, file, thrown))
    .run(deleteFunc::accept);
}
}
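
The concrete IcebergMetadataFileCleaner instantiated in IcebergConversionTarget is not included in this diff. As a rough illustration only (ExampleMetadataFileCleaner and its retention logic are hypothetical, not the PR's implementation), a subclass might use readManifests and deleteFiles like this:

  package org.apache.xtable.iceberg;

  import java.io.IOException;
  import java.io.UncheckedIOException;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
  import java.util.concurrent.ExecutorService;
  import java.util.function.Consumer;

  import org.apache.iceberg.ManifestFile;
  import org.apache.iceberg.Snapshot;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.io.CloseableIterable;
  import org.apache.iceberg.io.FileIO;

  // hypothetical example: deletes the manifest lists of removed snapshots plus
  // any manifests no longer reachable from the snapshots the table still retains
  class ExampleMetadataFileCleaner extends IcebergMetadataCleanupStrategy {

    ExampleMetadataFileCleaner(
        FileIO fileIO,
        ExecutorService deleteExecutorService,
        ExecutorService planExecutorService,
        Consumer<String> deleteFunc) {
      super(fileIO, deleteExecutorService, planExecutorService, deleteFunc);
    }

    @Override
    public void cleanFiles(Table table, List<Snapshot> removedSnapshots) {
      // manifests referenced by retained snapshots must survive, even if an
      // expired snapshot also referenced them
      Set<String> stillInUse = new HashSet<>();
      for (Snapshot retained : table.snapshots()) {
        stillInUse.addAll(manifestPaths(retained));
      }

      Set<String> toDelete = new HashSet<>();
      for (Snapshot removed : removedSnapshots) {
        if (removed.manifestListLocation() != null) {
          toDelete.add(removed.manifestListLocation());
        }
        for (String path : manifestPaths(removed)) {
          if (!stillInUse.contains(path)) {
            toDelete.add(path);
          }
        }
      }

      deleteFiles(toDelete, "manifest");
    }

    private Set<String> manifestPaths(Snapshot snapshot) {
      Set<String> paths = new HashSet<>();
      try (CloseableIterable<ManifestFile> manifests = readManifests(snapshot)) {
        for (ManifestFile manifest : manifests) {
          paths.add(manifest.path());
        }
      } catch (IOException e) {
        throw new UncheckedIOException("failed to read manifests for snapshot", e);
      }
      return paths;
    }
  }

The retry and failure handling come from deleteFiles in the abstract class, so a subclass only decides which paths are safe to remove.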