Skip to content

Commit

Permalink
[improve][broker] Add fine-grain authorization to ns/topic management…
Browse files Browse the repository at this point in the history
… endpoints (apache#22305)

(cherry picked from commit fd34d4a)

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
(cherry picked from commit 99eb49a)

# Conflicts:
#	pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
(cherry picked from commit 5c4f4cb)

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
#	pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
  • Loading branch information
Technoboy- authored and lhotari committed Mar 20, 2024
1 parent 838c0c3 commit c1f4d0a
Show file tree
Hide file tree
Showing 8 changed files with 768 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -704,10 +702,7 @@ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncRespon
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2758,102 +2758,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
}

protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalSetProperties(Map<String, String> properties, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties.get(key));
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties.get(key)))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperties(AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();

AtomicReference<String> oldVal = new AtomicReference<>(null);
updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
}).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
})).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalClearProperties(AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties for on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}))
.thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, Function<Policies, Policies> updateFunction) {
Expand Down
Loading

0 comments on commit c1f4d0a

Please sign in to comment.