Skip to content

Commit

Permalink
Correct the handling of access delegation mode (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimas-b authored Sep 4, 2024
1 parent fe4d16c commit ed4c0cb
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 31 deletions.
6 changes: 3 additions & 3 deletions docs/index.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ bin/spark-shell \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.quickstart_catalog.warehouse=quickstart_catalog \
--conf spark.sql.catalog.quickstart_catalog.header.X-Iceberg-Access-Delegation=true \
--conf spark.sql.catalog.quickstart_catalog.header.X-Iceberg-Access-Delegation=vended-credentials \
--conf spark.sql.catalog.quickstart_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.quickstart_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.quickstart_catalog.uri=http://localhost:8181/api/catalog \
Expand Down
4 changes: 2 additions & 2 deletions notebooks/SparkPolaris.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@
"* Catalog URI points to our Polaris installation\n",
"* Credential set using the client_id and client_secret generated for the principal\n",
"* Scope set to `PRINCIPAL_ROLE:ALL`\n",
"* `X-Iceberg-Access-Delegation` is set to true"
"* `X-Iceberg-Access-Delegation` is set to vended-credentials"
]
},
{
Expand Down Expand Up @@ -278,7 +278,7 @@
" .config(\"spark.sql.catalog.polaris.scope\", 'PRINCIPAL_ROLE:ALL')\n",
"\n",
" # Enable access credential delegation\n",
" .config(\"spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation\", 'true')\n",
" .config(\"spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation\", 'vended-credentials')\n",
"\n",
" .config(\"spark.sql.catalog.polaris.io-impl\", \"org.apache.iceberg.io.ResolvingFileIO\")\n",
" .config(\"spark.sql.catalog.polaris.s3.region\", \"us-west-2\")\n",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;

import com.google.common.base.Functions;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Represents access mechanisms defined in the Iceberg REST API specification (values for the {@code
* X-Iceberg-Access-Delegation} header).
*/
public enum AccessDelegationMode {
UNKNOWN("unknown"),
VENDED_CREDENTIALS("vended-credentials"),
REMOTE_SIGNING("remote-signing"),
;

AccessDelegationMode(String protocolValue) {
this.protocolValue = protocolValue;
}

private final String protocolValue;

public String protocolValue() {
return protocolValue;
}

public static EnumSet<AccessDelegationMode> fromProtocolValuesList(String protocolValues) {
if (protocolValues == null || protocolValues.isEmpty()) {
return EnumSet.noneOf(AccessDelegationMode.class);
}

// Backward-compatibility case for old clients that still use the unofficial value of `true` to
// request credential vending. Note that if the client requests `true` among other values it
// will be parsed as `UNKNOWN` (by the code below this `if`) since the client submitting
// multiple access modes is expected to be aware of the Iceberg REST API spec.
if (protocolValues.trim().toLowerCase(Locale.ROOT).equals("true")) {
return EnumSet.of(VENDED_CREDENTIALS);
}

EnumSet<AccessDelegationMode> set = EnumSet.noneOf(AccessDelegationMode.class);
Arrays.stream(protocolValues.split(",")) // per Iceberg REST Catalog spec
.map(String::trim)
.map(n -> Mapper.byProtocolValue.getOrDefault(n, UNKNOWN))
.forEach(set::add);
return set;
}

private static class Mapper {
private static final Map<String, AccessDelegationMode> byProtocolValue =
Arrays.stream(AccessDelegationMode.values())
.collect(Collectors.toMap(AccessDelegationMode::protocolValue, Functions.identity()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.polaris.service.catalog;

import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -171,28 +173,39 @@ public Response updateProperties(
.build();
}

private EnumSet<AccessDelegationMode> parseAccessDelegationModes(String accessDelegationMode) {
EnumSet<AccessDelegationMode> delegationModes =
AccessDelegationMode.fromProtocolValuesList(accessDelegationMode);
Preconditions.checkArgument(
delegationModes.isEmpty() || delegationModes.contains(VENDED_CREDENTIALS),
"Unsupported access delegation mode: %s",
accessDelegationMode);
return delegationModes;
}

@Override
public Response createTable(
String prefix,
String namespace,
CreateTableRequest createTableRequest,
String xIcebergAccessDelegation,
String accessDelegationMode,
SecurityContext securityContext) {
EnumSet<AccessDelegationMode> delegationModes =
parseAccessDelegationModes(accessDelegationMode);
Namespace ns = decodeNamespace(namespace);
if (createTableRequest.stageCreate()) {
if (Strings.isNullOrEmpty(xIcebergAccessDelegation)) {
if (delegationModes.isEmpty()) {
return Response.ok(
newHandlerWrapper(securityContext, prefix)
.createTableStaged(ns, createTableRequest))
.build();
} else {
return Response.ok(
newHandlerWrapper(securityContext, prefix)
.createTableStagedWithWriteDelegation(
ns, createTableRequest, xIcebergAccessDelegation))
.createTableStagedWithWriteDelegation(ns, createTableRequest))
.build();
}
} else if (Strings.isNullOrEmpty(xIcebergAccessDelegation)) {
} else if (delegationModes.isEmpty()) {
return Response.ok(
newHandlerWrapper(securityContext, prefix).createTableDirect(ns, createTableRequest))
.build();
Expand Down Expand Up @@ -220,20 +233,21 @@ public Response loadTable(
String prefix,
String namespace,
String table,
String xIcebergAccessDelegation,
String accessDelegationMode,
String snapshots,
SecurityContext securityContext) {
EnumSet<AccessDelegationMode> delegationModes =
parseAccessDelegationModes(accessDelegationMode);
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table));
if (Strings.isNullOrEmpty(xIcebergAccessDelegation)) {
if (delegationModes.isEmpty()) {
return Response.ok(
newHandlerWrapper(securityContext, prefix).loadTable(tableIdentifier, snapshots))
.build();
} else {
return Response.ok(
newHandlerWrapper(securityContext, prefix)
.loadTableWithAccessDelegation(
tableIdentifier, xIcebergAccessDelegation, snapshots))
.loadTableWithAccessDelegation(tableIdentifier, snapshots))
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ public LoadTableResponse createTableStaged(Namespace namespace, CreateTableReque
}

public LoadTableResponse createTableStagedWithWriteDelegation(
Namespace namespace, CreateTableRequest request, String xIcebergAccessDelegation) {
Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
Expand Down Expand Up @@ -770,7 +770,7 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps
}

public LoadTableResponse loadTableWithAccessDelegation(
TableIdentifier tableIdentifier, String xIcebergAccessDelegation, String snapshots) {
TableIdentifier tableIdentifier, String snapshots) {
// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;

import static org.apache.polaris.service.catalog.AccessDelegationMode.*;
import static org.apache.polaris.service.catalog.AccessDelegationMode.REMOTE_SIGNING;
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
import static org.apache.polaris.service.catalog.AccessDelegationMode.fromProtocolValuesList;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.EnumSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class AccessDelegationModeTest {

@ParameterizedTest
@EnumSource(AccessDelegationMode.class)
void testSingle(AccessDelegationMode mode) {
assertThat(fromProtocolValuesList(mode.protocolValue())).isEqualTo(EnumSet.of(mode));
}

@Test
void testSeveral() {
assertThat(fromProtocolValuesList("vended-credentials, remote-signing"))
.isEqualTo(EnumSet.of(VENDED_CREDENTIALS, REMOTE_SIGNING));
}

@Test
void testEmpty() {
assertThat(fromProtocolValuesList(null)).isEqualTo(EnumSet.noneOf(AccessDelegationMode.class));
assertThat(fromProtocolValuesList("")).isEqualTo(EnumSet.noneOf(AccessDelegationMode.class));
}

@Test
void testUnknown() {
assertThat(fromProtocolValuesList("abc")).isEqualTo(EnumSet.of(UNKNOWN));
assertThat(fromProtocolValuesList("abc,def")).isEqualTo(EnumSet.of(UNKNOWN));
assertThat(fromProtocolValuesList("abc,remote-signing"))
.isEqualTo(EnumSet.of(REMOTE_SIGNING, UNKNOWN));
}

@Test
void testLegacy() {
assertThat(fromProtocolValuesList("true")).isEqualTo(EnumSet.of(VENDED_CREDENTIALS));
assertThat(fromProtocolValuesList("true, vended-credentials"))
.isEqualTo(EnumSet.of(UNKNOWN, VENDED_CREDENTIALS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,7 @@ public void testCreateTableStagedWithWriteDelegationAllSufficientPrivileges() {
Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableStagedWithWriteDelegation(
NS2, createStagedWithWriteDelegationRequest, "vended-credentials");
.createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest);
},
// createTableStagedWithWriteDelegation doesn't actually commit any metadata
null,
Expand Down Expand Up @@ -736,8 +735,7 @@ public void testCreateTableStagedWithWriteDelegationInsufficientPermissions() {
PolarisPrivilege.TABLE_LIST),
() -> {
newWrapper(Set.of(PRINCIPAL_ROLE1))
.createTableStagedWithWriteDelegation(
NS2, createStagedWithWriteDelegationRequest, "vended-credentials");
.createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest);
});
}

Expand Down Expand Up @@ -855,7 +853,7 @@ public void testLoadTableWithReadAccessDelegationSufficientPrivileges() {
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "vended-credentials", "all"),
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"),
null /* cleanupAction */);
}

Expand All @@ -871,8 +869,7 @@ public void testLoadTableWithReadAccessDelegationInsufficientPermissions() {
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() ->
newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "vended-credentials", "all"));
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
}

@Test
Expand All @@ -885,7 +882,7 @@ public void testLoadTableWithWriteAccessDelegationSufficientPrivileges() {
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "vended-credentials", "all"),
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"),
null /* cleanupAction */);
}

Expand All @@ -901,8 +898,7 @@ public void testLoadTableWithWriteAccessDelegationInsufficientPermissions() {
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() ->
newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "vended-credentials", "all"));
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion regtests/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ spark.driver.extraJavaOptions -Dderby.system.home=${DERBY_HOME}
spark.sql.catalog.polaris=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.polaris.type=rest
spark.sql.catalog.polaris.uri=http://${POLARIS_HOST:-localhost}:8181/api/catalog
spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation=true
spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation=vended-credentials
spark.sql.catalog.polaris.client.region=us-west-2
EOF
echo 'Success!'
Expand Down
2 changes: 1 addition & 1 deletion regtests/t_pyspark/src/iceberg_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __enter__(self):
.config(
f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"
)
.config(f"spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation", "true")
.config(f"spark.sql.catalog.{catalog_name}.header.X-Iceberg-Access-Delegation", "vended-credentials")
.config(f"spark.sql.catalog.{catalog_name}.type", "rest")
.config(f"spark.sql.catalog.{catalog_name}.uri", self.polaris_url)
.config(f"spark.sql.catalog.{catalog_name}.warehouse", self.catalog_name)
Expand Down
Loading

0 comments on commit ed4c0cb

Please sign in to comment.