Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Quota management API #139

Merged
merged 8 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.kafbat.ui.controller;

import static java.util.stream.Collectors.toMap;

import io.kafbat.ui.api.ClientQuotasApi;
import io.kafbat.ui.model.ClientQuotasDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.service.quota.ClientQuotaRecord;
import io.kafbat.ui.service.quota.ClientQuotaService;
import java.math.BigDecimal;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {

private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId)))
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip)));

private final ClientQuotaService clientQuotaService;

@Override
public Mono<ResponseEntity<Flux<ClientQuotasDTO>>> listQuotas(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("listClientQuotas")
.clientQuotaActions(ClientQuotaAction.VIEW)
.build();

Mono<ResponseEntity<Flux<ClientQuotasDTO>>> operation =
Mono.just(
clientQuotaService.getAll(getCluster(clusterName))
.sort(QUOTA_RECORDS_COMPARATOR)
.map(this::mapToDto)
).map(ResponseEntity::ok);

return validateAccess(context)
.then(operation)
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
Mono<ClientQuotasDTO> quotasDto,
ServerWebExchange exchange) {
return quotasDto.flatMap(
quotasUpd -> {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("upsertClientQuotas")
.operationParams(Map.of("quotasUpdate", quotasUpd))
.clientQuotaActions(ClientQuotaAction.EDIT)
.build();

Mono<ResponseEntity<Void>> operation = clientQuotaService.upsert(
getCluster(clusterName),
quotasUpd.getUser(),
quotasUpd.getClientId(),
quotasUpd.getIp(),
Optional.ofNullable(quotasUpd.getQuotas()).orElse(Map.of())
.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
)
.map(statusCode -> ResponseEntity.status(statusCode).build());

return validateAccess(context)
.then(operation)
.doOnEach(sig -> audit(context, sig));
}
);
}

private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) {
return new ClientQuotasDTO()
.user(quotaRecord.user())
.clientId(quotaRecord.clientId())
.ip(quotaRecord.ip())
.quotas(
quotaRecord.quotas().entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.collect(toMap(
Map.Entry::getKey,
e -> BigDecimal.valueOf(e.getValue()),
(v1, v2) -> null, //won't be called
LinkedHashMap::new //to keep order
))
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
@VisibleForTesting
static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
boolean asc) {
Comparator<ConsumerRecord> offsetComparator = asc
Comparator<ConsumerRecord<Bytes, Bytes>> offsetComparator = asc
? Comparator.comparingLong(ConsumerRecord::offset)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
: Comparator.<ConsumerRecord<Bytes, Bytes>>comparingLong(ConsumerRecord::offset).reversed();

// partition -> sorted by offsets records
Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
Expand All @@ -101,9 +101,9 @@ static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRe
TreeMap::new,
collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));

Comparator<ConsumerRecord> tsComparator = asc
Comparator<ConsumerRecord<Bytes, Bytes>> tsComparator = asc
? Comparator.comparing(ConsumerRecord::timestamp)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
: Comparator.<ConsumerRecord<Bytes, Bytes>>comparingLong(ConsumerRecord::timestamp).reversed();

// merge-sorting records from partitions one by one using timestamp comparator
return Iterables.mergeSorted(perPartition.values(), tsComparator);
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/ClusterFeature.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public enum ClusterFeature {
SCHEMA_REGISTRY,
TOPIC_DELETION,
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
KAFKA_ACL_EDIT,
CLIENT_QUOTA_MANAGEMENT
}
12 changes: 12 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/rbac/AccessContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kafbat.ui.model.rbac.permission.AclAction;
import io.kafbat.ui.model.rbac.permission.ApplicationConfigAction;
import io.kafbat.ui.model.rbac.permission.AuditAction;
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
Expand All @@ -15,6 +16,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.security.access.AccessDeniedException;

Expand Down Expand Up @@ -149,6 +151,11 @@ public AccessContextBuilder auditActions(AuditAction... actions) {
return this;
}

public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) {
accessedResources.add(new SingleResourceAccess(Resource.CLIENT_QUOTAS, List.of(actions)));
return this;
}

public AccessContextBuilder operationName(String operationName) {
this.operationName = operationName;
return this;
Expand All @@ -159,6 +166,11 @@ public AccessContextBuilder operationParams(Object operationParams) {
return this;
}

public AccessContextBuilder operationParams(Map<String, Object> paramsMap) {
this.operationParams = paramsMap;
return this;
}

public AccessContext build() {
return new AccessContext(cluster, accessedResources, operationName, operationParams);
}
Expand Down
5 changes: 4 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/rbac/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kafbat.ui.model.rbac.permission.AclAction;
import io.kafbat.ui.model.rbac.permission.ApplicationConfigAction;
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.model.rbac.permission.ConsumerGroupAction;
Expand Down Expand Up @@ -32,7 +33,9 @@ public enum Resource {

ACL(AclAction.values()),

AUDIT(AclAction.values());
AUDIT(AclAction.values()),

CLIENT_QUOTAS(ClientQuotaAction.values());

private final List<PermissibleAction> actions;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.kafbat.ui.model.rbac.permission;

import java.util.Set;

public enum ClientQuotaAction implements PermissibleAction {

VIEW,
EDIT(VIEW)

;

public static final Set<ClientQuotaAction> ALTER_ACTIONS = Set.of(EDIT);

private final PermissibleAction[] dependantActions;

ClientQuotaAction(ClientQuotaAction... dependantActions) {
this.dependantActions = dependantActions;
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}

@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction {
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {

String name();

Expand Down
9 changes: 9 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/FeatureService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kafbat.ui.service;

import static io.kafbat.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;

import io.kafbat.ui.model.ClusterFeature;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;
Expand Down Expand Up @@ -41,6 +43,7 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient admin
features.add(topicDeletionEnabled(adminClient));
features.add(aclView(adminClient));
features.add(aclEdit(adminClient, clusterDescription));
features.add(quotaManagement(adminClient));

return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
Expand All @@ -51,6 +54,12 @@ private Mono<ClusterFeature> topicDeletionEnabled(ReactiveAdminClient adminClien
: Mono.empty();
}

private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT)
? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT)
: Mono.empty();
}

private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
boolean canEdit = aclViewEnabled(adminClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
Expand All @@ -97,7 +100,8 @@ public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled),
CLIENT_QUOTA_MANAGEMENT(2.6f);

private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;

Expand Down Expand Up @@ -664,6 +668,15 @@ public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replica
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}

public Mono<Map<ClientQuotaEntity, Map<String, Double>>> getClientQuotas(ClientQuotaFilter filter) {
return toMono(client.describeClientQuotas(filter).entities());
}

public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
return toMono(client.alterClientQuotas(List.of(alteration)).all());
}


// returns tp -> list of active producer's states (if any)
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
return describeTopic(topic)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kafbat.ui.service.quota;

import jakarta.annotation.Nullable;
import java.util.Map;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public record ClientQuotaRecord(@Nullable String user,
@Nullable String clientId,
@Nullable String ip,
Map<String, Double> quotas) {

static ClientQuotaRecord create(ClientQuotaEntity entity, Map<String, Double> quotas) {
return new ClientQuotaRecord(
entity.entries().get(ClientQuotaEntity.USER),
entity.entries().get(ClientQuotaEntity.CLIENT_ID),
entity.entries().get(ClientQuotaEntity.IP),
quotas
);
}
}
Loading
Loading