From 796efdb44b380303c85030a8f1d404d6f57a7bb1 Mon Sep 17 00:00:00 2001 From: iliax Date: Sat, 17 Feb 2024 12:02:56 +0400 Subject: [PATCH 1/7] Quota management --- .../java/io/kafbat/ui/model/ClusterFeature.java | 3 ++- .../io/kafbat/ui/model/rbac/AccessContext.java | 4 +++- .../java/io/kafbat/ui/model/rbac/Permission.java | 2 ++ .../java/io/kafbat/ui/model/rbac/Resource.java | 3 ++- .../model/rbac/permission/PermissibleAction.java | 2 +- .../io/kafbat/ui/service/FeatureService.java | 1 + .../kafbat/ui/service/ReactiveAdminClient.java | 6 +++++- .../io/kafbat/ui/service/audit/AuditRecord.java | 2 ++ .../ui/service/rbac/AccessControlService.java | 3 ++- .../kafbat/ui/service/audit/AuditWriterTest.java | 8 ++++++-- .../main/resources/swagger/kafbat-ui-api.yaml | 16 ++++++++++++++++ 11 files changed, 42 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/model/ClusterFeature.java b/api/src/main/java/io/kafbat/ui/model/ClusterFeature.java index 47e717ade..6a88534e0 100644 --- a/api/src/main/java/io/kafbat/ui/model/ClusterFeature.java +++ b/api/src/main/java/io/kafbat/ui/model/ClusterFeature.java @@ -6,5 +6,6 @@ public enum ClusterFeature { SCHEMA_REGISTRY, TOPIC_DELETION, KAFKA_ACL_VIEW, - KAFKA_ACL_EDIT + KAFKA_ACL_EDIT, + CLIENT_QUOTA_MANAGEMENT } diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java index 02d3179c3..3e0d8bfbf 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java @@ -3,6 +3,7 @@ import io.kafbat.ui.model.rbac.permission.AclAction; import io.kafbat.ui.model.rbac.permission.ApplicationConfigAction; import io.kafbat.ui.model.rbac.permission.AuditAction; +import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; @@ -69,6 +70,7 @@ public static final class AccessContextBuilder { private Collection ksqlActions = Collections.emptySet(); private Collection aclActions = Collections.emptySet(); private Collection auditActions = Collections.emptySet(); + private Collection clientQuotaActions = Collections.emptySet(); private String operationName; private Object operationParams; @@ -184,7 +186,7 @@ public AccessContext build() { connect, connectActions, connector, schema, schemaActions, - ksqlActions, aclActions, auditActions, + ksqlActions, aclActions, auditActions, clientQuotaActions, operationName, operationParams); } } diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java b/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java index be5e68b9e..9a9c2f189 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java @@ -3,6 +3,7 @@ import static io.kafbat.ui.model.rbac.Resource.ACL; import static io.kafbat.ui.model.rbac.Resource.APPLICATIONCONFIG; import static io.kafbat.ui.model.rbac.Resource.AUDIT; +import static io.kafbat.ui.model.rbac.Resource.CLIENT_QUOTAS; import static io.kafbat.ui.model.rbac.Resource.CLUSTERCONFIG; import static io.kafbat.ui.model.rbac.Resource.KSQL; @@ -88,6 +89,7 @@ private List getAllActionValues() { case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList(); + case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList(); }; } diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java b/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java index 7fab0d343..06b27e488 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/Resource.java @@ -13,7 +13,8 @@ public enum Resource { CONNECT, KSQL, ACL, - AUDIT; + AUDIT, + CLIENT_QUOTAS; @Nullable public static Resource fromString(String name) { diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java index cb727fb2c..69393e113 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/PermissibleAction.java @@ -4,7 +4,7 @@ public sealed interface PermissibleAction permits AclAction, ApplicationConfigAction, ConsumerGroupAction, SchemaAction, ConnectAction, ClusterConfigAction, - KsqlAction, TopicAction, AuditAction { + KsqlAction, TopicAction, AuditAction, ClientQuotaAction { String name(); diff --git a/api/src/main/java/io/kafbat/ui/service/FeatureService.java b/api/src/main/java/io/kafbat/ui/service/FeatureService.java index 8e62a1012..44776627b 100644 --- a/api/src/main/java/io/kafbat/ui/service/FeatureService.java +++ b/api/src/main/java/io/kafbat/ui/service/FeatureService.java @@ -41,6 +41,7 @@ public Mono> getAvailableFeatures(ReactiveAdminClient admin features.add(topicDeletionEnabled(adminClient)); features.add(aclView(adminClient)); features.add(aclEdit(adminClient, clusterDescription)); + features.add(quotaManagement(adminClient)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 4c345d633..efad8fcf4 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -79,6 +79,9 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; @@ -96,7 +99,8 @@ public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), - AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled), + CLIENT_QUOTA_MANAGEMENT(2.6f); private final BiFunction> predicate; diff --git a/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java b/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java index 966884cd6..9b79a033a 100644 --- a/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java +++ b/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java @@ -65,6 +65,8 @@ static List getAccessedResources(AccessContext ctx) { .forEach(a -> resources.add(create(a, Resource.ACL, null))); ctx.getAuditAction() .forEach(a -> resources.add(create(a, Resource.AUDIT, null))); + ctx.getClientQuotaActions() + .forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null))); return resources; } diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java index 0359311c3..6ece4c475 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java @@ -122,7 +122,8 @@ && isConnectorAccessible(context, user) // TODO connector selectors && isSchemaAccessible(context, user) && isKsqlAccessible(context, user) && isAclAccessible(context, user) - && isAuditAccessible(context, user); + && isAuditAccessible(context, user) + && isClientQuotaAccessible(context, user); if (!accessGranted) { throw new AccessDeniedException(ACCESS_DENIED); diff --git a/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java b/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java index 3ca2ffb73..5c4f0c019 100644 --- a/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java @@ -8,6 +8,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.AccessContext.AccessContextBuilder; import io.kafbat.ui.model.rbac.permission.AclAction; +import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; @@ -55,9 +56,11 @@ static Stream onlyLogsWhenAlterOperationIsPresentForOneOfResource SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a)); Stream> connEditActions = ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a)); + Stream> quotaEditActions = + ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a)); return Stream.of( topicEditActions, clusterConfigEditActions, aclEditActions, - cgEditActions, connEditActions, schemaEditActions + cgEditActions, connEditActions, schemaEditActions, quotaEditActions ) .flatMap(c -> c) .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); @@ -78,7 +81,8 @@ static Stream doesNothingIfNoResourceHasAlterAction() { c -> c.aclActions(AclAction.VIEW), c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW), c -> c.schema("sc").schemaActions(SchemaAction.VIEW), - c -> c.connect("conn").connectActions(ConnectAction.VIEW) + c -> c.connect("conn").connectActions(ConnectAction.VIEW), + c -> c.clientQuotaActions(ClientQuotaAction.VIEW) ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); } } diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index c74b580b6..deedce35a 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2202,6 +2202,7 @@ components: - TOPIC_DELETION - KAFKA_ACL_VIEW # get ACLs listing - KAFKA_ACL_EDIT # create & delete ACLs + - CLIENT_QUOTA_MANAGEMENT required: - id - name @@ -3652,6 +3653,21 @@ components: - KSQL - ACL - AUDIT + - CLIENT_QUOTAS + + ClientQuotas: + type: object + properties: + user: + type: string + clientId: + type: string + ip: + type: string + quotas: + type: object + additionalProperties: + type: number KafkaAcl: type: object From c20c0580394af3fd0b2d6133844536b95282a7fa Mon Sep 17 00:00:00 2001 From: iliax Date: Sat, 17 Feb 2024 12:04:11 +0400 Subject: [PATCH 2/7] Quotas management --- .../ui/controller/ClientQuotasController.java | 105 ++++++++++++++++++ .../kafbat/ui/model/rbac/AccessContext.java | 8 ++ .../io/kafbat/ui/model/rbac/Permission.java | 1 + .../rbac/permission/ClientQuotaAction.java | 19 ++++ .../io/kafbat/ui/service/FeatureService.java | 8 ++ .../ui/service/ReactiveAdminClient.java | 9 ++ .../ui/service/quota/ClientQuotaRecord.java | 20 ++++ .../ui/service/quota/ClientQuotaService.java | 103 +++++++++++++++++ .../ui/service/rbac/AccessControlService.java | 17 +++ .../service/quota/ClientQuotaServiceTest.java | 78 +++++++++++++ .../main/resources/swagger/kafbat-ui-api.yaml | 48 ++++++++ 11 files changed, 416 insertions(+) create mode 100644 api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java create mode 100644 api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java create mode 100644 api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaRecord.java create mode 100644 api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java create mode 100644 api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java diff --git a/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java new file mode 100644 index 000000000..720a7f3ac --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java @@ -0,0 +1,105 @@ +package io.kafbat.ui.controller; + +import static java.util.stream.Collectors.toMap; + +import io.kafbat.ui.api.ClientQuotasApi; +import io.kafbat.ui.model.ClientQuotasDTO; +import io.kafbat.ui.model.rbac.AccessContext; +import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; +import io.kafbat.ui.service.quota.ClientQuotaRecord; +import io.kafbat.ui.service.quota.ClientQuotaService; +import java.math.BigDecimal; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class ClientQuotasController extends AbstractController implements ClientQuotasApi { + + private static final Comparator QUOTA_RECORDS_COMPARATOR = + Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user)) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId))) + .thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip))); + + private final ClientQuotaService clientQuotaService; + + @Override + public Mono>> listQuotas(String clusterName, + ServerWebExchange exchange) { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("listClientQuotas") + .clientQuotaActions(ClientQuotaAction.VIEW) + .build(); + + Mono>> operation = + Mono.just( + clientQuotaService.getAll(getCluster(clusterName)) + .sort(QUOTA_RECORDS_COMPARATOR) + .map(this::mapToDto) + ).map(ResponseEntity::ok); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + + @Override + public Mono> upsertClientQuotas(String clusterName, + Mono quotasDto, + ServerWebExchange exchange) { + return quotasDto.flatMap( + newQuotas -> { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .operationParams(Map.of("newQuotas", newQuotas)) + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); + + Mono> operation = clientQuotaService.upsert( + getCluster(clusterName), + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + .map(statusCode -> ResponseEntity.status(statusCode).build()); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + ); + } + + private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { + return new ClientQuotasDTO() + .user(quotaRecord.user()) + .clientId(quotaRecord.clientId()) + .ip(quotaRecord.ip()) + .quotas( + quotaRecord.quotas().entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .collect(toMap( + Map.Entry::getKey, + e -> BigDecimal.valueOf(e.getValue()), + (v1, v2) -> null, //won't be called + LinkedHashMap::new //to keep order + )) + ); + } + +} diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java index 3e0d8bfbf..1e8248bd3 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java @@ -45,6 +45,8 @@ public class AccessContext { Collection auditAction; + Collection clientQuotaActions; + String operationName; Object operationParams; @@ -162,6 +164,12 @@ public AccessContextBuilder auditActions(AuditAction... actions) { return this; } + public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) { + Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT); + this.clientQuotaActions = List.of(actions); + return this; + } + public AccessContextBuilder operationName(String operationName) { this.operationName = operationName; return this; diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java b/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java index 9a9c2f189..4b7308131 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/Permission.java @@ -10,6 +10,7 @@ import io.kafbat.ui.model.rbac.permission.AclAction; import io.kafbat.ui.model.rbac.permission.ApplicationConfigAction; import io.kafbat.ui.model.rbac.permission.AuditAction; +import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction; diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java new file mode 100644 index 000000000..4871e0930 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java @@ -0,0 +1,19 @@ +package io.kafbat.ui.model.rbac.permission; + +import java.util.Set; + +public enum ClientQuotaAction implements PermissibleAction { + + VIEW, + EDIT + + ; + + public static final Set ALTER_ACTIONS = Set.of(EDIT); + + @Override + public boolean isAlter() { + return ALTER_ACTIONS.contains(this); + } + +} diff --git a/api/src/main/java/io/kafbat/ui/service/FeatureService.java b/api/src/main/java/io/kafbat/ui/service/FeatureService.java index 44776627b..59a23236b 100644 --- a/api/src/main/java/io/kafbat/ui/service/FeatureService.java +++ b/api/src/main/java/io/kafbat/ui/service/FeatureService.java @@ -1,5 +1,7 @@ package io.kafbat.ui.service; +import static io.kafbat.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT; + import io.kafbat.ui.model.ClusterFeature; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription; @@ -52,6 +54,12 @@ private Mono topicDeletionEnabled(ReactiveAdminClient adminClien : Mono.empty(); } + private Mono quotaManagement(ReactiveAdminClient adminClient) { + return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT) + ? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT) + : Mono.empty(); + } + private Mono aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) { var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of()); boolean canEdit = aclViewEnabled(adminClient) diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index efad8fcf4..ccdcc4daa 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -664,6 +664,15 @@ public Mono alterReplicaLogDirs(Map replica return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); } + public Mono>> getClientQuotas(ClientQuotaFilter filter) { + return toMono(client.describeClientQuotas(filter).entities()); + } + + public Mono alterClientQuota(ClientQuotaAlteration alteration) { + return toMono(client.alterClientQuotas(List.of(alteration)).all()); + } + + // returns tp -> list of active producer's states (if any) public Mono>> getActiveProducersState(String topic) { return describeTopic(topic) diff --git a/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaRecord.java b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaRecord.java new file mode 100644 index 000000000..b28def6e5 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaRecord.java @@ -0,0 +1,20 @@ +package io.kafbat.ui.service.quota; + +import jakarta.annotation.Nullable; +import java.util.Map; +import org.apache.kafka.common.quota.ClientQuotaEntity; + +public record ClientQuotaRecord(@Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map quotas) { + return new ClientQuotaRecord( + entity.entries().get(ClientQuotaEntity.USER), + entity.entries().get(ClientQuotaEntity.CLIENT_ID), + entity.entries().get(ClientQuotaEntity.IP), + quotas + ); + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java new file mode 100644 index 000000000..6f3ed9319 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java @@ -0,0 +1,103 @@ +package io.kafbat.ui.service.quota; + +import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; +import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; +import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; + +import com.google.common.collect.Sets; +import io.kafbat.ui.exception.ValidationException; +import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.service.AdminClientService; +import io.kafbat.ui.service.ReactiveAdminClient; +import jakarta.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.quota.ClientQuotaFilterComponent; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class ClientQuotaService { + + private final AdminClientService adminClientService; + + public Flux getAll(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) + .flatMapIterable(Map::entrySet) + .map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())); + } + + //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted + public Mono upsert(KafkaCluster cluster, + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map newQuotas) { + ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); + return adminClientService.get(cluster) + .flatMap(ac -> + findQuotas(ac, quotaEntity) + .flatMap(currentQuotas -> { + HttpStatus result = HttpStatus.OK; //updated + if (newQuotas.isEmpty()) { + result = HttpStatus.NO_CONTENT; //deleted + } else if (currentQuotas.isEmpty()) { + result = HttpStatus.CREATED; + } + var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas); + return ac.alterClientQuota(alteration) + .thenReturn(result); + }) + ); + } + + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { + throw new ValidationException("Quota entity id is not set"); + } + var id = new HashMap(); + Optional.ofNullable(user).ifPresent(u -> id.put(USER, u)); + Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid)); + Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i)); + return new ClientQuotaEntity(id); + } + + private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity, + Map currentQuotas, + Map newQuotas) { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + .map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state + newQuotas.entrySet().stream() + .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + return new ClientQuotaAlteration(quotaEntity, ops); + } + + // returns empty map if no quotas found for an entity + private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { + return ac.getClientQuotas(crateSearchFilter(quotaEntity)) + .map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of())); + } + + private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) { + List filters = new ArrayList<>(); + quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); + return ClientQuotaFilter.contains(filters); + } +} diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java index 6ece4c475..f5c168fca 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java @@ -417,6 +417,23 @@ private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) return isAccessible(Resource.AUDIT, null, user, context, requiredActions); } + private boolean isClientQuotaAccessible(AccessContext context, AuthenticatedUser user) { + if (!rbacEnabled) { + return true; + } + + if (context.getClientQuotaActions().isEmpty()) { + return true; + } + + Set requiredActions = context.getClientQuotaActions() + .stream() + .map(a -> a.toString().toUpperCase()) + .collect(Collectors.toSet()); + + return isAccessible(Resource.CLIENT_QUOTAS, null, user, context, requiredActions); + } + public Set getOauthExtractors() { return oauthExtractors; } diff --git a/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java new file mode 100644 index 000000000..7ce9be711 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java @@ -0,0 +1,78 @@ +package io.kafbat.ui.service.quota; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.AbstractIntegrationTest; +import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.service.ClustersStorage; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.beans.factory.annotation.Autowired; +import reactor.test.StepVerifier; + +class ClientQuotaServiceTest extends AbstractIntegrationTest { + + @Autowired + ClientQuotaService quotaService; + + private KafkaCluster cluster; + + @BeforeEach + void init() { + cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get(); + } + + @ParameterizedTest + @CsvSource( + value = { + "testUser, null, null ", + "null, testUserId, null", + "testUser2, testUserId2, null", + }, + nullValues = "null" + ) + void createUpdateDelete(String user, String clientId, String ip) { + var initialQuotas = Map.of( + "producer_byte_rate", 123.0, + "consumer_byte_rate", 234.0, + "request_percentage", 10.0 + ); + + //creating new + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, initialQuotas) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(201)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas))) + .isTrue(); + + //updating + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 22222.0)) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(200)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) + .isTrue(); + + //deleting created record + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of()) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(204)) + .verifyComplete(); + + assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) + .isFalse(); + } + + private boolean quotaRecordExists(ClientQuotaRecord rec) { + return quotaService.getAll(cluster).collectList().block().contains(rec); + } + +} diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index deedce35a..d833c1a95 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1937,6 +1937,54 @@ paths: 200: description: OK + /api/clusters/{clusterName}/clientquotas: + get: + tags: + - ClientQuotas + summary: listQuotas + operationId: listQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ClientQuotas' + post: + tags: + - ClientQuotas + summary: upsertClientQuotas + operationId: upsertClientQuotas + description: | + - updates/creates client quota record if `quotas` field is non-empty + - deletes client quota record if `quotas` field is null or empty + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ClientQuotas' + responses: + 200: + description: Existing quota updated + 201: + description: New quota created + 204: + description: Existing quota deleted + /api/clusters/{clusterName}/acl/streamApp: post: tags: From 5d57715c2ca4be53247403abfd4ebad53607ffe3 Mon Sep 17 00:00:00 2001 From: iliax Date: Sat, 17 Feb 2024 12:12:21 +0400 Subject: [PATCH 3/7] minor improvements --- .../kafbat/ui/controller/ClientQuotasController.java | 12 ++++++------ .../kafbat/ui/service/quota/ClientQuotaService.java | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java index 720a7f3ac..9741fd216 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java @@ -57,20 +57,20 @@ public Mono> upsertClientQuotas(String clusterName, Mono quotasDto, ServerWebExchange exchange) { return quotasDto.flatMap( - newQuotas -> { + quotasUpd -> { var context = AccessContext.builder() .cluster(clusterName) .operationName("upsertClientQuotas") - .operationParams(Map.of("newQuotas", newQuotas)) + .operationParams(Map.of("quotasUpdate", quotasUpd)) .clientQuotaActions(ClientQuotaAction.EDIT) .build(); Mono> operation = clientQuotaService.upsert( getCluster(clusterName), - newQuotas.getUser(), - newQuotas.getClientId(), - newQuotas.getIp(), - Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) + quotasUpd.getUser(), + quotasUpd.getClientId(), + quotasUpd.getIp(), + Optional.ofNullable(quotasUpd.getQuotas()).orElse(Map.of()) .entrySet() .stream() .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) diff --git a/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java index 6f3ed9319..9c62ffc96 100644 --- a/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java +++ b/api/src/main/java/io/kafbat/ui/service/quota/ClientQuotaService.java @@ -52,11 +52,11 @@ public Mono upsert(KafkaCluster cluster, .flatMap(ac -> findQuotas(ac, quotaEntity) .flatMap(currentQuotas -> { - HttpStatus result = HttpStatus.OK; //updated + HttpStatus result = HttpStatus.OK; //updated(200) if (newQuotas.isEmpty()) { - result = HttpStatus.NO_CONTENT; //deleted + result = HttpStatus.NO_CONTENT; //deleted(204) } else if (currentQuotas.isEmpty()) { - result = HttpStatus.CREATED; + result = HttpStatus.CREATED; //created(201) } var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas); return ac.alterClientQuota(alteration) From e102af2dcb4bea0c94221596fc650a0d242bf786 Mon Sep 17 00:00:00 2001 From: iliax Date: Sat, 17 Feb 2024 12:23:30 +0400 Subject: [PATCH 4/7] minor improvements --- .../io/kafbat/ui/service/quota/ClientQuotaServiceTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java index 7ce9be711..fe6fe988e 100644 --- a/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java @@ -35,9 +35,9 @@ void init() { ) void createUpdateDelete(String user, String clientId, String ip) { var initialQuotas = Map.of( - "producer_byte_rate", 123.0, - "consumer_byte_rate", 234.0, - "request_percentage", 10.0 + "producer_byte_rate", 123.0, //should not have decimals + "consumer_byte_rate", 234.0, //should not have decimals + "request_percentage", 10.3 //can have decimal part ); //creating new From e38ea280cca96324923a2c35b9e806f31111cdce Mon Sep 17 00:00:00 2001 From: iliax Date: Sun, 18 Feb 2024 11:13:36 +0400 Subject: [PATCH 5/7] merged with master --- .../java/io/kafbat/ui/model/rbac/AccessContext.java | 6 ++++++ .../ui/model/rbac/permission/ClientQuotaAction.java | 13 ++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java index 2322d1f89..b5a459471 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.springframework.security.access.AccessDeniedException; @@ -76,6 +77,10 @@ public boolean isAccessible(List userPermissions) throws AccessDenie } } + public static AccessContextBuilder builder() { + return new AccessContextBuilder(); + } + public boolean isAccessible(List userPermissions) { return accessedResources().stream() .allMatch(resourceAccess -> resourceAccess.isAccessible(userPermissions)); @@ -83,6 +88,7 @@ public boolean isAccessible(List userPermissions) { public static final class AccessContextBuilder { + private String cluster; private String operationName; private Object operationParams; private final List accessedResources = new ArrayList<>(); diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java index 4871e0930..61b4c1005 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ClientQuotaAction.java @@ -5,15 +5,26 @@ public enum ClientQuotaAction implements PermissibleAction { VIEW, - EDIT + EDIT(VIEW) ; public static final Set ALTER_ACTIONS = Set.of(EDIT); + private final PermissibleAction[] dependantActions; + + ClientQuotaAction(ClientQuotaAction... dependantActions) { + this.dependantActions = dependantActions; + } + @Override public boolean isAlter() { return ALTER_ACTIONS.contains(this); } + @Override + public PermissibleAction[] dependantActions() { + return dependantActions; + } + } From 4bfc745b40596b13c3ca9fc2c14b21d816ea5076 Mon Sep 17 00:00:00 2001 From: iliax Date: Sun, 18 Feb 2024 11:19:10 +0400 Subject: [PATCH 6/7] merged with master --- api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java index b5a459471..04b210f46 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java @@ -152,7 +152,7 @@ public AccessContextBuilder auditActions(AuditAction... actions) { } public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) { - accessedResources.add(new SingleResourceAccess(Resource.ACL, List.of(actions))); + accessedResources.add(new SingleResourceAccess(Resource.CLIENT_QUOTAS, List.of(actions))); return this; } From 6fd0655d9e9c6f2f51e63fea99a18018c860d987 Mon Sep 17 00:00:00 2001 From: iliax Date: Sun, 18 Feb 2024 11:22:15 +0400 Subject: [PATCH 7/7] merged with master --- .../java/io/kafbat/ui/emitter/MessagesProcessing.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java b/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java index 973af091d..547a77a70 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java @@ -89,9 +89,9 @@ void sendPhase(FluxSink sink, String name) { @VisibleForTesting static Iterable> sortForSending(Iterable> records, boolean asc) { - Comparator offsetComparator = asc + Comparator> offsetComparator = asc ? Comparator.comparingLong(ConsumerRecord::offset) - : Comparator.comparingLong(ConsumerRecord::offset).reversed(); + : Comparator.>comparingLong(ConsumerRecord::offset).reversed(); // partition -> sorted by offsets records Map>> perPartition = Streams.stream(records) @@ -101,9 +101,9 @@ static Iterable> sortForSending(Iterable lst.stream().sorted(offsetComparator).toList()))); - Comparator tsComparator = asc + Comparator> tsComparator = asc ? Comparator.comparing(ConsumerRecord::timestamp) - : Comparator.comparingLong(ConsumerRecord::timestamp).reversed(); + : Comparator.>comparingLong(ConsumerRecord::timestamp).reversed(); // merge-sorting records from partitions one by one using timestamp comparator return Iterables.mergeSorted(perPartition.values(), tsComparator);