Skip to content

Commit

Permalink
[#5517] feat(auth): Paimon catalog supports Ranger plugin (#5523)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Paimon catalog supports Ranger plugin. Kyuubi authz plugin doesn't
support to update or delete the table.

### Why are the changes needed?

Fix: #5517 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add IT.
  • Loading branch information
jerqi authored Nov 12, 2024
1 parent 889b475 commit 7761440
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 71 deletions.
6 changes: 4 additions & 2 deletions authorizations/authorization-ranger/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ plugins {

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark35.get()
val kyuubiVersion: String = libs.versions.kyuubi4spark35.get()
val kyuubiVersion: String = libs.versions.kyuubi4paimon.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
val paimonVersion: String = libs.versions.paimon.get()

dependencies {
implementation(project(":api")) {
Expand Down Expand Up @@ -86,7 +87,7 @@ dependencies {
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.kyuubi:kyuubi-spark-authz_$scalaVersion:$kyuubiVersion") {
testImplementation("org.apache.kyuubi:kyuubi-spark-authz-shaded_$scalaVersion:$kyuubiVersion") {
exclude("com.sun.jersey")
}
testImplementation(libs.hadoop3.client)
Expand All @@ -100,6 +101,7 @@ dependencies {
exclude("io.netty")
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected AuthorizationPlugin newPlugin(String catalogProvider, Map<String, Stri
switch (catalogProvider) {
case "hive":
case "lakehouse-iceberg":
case "lakehouse-paimon":
return RangerAuthorizationHadoopSQLPlugin.getInstance(config);
default:
throw new IllegalArgumentException("Unknown catalog provider: " + catalogProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.gravitino.authorization.Privilege;
import org.apache.gravitino.authorization.SecurableObject;
import org.apache.gravitino.authorization.SecurableObjects;
import org.apache.gravitino.authorization.ranger.RangerPrivileges.RangerHivePrivilege;
import org.apache.gravitino.authorization.ranger.RangerPrivileges.RangerHadoopSQLPrivilege;
import org.apache.gravitino.authorization.ranger.reference.RangerDefines.PolicyResource;
import org.apache.gravitino.exceptions.AuthorizationPluginException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,26 +96,28 @@ public void validateRangerMetadataObject(List<String> names, RangerMetadataObjec
public Map<Privilege.Name, Set<RangerPrivilege>> privilegesMappingRule() {
return ImmutableMap.of(
Privilege.Name.CREATE_CATALOG,
ImmutableSet.of(RangerHivePrivilege.CREATE),
ImmutableSet.of(RangerHadoopSQLPrivilege.CREATE),
Privilege.Name.USE_CATALOG,
ImmutableSet.of(RangerHivePrivilege.SELECT),
ImmutableSet.of(RangerHadoopSQLPrivilege.SELECT),
Privilege.Name.CREATE_SCHEMA,
ImmutableSet.of(RangerHivePrivilege.CREATE),
ImmutableSet.of(RangerHadoopSQLPrivilege.CREATE),
Privilege.Name.USE_SCHEMA,
ImmutableSet.of(RangerHivePrivilege.SELECT),
ImmutableSet.of(RangerHadoopSQLPrivilege.SELECT),
Privilege.Name.CREATE_TABLE,
ImmutableSet.of(RangerHivePrivilege.CREATE),
ImmutableSet.of(RangerHadoopSQLPrivilege.CREATE),
Privilege.Name.MODIFY_TABLE,
ImmutableSet.of(
RangerHivePrivilege.UPDATE, RangerHivePrivilege.ALTER, RangerHivePrivilege.WRITE),
RangerHadoopSQLPrivilege.UPDATE,
RangerHadoopSQLPrivilege.ALTER,
RangerHadoopSQLPrivilege.WRITE),
Privilege.Name.SELECT_TABLE,
ImmutableSet.of(RangerHivePrivilege.READ, RangerHivePrivilege.SELECT));
ImmutableSet.of(RangerHadoopSQLPrivilege.READ, RangerHadoopSQLPrivilege.SELECT));
}

@Override
/** Set the default owner rule. */
public Set<RangerPrivilege> ownerMappingRule() {
return ImmutableSet.of(RangerHivePrivilege.ALL);
return ImmutableSet.of(RangerHadoopSQLPrivilege.ALL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class RangerPrivileges {
/** Ranger Hive privileges enumeration. */
public enum RangerHivePrivilege implements RangerPrivilege {
public enum RangerHadoopSQLPrivilege implements RangerPrivilege {
ALL("all"),
SELECT("select"),
UPDATE("update"),
Expand All @@ -41,7 +41,7 @@ public enum RangerHivePrivilege implements RangerPrivilege {

private final String name; // Access a type in the Ranger policy item

RangerHivePrivilege(String name) {
RangerHadoopSQLPrivilege(String name) {
this.name = name;
}

Expand Down Expand Up @@ -117,7 +117,7 @@ public boolean equalsTo(String value) {

static List<Class<? extends Enum<? extends RangerPrivilege>>> allRangerPrivileges =
Lists.newArrayList(
RangerPrivileges.RangerHivePrivilege.class, RangerPrivileges.RangerHdfsPrivilege.class);
RangerHadoopSQLPrivilege.class, RangerPrivileges.RangerHdfsPrivilege.class);

public static RangerPrivilege valueOf(String name) {
Preconditions.checkArgument(name != null, "Privilege name string cannot be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ protected static void waitForUpdatingPolicies() throws InterruptedException {

protected abstract void useCatalog() throws InterruptedException;

protected abstract void checkHaveNoPrivileges();
protected abstract void checkWithoutPrivileges();

protected abstract void testAlterTable();

Expand Down Expand Up @@ -269,7 +269,7 @@ void testCreateTable() throws InterruptedException {
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
metalake.deleteRole(createTableRole);
metalake.deleteRole(createSchemaRole);
Expand Down Expand Up @@ -323,10 +323,10 @@ void testReadWriteTableWithMetalakeLevelRole() throws InterruptedException {
// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(readWriteRole);
waitForUpdatingPolicies();
checkHaveNoPrivileges();
checkWithoutPrivileges();

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
}

Expand Down Expand Up @@ -387,10 +387,10 @@ void testReadWriteTableWithTableLevelRole() throws InterruptedException {
// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(roleName);
waitForUpdatingPolicies();
checkHaveNoPrivileges();
checkWithoutPrivileges();

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
}

Expand Down Expand Up @@ -441,10 +441,10 @@ void testReadOnlyTable() throws InterruptedException {
// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(readOnlyRole);
waitForUpdatingPolicies();
checkHaveNoPrivileges();
checkWithoutPrivileges();

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
}

Expand Down Expand Up @@ -496,10 +496,10 @@ void testWriteOnlyTable() throws InterruptedException {
// case 7: If we don't have the role, we can't insert and select from data.
metalake.deleteRole(writeOnlyRole);
waitForUpdatingPolicies();
checkHaveNoPrivileges();
checkWithoutPrivileges();

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
}

Expand Down Expand Up @@ -547,7 +547,7 @@ void testCreateAllPrivilegesRole() throws InterruptedException {
sparkSession.sql(SQL_CREATE_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
metalake.deleteRole(roleName);
}
Expand Down Expand Up @@ -690,7 +690,7 @@ void testRenameMetadataObject() throws InterruptedException {
sparkSession.sql(SQL_RENAME_BACK_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
metalake.deleteRole(roleName);
}
Expand Down Expand Up @@ -739,7 +739,7 @@ void testRenameMetadataObjectPrivilege() throws InterruptedException {
sparkSession.sql(SQL_INSERT_TABLE);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
metalake.deleteRole(roleName);
}
Expand Down Expand Up @@ -774,7 +774,7 @@ void testChangeOwner() throws InterruptedException {
metalake.deleteRole(helperRole);
waitForUpdatingPolicies();

checkHaveNoPrivileges();
checkWithoutPrivileges();

// case 2. user is the table owner
MetadataObject tableObject =
Expand All @@ -787,7 +787,7 @@ void testChangeOwner() throws InterruptedException {
checkTableAllPrivilegesExceptForCreating();

// Delete Gravitino's meta data
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
waitForUpdatingPolicies();

// Fail to create the table
Expand Down Expand Up @@ -854,7 +854,7 @@ void testChangeOwner() throws InterruptedException {
sparkSession.sql(SQL_DROP_SCHEMA);

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
}

Expand Down Expand Up @@ -915,7 +915,7 @@ void testAllowUseSchemaPrivilege() throws InterruptedException {
1, rows2.stream().filter(row -> row.getString(0).equals(schemaName)).count());

// Clean up
catalog.asTableCatalog().dropTable(NameIdentifier.of(schemaName, tableName));
catalog.asTableCatalog().purgeTable(NameIdentifier.of(schemaName, tableName));
catalog.asSchemas().dropSchema(schemaName, false);
metalake.revokeRolesFromUser(Lists.newArrayList(roleName), userName1);
metalake.deleteRole(roleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void useCatalog() throws InterruptedException {
}

@Override
protected void checkHaveNoPrivileges() {
protected void checkWithoutPrivileges() {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void testFindManagedPolicy() {
RangerMetadataObject.Type.TABLE,
ImmutableSet.of(
new RangerPrivileges.RangerHivePrivilegeImpl(
RangerPrivileges.RangerHivePrivilege.ALL, Privilege.Condition.ALLOW)));
RangerPrivileges.RangerHadoopSQLPrivilege.ALL, Privilege.Condition.ALLOW)));
Assertions.assertNull(rangerHelper.findManagedPolicy(rangerSecurableObject));

// Add a policy for `db3.tab1`
Expand Down Expand Up @@ -398,7 +398,7 @@ static void createHivePolicy(
policyItem.setAccesses(
Arrays.asList(
new RangerPolicy.RangerPolicyItemAccess(
RangerPrivileges.RangerHivePrivilege.SELECT.toString())));
RangerPrivileges.RangerHadoopSQLPrivilege.SELECT.toString())));
RangerITEnv.updateOrCreateRangerPolicy(
RangerDefines.SERVICE_TYPE_HIVE,
RangerITEnv.RANGER_HIVE_REPO_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static void allowAnyoneAccessInformationSchema() {
policyItem.setAccesses(
Arrays.asList(
new RangerPolicy.RangerPolicyItemAccess(
RangerPrivileges.RangerHivePrivilege.SELECT.toString())));
RangerPrivileges.RangerHadoopSQLPrivilege.SELECT.toString())));
updateOrCreateRangerPolicy(
RANGER_HIVE_TYPE,
RANGER_HIVE_REPO_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void startIntegrationTest() throws Exception {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Ranger Hive E2E integration test")
.appName("Ranger Iceberg E2E integration test")
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg.type", "hive")
.config("spark.sql.catalog.iceberg.uri", HIVE_METASTORE_URIS)
Expand Down Expand Up @@ -147,7 +147,7 @@ protected void checkDeleteSQLWithWritePrivileges() {
}

@Override
protected void checkHaveNoPrivileges() {
protected void checkWithoutPrivileges() {
Assertions.assertThrows(AccessControlException.class, () -> sparkSession.sql(SQL_INSERT_TABLE));
Assertions.assertThrows(
AccessControlException.class, () -> sparkSession.sql(SQL_SELECT_TABLE).collectAsList());
Expand Down
Loading

0 comments on commit 7761440

Please sign in to comment.