Skip to content

Commit

Permalink
Add a configuration to optionally disable drop with purge (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-maynard authored Sep 9, 2024
1 parent 17dd740 commit 7d1b8f2
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,29 @@ public static <T> Builder<T> builder() {
"If set to true, allows tables to have external locations outside the default structure.")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> CLEANUP_ON_NAMESPACE_DROP =
PolarisConfiguration.<Boolean>builder()
.key("CLEANUP_ON_NAMESPACE_DROP")
.catalogConfig("cleanup.on.namespace.drop")
.description("If set to true, clean up data when a namespace is dropped")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> CLEANUP_ON_CATALOG_DROP =
PolarisConfiguration.<Boolean>builder()
.key("CLEANUP_ON_CATALOG_DROP")
.catalogConfig("cleanup.on.catalog.drop")
.description("If set to true, clean up data when a catalog is dropped")
.defaultValue(false)
.build();

public static final PolarisConfiguration<Boolean> DROP_WITH_PURGE_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("DROP_WITH_PURGE_ENABLED")
.catalogConfig("drop-with-purge.enabled")
.description(
"If set to true, allows tables to be dropped with the purge parameter set to true.")
.defaultValue(true)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
*/
public class PolarisAdminService {
private static final Logger LOGGER = LoggerFactory.getLogger(PolarisAdminService.class);
public static final String CLEANUP_ON_CATALOG_DROP = "CLEANUP_ON_CATALOG_DROP";

private final CallContext callContext;
private final PolarisEntityManager entityManager;
Expand Down Expand Up @@ -578,7 +577,7 @@ public void deleteCatalog(String name) {
boolean cleanup =
polarisCallContext
.getConfigurationStore()
.getConfiguration(polarisCallContext, CLEANUP_ON_CATALOG_DROP, false);
.getConfiguration(polarisCallContext, PolarisConfiguration.CLEANUP_ON_CATALOG_DROP);
PolarisMetaStoreManager.DropEntityResult dropEntityResult =
entityManager
.getMetaStoreManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
&& !(ex instanceof UnprocessableEntityException)
&& isStorageProviderRetryableException(ex);
};
public static final String CLEANUP_ON_NAMESPACE_DROP = "CLEANUP_ON_NAMESPACE_DROP";

private final PolarisEntityManager entityManager;
private final CallContext callContext;
Expand Down Expand Up @@ -620,7 +619,8 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept
Map.of(),
polarisCallContext
.getConfigurationStore()
.getConfiguration(polarisCallContext, CLEANUP_ON_NAMESPACE_DROP, false));
.getConfiguration(
polarisCallContext, PolarisConfiguration.CLEANUP_ON_NAMESPACE_DROP));

if (!dropEntityResult.isSuccess() && dropEntityResult.failedBecauseNotEmpty()) {
throw new NamespaceNotEmptyException("Namespace %s is not empty", namespace);
Expand Down Expand Up @@ -1803,6 +1803,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
}
}

@SuppressWarnings("FormatStringAnnotation")
private @NotNull PolarisMetaStoreManager.DropEntityResult dropTableLike(
PolarisEntitySubType subType,
TableIdentifier identifier,
Expand All @@ -1818,6 +1819,28 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {

List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
PolarisEntity leafEntity = resolvedEntities.getRawLeafEntity();

// Check that purge is enabled, if it is set:
if (catalogPath != null && !catalogPath.isEmpty() && purge) {
boolean dropWithPurgeEnabled =
callContext
.getPolarisCallContext()
.getConfigurationStore()
.getConfiguration(
callContext.getPolarisCallContext(),
catalogEntity,
PolarisConfiguration.DROP_WITH_PURGE_ENABLED);
if (!dropWithPurgeEnabled) {
throw new ForbiddenException(
String.format(
"Unable to purge entity: %s. To enable this feature, set the Polaris configuration %s "
+ "or the catalog configuration %s",
identifier.name(),
PolarisConfiguration.DROP_WITH_PURGE_ENABLED.key,
PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig()));
}
}

return entityManager
.getMetaStoreManager()
.dropEntityIfExists(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,67 @@ public void testDropTableWithPurge() {
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
}

@Test
public void testDropTableWithPurgeDisabled() {
// Create a catalog with purge disabled:
String noPurgeCatalogName = CATALOG_NAME + "_no_purge";
String storageLocation = "s3://testDropTableWithPurgeDisabled/data";
AwsStorageConfigInfo noPurgeStorageConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::012345678901:role/jdoe")
.setExternalId("externalId")
.setUserArn("aws::a:user:arn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.build();
adminService.createCatalog(
new CatalogEntity.Builder()
.setName(noPurgeCatalogName)
.setDefaultBaseLocation(storageLocation)
.setReplaceNewLocationPrefixWithCatalogDefault("file:")
.addProperty(PolarisConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true")
.addProperty(
PolarisConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true")
.addProperty(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "false")
.setStorageConfigurationInfo(noPurgeStorageConfigModel, storageLocation)
.build());
RealmContext realmContext = () -> "realm";
CallContext callContext = CallContext.of(realmContext, polarisContext);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, authenticatedRoot, noPurgeCatalogName);
BasePolarisCatalog noPurgeCatalog =
new BasePolarisCatalog(
entityManager,
callContext,
passthroughView,
authenticatedRoot,
Mockito.mock(),
new DefaultFileIOFactory());
noPurgeCatalog.initialize(
noPurgeCatalogName,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

if (this.requiresNamespaceCreate()) {
((SupportsNamespaces) noPurgeCatalog).createNamespace(NS);
}

Assertions.assertThatPredicate(noPurgeCatalog::tableExists)
.as("Table should not exist before create")
.rejects(TABLE);

Table table = noPurgeCatalog.buildTable(TABLE, SCHEMA).create();
Assertions.assertThatPredicate(noPurgeCatalog::tableExists)
.as("Table should exist after create")
.accepts(TABLE);
Assertions.assertThat(table).isInstanceOf(BaseTable.class);

// Attempt to drop the table:
Assertions.assertThatThrownBy(() -> noPurgeCatalog.dropTable(TABLE, true))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.key);
}

private TableMetadata createSampleTableMetadata(String tableLocation) {
Schema schema =
new Schema(
Expand Down

0 comments on commit 7d1b8f2

Please sign in to comment.