From c1f4d0a8c229d79fbb9a6d30275496608aea9086 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 20 Mar 2024 13:49:54 +0800 Subject: [PATCH] [improve][broker] Add fine-grain authorization to ns/topic management endpoints (#22305) (cherry picked from commit fd34d4ab9c5aa7e0dca961d5a8badae4613fbe8e) # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java (cherry picked from commit 99eb49a68982271562597d4c4cea127132bc0b35) # Conflicts: # pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java (cherry picked from commit 5c4f4cb64167c0ea1a5775f6d991b386ad95c786) # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java --- .../pulsar/broker/admin/AdminResource.java | 7 +- .../broker/admin/impl/NamespacesBase.java | 166 ++++++----- .../admin/impl/PersistentTopicsBase.java | 137 ++++----- .../broker/admin/v2/PersistentTopics.java | 1 - .../pulsar/broker/admin/BaseAuthZTest.java | 110 +++++++ .../broker/admin/NamespaceAuthZTest.java | 117 ++++++++ .../pulsar/broker/admin/TopicAuthZTest.java | 272 ++++++++++++++++++ .../broker/admin/TopicPoliciesAuthZTest.java | 205 +++++++------ 8 files changed, 768 insertions(+), 247 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f64e1d94507e5..6c09bd982f3c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -54,8 +54,6 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -704,10 +702,7 @@ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncRespon } protected CompletableFuture getSchemaCompatibilityStrategyAsync() { - return validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, - PolicyOperation.READ) - .thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> { + return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to get schema compatibility strategy of topic {} {}", clientAppId(), topicName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 24d2b97fc26d8..9c859238249c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2758,102 +2758,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { } protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.put(key, value); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.properties.put(key, value); + return policies; + })) + .thenAccept(v -> { + log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key, + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalSetProperties(Map properties, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - updatePoliciesAsync(namespaceName, policies -> { - policies.properties.putAll(properties); - return policies; - }).thenAccept(v -> { - log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.properties.putAll(properties); + return policies; + })) + .thenAccept(v -> { + log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalGetProperty(String key, AsyncResponse asyncResponse) { - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - asyncResponse.resume(policies.properties.get(key)); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.properties.get(key))) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalGetProperties(AsyncResponse asyncResponse) { - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - asyncResponse.resume(policies.properties); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.properties)) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); - AtomicReference oldVal = new AtomicReference<>(null); - updatePoliciesAsync(namespaceName, policies -> { - oldVal.set(policies.properties.remove(key)); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(oldVal.get()); - log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + oldVal.set(policies.properties.remove(key)); + return policies; + })).thenAccept(v -> { + asyncResponse.resume(oldVal.get()); + log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key, + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key, + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } protected void internalClearProperties(AsyncResponse asyncResponse) { - validatePoliciesReadOnlyAccess(); AtomicReference clearedCount = new AtomicReference<>(0); - updatePoliciesAsync(namespaceName, policies -> { - clearedCount.set(policies.properties.size()); - policies.properties.clear(); - return policies; - }).thenAccept(v -> { - asyncResponse.resume(Response.noContent().build()); - log.info("[{}] Successfully clear {} properties for on namespace {}", clientAppId(), clearedCount.get(), - namespaceName); - }).exceptionally(ex -> { - Throwable cause = ex.getCause(); - log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), clearedCount.get(), - namespaceName, cause); - asyncResponse.resume(cause); - return null; - }); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + clearedCount.set(policies.properties.size()); + policies.properties.clear(); + return policies; + })) + .thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(), + namespaceName, cause); + asyncResponse.resume(cause); + return null; + }); } private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function updateFunction) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 82711096701f2..dbcfd734a375c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -541,7 +541,9 @@ protected void internalUpdatePartitionedTopic(int numPartitions, protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { if (metadata != null) { - tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> { + CompletableFuture future = validateNamespaceOperationAsync(topicName.getNamespaceObject(), + NamespaceOperation.CREATE_TOPIC); + future.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { asyncResponse.resume(Response.noContent().build()); }).exceptionally(e -> { log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); @@ -783,13 +785,13 @@ private CompletableFuture internalRemovePartitionsAuthenticationPoliciesAs protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Unloading topic {}", clientAppId(), topicName); - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.UNLOAD); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { if (checkTopicIsTransactionCoordinatorAssign(topicName)) { @@ -1049,13 +1051,12 @@ protected CompletableFuture internalSetDeduplicationSnapshotInterval(Integ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> topic.close(false)) .thenRun(() -> { log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); - })) + }) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isNot307And404Exception(ex)) { @@ -1068,16 +1069,14 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD) - .thenCompose(v -> pulsar() - .getTransactionMetadataStoreService() - .removeTransactionMetadataStore( - TransactionCoordinatorID.get(topicName.getPartitionIndex()))) - .thenRun(() -> { - log.info("[{}] Successfully unloaded tc {}", clientAppId(), - topicName.getPartitionIndex()); - asyncResponse.resume(Response.noContent().build()); - })) + .thenCompose(v -> pulsar() + .getTransactionMetadataStoreService() + .removeTransactionMetadataStore( + TransactionCoordinatorID.get(topicName.getPartitionIndex()))) + .thenRun(() -> { + log.info("[{}] Successfully unloaded tc {}", clientAppId(), topicName.getPartitionIndex()); + asyncResponse.resume(Response.noContent().build()); + }) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isNot307And404Exception(ex)) { @@ -1311,13 +1310,13 @@ protected PersistentTopicInternalStats internalGetInternalStats(boolean authorit } protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse); @@ -1423,13 +1422,13 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found")); @@ -1497,14 +1496,15 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean } protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) - .thenAccept(partitionMetadata -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) + .thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found")); return; @@ -2201,13 +2201,14 @@ private CompletableFuture internalResetCursorForNonPartitionedTopic(String protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated, Map properties) { - CompletableFuture ret; - if (topicName.isGlobal()) { - ret = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - ret = CompletableFuture.completedFuture(null); - } - ret.thenAccept(__ -> { + CompletableFuture ret = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, + subscriptionName); + ret.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenAccept(__ -> { final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId; log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(), topicName, subscriptionName, targetMessageId, properties); @@ -2366,14 +2367,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic( protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName, Map subscriptionProperties, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return CompletableFuture.completedFuture(null); + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> { if (topicName.isPartitioned()) { internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName, subscriptionProperties, authoritative); @@ -3898,13 +3898,14 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) { log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName); - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - future.thenAccept(__ -> { + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.COMPACT); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative); @@ -5135,12 +5136,12 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(Async } protected CompletableFuture internalGetSchemaCompatibilityStrategy(boolean applied) { + CompletableFuture future = validateTopicPolicyOperationAsync(topicName, + PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); if (applied) { - return getSchemaCompatibilityStrategyAsync(); + return future.thenCompose(__ -> getSchemaCompatibilityStrategyAsync()); } - return validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, - PolicyOperation.READ) + return future .thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { if (!op.isPresent()) { return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 4dd4c1310cc62..3ceb8083e36d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -372,7 +372,6 @@ public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) { validateTopicName(tenant, namespace, encodedTopic); - validateTopicPolicyOperation(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE) .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java new file mode 100644 index 0000000000000..58d8da8d16056 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BaseAuthZTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import javax.crypto.SecretKey; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +public abstract class BaseAuthZTest extends MockedPulsarServiceBaseTest { + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + private static final String TENANT_ADMIN_TOKEN = Jwts.builder() + .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); + private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal"; + private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder() + .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact(); + private static final String SUPER_USER_SUBJECT = "super-user"; + private static final String SUPER_USER_TOKEN = Jwts.builder() + .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact(); + private static final String NOBODY_SUBJECT = "nobody"; + private static final String NOBODY_TOKEN = Jwts.builder() + .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact(); + protected PulsarAdmin superUserAdmin; + protected PulsarAdmin tenantManagerAdmin; + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + conf.setAuthorizationEnabled(true); + conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + conf.setSuperUserRoles(new HashSet<>(Arrays.asList(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT))); + conf.setAuthenticationEnabled(true); + conf.setAuthenticationProviders(new HashSet<>(Arrays.asList(AuthenticationProviderToken.class.getName()))); + // internal client + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + final Map brokerClientAuthParams = new HashMap<>(); + brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN); + final String brokerClientAuthParamStr = ObjectMapperFactory.getThreadLocal() + .writeValueAsString(brokerClientAuthParams); + conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr); + + Properties properties = conf.getProperties(); + if (properties == null) { + properties = new Properties(); + conf.setProperties(properties); + } + properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + internalSetup(); + setupDefaultTenantAndNamespace(); + + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); + tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); + superUserAdmin.tenants().updateTenant("public", tenantInfo); + this.tenantManagerAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) + .build(); + } + + @Override + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + pulsarAdminBuilder.authentication(new AuthenticationToken(SUPER_USER_TOKEN)); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java new file mode 100644 index 0000000000000..d64d0aa2aa183 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class NamespaceAuthZTest extends BaseAuthZTest { + @SneakyThrows + @Test + public void testProperties() { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + Map properties = new HashMap<>(); + properties.put("key1", "value1"); + superUserAdmin.namespaces().setProperties(namespace, properties); + superUserAdmin.namespaces().setProperty(namespace, "key2", "value2"); + superUserAdmin.namespaces().getProperties(namespace); + superUserAdmin.namespaces().getProperty(namespace, "key2"); + superUserAdmin.namespaces().removeProperty(namespace, "key2"); + superUserAdmin.namespaces().clearProperties(namespace); + + // test tenant manager + tenantManagerAdmin.namespaces().setProperties(namespace, properties); + tenantManagerAdmin.namespaces().setProperty(namespace, "key2", "value2"); + tenantManagerAdmin.namespaces().getProperties(namespace); + tenantManagerAdmin.namespaces().getProperty(namespace, "key2"); + tenantManagerAdmin.namespaces().removeProperty(namespace, "key2"); + tenantManagerAdmin.namespaces().clearProperties(namespace); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperties(namespace, properties)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperty(namespace, "key2", "value2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperties(namespace)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperty(namespace, "key2")); + + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeProperty(namespace, "key2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearProperties(namespace)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Collections.singleton(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperties(namespace, properties)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setProperty(namespace, "key2", "value2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperties(namespace)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getProperty(namespace, "key2")); + + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeProperty(namespace, "key2")); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearProperties(namespace)); + + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + superUserAdmin.topics().delete(topic, true); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java new file mode 100644 index 0000000000000..cd0de3f4f6369 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import io.jsonwebtoken.Jwts; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class TopicAuthZTest extends BaseAuthZTest { + + @SneakyThrows + @Test + public void testUnloadAndCompact() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().unload(topic); + superUserAdmin.topics().triggerCompaction(topic); + superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + + // test tenant manager + tenantManagerAdmin.topics().unload(topic); + tenantManagerAdmin.topics().triggerCompaction(topic); + tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().unload(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().triggerCompaction(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + + // Test only super/admin can do the operation, other auth are not permitted. + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().unload(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().triggerCompaction(topic)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testGetManagedLedgerInfo() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().getInternalInfo(topic); + + // test tenant manager + tenantManagerAdmin.topics().getInternalInfo(topic); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getInternalInfo(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + if (action == AuthAction.produce || action == AuthAction.consume) { + subAdmin.topics().getInternalInfo(topic); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getInternalInfo(topic)); + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testGetPartitionedStatsAndInternalStats() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test superuser + superUserAdmin.topics().getPartitionedStats(topic, false); + superUserAdmin.topics().getPartitionedInternalStats(topic); + + // test tenant manager + tenantManagerAdmin.topics().getPartitionedStats(topic, false); + tenantManagerAdmin.topics().getPartitionedInternalStats(topic); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedStats(topic, false)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedInternalStats(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + if (action == AuthAction.produce || action == AuthAction.consume) { + subAdmin.topics().getPartitionedStats(topic, false); + subAdmin.topics().getPartitionedInternalStats(topic); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedStats(topic, false)); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getPartitionedInternalStats(topic)); + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + AtomicInteger suffix = new AtomicInteger(1); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + superUserAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + + // test tenant manager + tenantManagerAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + if (action == AuthAction.consume) { + subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest)); + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + // test UpdateSubscriptionProperties + Map properties = new HashMap<>(); + superUserAdmin.topics().createSubscription(topic, "test-sub", MessageId.earliest); + // test superuser + superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties); + + // test tenant manager + tenantManagerAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + if (action == AuthAction.consume) { + subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties)); + } + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } + + @Test + @SneakyThrows + public void testCreateMissingPartition() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createPartitionedTopic(topic, 2); + AtomicInteger suffix = new AtomicInteger(1); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + superUserAdmin.topics().createMissedPartitions(topic); + + // test tenant manager + tenantManagerAdmin.topics().createMissedPartitions(topic); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createMissedPartitions(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Collections.singleton(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().createMissedPartitions(topic)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + superUserAdmin.topics().deletePartitionedTopic(topic, true); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java index b76c6e3f3f2ec..152c84b854be5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java @@ -19,107 +19,23 @@ package org.apache.pulsar.broker.admin; import static org.awaitility.Awaitility.await; -import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; -import io.jsonwebtoken.SignatureAlgorithm; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.Collections; import java.util.UUID; -import javax.crypto.SecretKey; import lombok.Cleanup; import lombok.SneakyThrows; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.util.ObjectMapperFactory; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public final class TopicPoliciesAuthZTest extends MockedPulsarServiceBaseTest { - - private PulsarAdmin superUserAdmin; - - private PulsarAdmin tenantManagerAdmin; - - private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); - private static final String TENANT_ADMIN_TOKEN = Jwts.builder() - .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); - - - private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal"; - private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder() - .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact(); - private static final String SUPER_USER_SUBJECT = "super-user"; - private static final String SUPER_USER_TOKEN = Jwts.builder() - .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact(); - private static final String NOBODY_SUBJECT = "nobody"; - private static final String NOBODY_TOKEN = Jwts.builder() - .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact(); - - - @BeforeClass - @Override - protected void setup() throws Exception { - conf.setAuthorizationEnabled(true); - conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); - conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT)); - conf.setAuthenticationEnabled(true); - conf.setSystemTopicEnabled(true); - conf.setTopicLevelPoliciesEnabled(true); - conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); - // internal client - conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); - final Map brokerClientAuthParams = new HashMap<>(); - brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN); - final String brokerClientAuthParamStr = ObjectMapperFactory.getThreadLocal() - .writeValueAsString(brokerClientAuthParams); - conf.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr); - - Properties properties = conf.getProperties(); - if (properties == null) { - properties = new Properties(); - conf.setProperties(properties); - } - properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); - - internalSetup(); - setupDefaultTenantAndNamespace(); - - this.superUserAdmin =PulsarAdmin.builder() - .serviceHttpUrl(pulsar.getWebServiceAddress()) - .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) - .build(); - final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public"); - tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT); - superUserAdmin.tenants().updateTenant("public", tenantInfo); - this.tenantManagerAdmin = PulsarAdmin.builder() - .serviceHttpUrl(pulsar.getWebServiceAddress()) - .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) - .build(); - } - - @Override - protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { - pulsarAdminBuilder.authentication(new AuthenticationToken(SUPER_USER_TOKEN)); - } - @AfterClass - @Override - protected void cleanup() throws Exception { - internalCleanup(); - } +public final class TopicPoliciesAuthZTest extends BaseAuthZTest { @SneakyThrows @@ -193,7 +109,7 @@ public void testRetention() { // test sub user with permissions for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", - subject, Sets.newHashSet(action)); + subject, Collections.singleton(action)); try { subAdmin.topicPolicies().getRetention(topic); Assert.fail("unexpected behaviour"); @@ -218,6 +134,109 @@ public void testRetention() { superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); } } + @SneakyThrows + @Test + public void testOffloadPolicy() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsar().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // mocked data + final OffloadPoliciesImpl definedOffloadPolicies = new OffloadPoliciesImpl(); + definedOffloadPolicies.setManagedLedgerOffloadThresholdInBytes(100L); + definedOffloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(200L); + definedOffloadPolicies.setManagedLedgerOffloadDriver(""); // set to blank value to test the behaviour + definedOffloadPolicies.setManagedLedgerOffloadBucket("buck"); + + // test superuser + superUserAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + + // because the topic policies is eventual consistency, we should wait here + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertEquals(offloadPolicy, definedOffloadPolicies); + }); + superUserAdmin.topicPolicies().removeOffloadPolicies(topic); + + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = superUserAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertNull(offloadPolicy); + }); + + // test tenant manager + + tenantManagerAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertEquals(offloadPolicy, definedOffloadPolicies); + }); + tenantManagerAdmin.topicPolicies().removeOffloadPolicies(topic); + await().untilAsserted(() -> { + final OffloadPolicies offloadPolicy = tenantManagerAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.assertNull(offloadPolicy); + }); + + // test nobody + + try { + subAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + // test sub user with permissions + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + subject, Collections.singleton(action)); + try { + subAdmin.topicPolicies().getOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + + subAdmin.topicPolicies().setOffloadPolicies(topic, definedOffloadPolicies); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + subAdmin.topicPolicies().removeOffloadPolicies(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject); + } + } @SneakyThrows @Test @@ -230,7 +249,7 @@ public void testMaxUnackedMessagesOnConsumer() { superUserAdmin.topics().createNonPartitionedTopic(topic); @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() - .serviceHttpUrl(pulsar.getWebServiceAddress()) + .serviceHttpUrl(getPulsar().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -293,7 +312,7 @@ public void testMaxUnackedMessagesOnConsumer() { // test sub user with permissions for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", - subject, Sets.newHashSet(action)); + subject, Collections.singleton(action)); try { subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic); Assert.fail("unexpected behaviour"); @@ -330,7 +349,7 @@ public void testMaxUnackedMessagesOnSubscription() { superUserAdmin.topics().createNonPartitionedTopic(topic); @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() - .serviceHttpUrl(pulsar.getWebServiceAddress()) + .serviceHttpUrl(getPulsar().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -395,7 +414,7 @@ public void testMaxUnackedMessagesOnSubscription() { // test sub user with permissions for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", - subject, Sets.newHashSet(action)); + subject, Collections.singleton(action)); try { subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic); Assert.fail("unexpected behaviour");