Skip to content

Commit

Permalink
BE: Chore: Cleanup api module
Browse files Browse the repository at this point in the history
  • Loading branch information
wernerdv committed Feb 3, 2025
1 parent 5a40117 commit 91beb6d
Show file tree
Hide file tree
Showing 35 changed files with 97 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -48,7 +48,7 @@ private static Retry conflictCodeRetry() {
.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
.filter(e -> e instanceof WebClientResponseException.Conflict)
.onRetryExhaustedThrow((spec, signal) ->
new KafkaConnectConflictReponseException(
new KafkaConnectConflictResponseException(
(WebClientResponseException.Conflict) signal.failure()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.kafbat.ui.config.auth;

import io.kafbat.ui.util.EmptyRedirectStrategy;
import io.kafbat.ui.util.StaticFileWebFilter;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -12,8 +10,6 @@
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Mono<Void> handle(WebFilterExchange exchange, Authentication authenticati
requestUri.getPath(), requestUri.getQuery());

final UriComponents baseUrl = UriComponentsBuilder
.fromHttpUrl(fullUrl)
.fromUriString(fullUrl)
.replacePath("/")
.replaceQuery(null)
.fragment(null)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer
Collection<TopicPartition> partitions) {
try {
// we try to use offsetsForTimes() to find earliest offsets, since for
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
// some topics (like compacted) beginningOffsets() returning 0 offsets
// even when effectively first offset can be very high
var offsets = consumer.offsetsForTimes(
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
Expand Down
23 changes: 0 additions & 23 deletions api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java

This file was deleted.

2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/exception/CelException.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kafbat.ui.exception;

public class CelException extends CustomBaseException {
private String celOriginalExpression;
private final String celOriginalExpression;

public CelException(String celOriginalExpression, String errorMessage) {
super("CEL error. Original expression: %s. Error message: %s".formatted(celOriginalExpression, errorMessage));
Expand Down

This file was deleted.

This file was deleted.

6 changes: 0 additions & 6 deletions api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;


public enum ErrorCode {

FORBIDDEN(403, HttpStatus.FORBIDDEN),

UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR),
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
NOT_FOUND(404, HttpStatus.NOT_FOUND),
VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT),
DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT),
UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY),
CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ private Mono<ServerResponse> render(CustomBaseException baseException, ServerReq

private Mono<ServerResponse> render(WebExchangeBindException exception, ServerRequest request) {
Map<String, Set<String>> fieldErrorsMap = exception.getFieldErrors().stream()
.collect(Collectors
.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));
.collect(Collectors.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));

var fieldsErrors = fieldErrorsMap.entrySet().stream()
.map(e -> {
Expand Down Expand Up @@ -151,9 +150,7 @@ private String requestId(ServerRequest request) {
}

private Consumer<HttpHeaders> headers(ServerRequest request) {
return (HttpHeaders headers) -> {
CorsGlobalConfiguration.fillCorsHeader(headers, request.exchange().getRequest());
};
return (HttpHeaders headers) -> CorsGlobalConfiguration.fillCorsHeader(headers, request.exchange().getRequest());
}

private BigDecimal currentTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.kafbat.ui.exception;


import org.springframework.web.reactive.function.client.WebClientResponseException;

public class KafkaConnectConflictReponseException extends CustomBaseException {
public class KafkaConnectConflictResponseException extends CustomBaseException {

public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) {
public KafkaConnectConflictResponseException(WebClientResponseException.Conflict e) {
super("Kafka Connect responded with 409 (Conflict) code. Response body: "
+ e.getResponseBodyAsString());
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

26 changes: 9 additions & 17 deletions api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,15 @@ private static BrokerDTO mapCoordinator(Node node) {
return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
}

private static ConsumerGroupStateDTO mapConsumerGroupState(
org.apache.kafka.common.ConsumerGroupState state) {
switch (state) {
case DEAD:
return ConsumerGroupStateDTO.DEAD;
case EMPTY:
return ConsumerGroupStateDTO.EMPTY;
case STABLE:
return ConsumerGroupStateDTO.STABLE;
case PREPARING_REBALANCE:
return ConsumerGroupStateDTO.PREPARING_REBALANCE;
case COMPLETING_REBALANCE:
return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
default:
return ConsumerGroupStateDTO.UNKNOWN;
}
private static ConsumerGroupStateDTO mapConsumerGroupState(org.apache.kafka.common.ConsumerGroupState state) {
return switch (state) {
case DEAD -> ConsumerGroupStateDTO.DEAD;
case EMPTY -> ConsumerGroupStateDTO.EMPTY;
case STABLE -> ConsumerGroupStateDTO.STABLE;
case PREPARING_REBALANCE -> ConsumerGroupStateDTO.PREPARING_REBALANCE;
case COMPLETING_REBALANCE -> ConsumerGroupStateDTO.COMPLETING_REBALANCE;
default -> ConsumerGroupStateDTO.UNKNOWN;
};
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,

private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
List<Map.Entry<TopicPartition,
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
topic.setName(name);
topic.setPartitions(
Expand All @@ -54,8 +54,7 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
}

private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
DescribeLogDirsResponse.ReplicaInfo
replicaInfo) {
DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
logDir.setBroker(broker);
logDir.setPartition(partition);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void close() {
try {
serde.close();
} catch (Exception e) {
log.error("Error closing serde " + name, e);
log.error("Error closing serde {}", name, e);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Serde.Serializer serializer(String topic, Serde.Target type) {
return inputString -> {
inputString = inputString.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (inputString.length() == 0) {
if (inputString.isEmpty()) {
return new byte[] {};
}
return decoder.decode(inputString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Serializer serializer(String topic, Target type) {
return input -> {
input = input.trim();
// it is a hack to provide ability to sent empty array as a key/value
if (input.length() == 0) {
if (input.isEmpty()) {
return new byte[] {};
}
return HexFormat.of().parseHex(prepareInputForParse(input));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private Map<String, ProtoFile> knownProtoFiles() {
}

private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) {
String protoFileString = null;
String protoFileString;
// know type file contains either message or enum
if (!fileDescriptor.getMessageTypes().isEmpty()) {
protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().get(0)).canonicalString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
Expand Down Expand Up @@ -44,7 +43,6 @@
public class KafkaConnectService {
private final ClusterMapper clusterMapper;
private final KafkaConnectMapper kafkaConnectMapper;
private final ObjectMapper objectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;

public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,6 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
);
}

public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
return describeCluster()
.map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
.flatMap(this::describeLogDirs);
}

public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
Collection<Integer> brokerIds) {
return toMono(client.describeLogDirs(brokerIds).all())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
@SneakyThrows
private List<String> parseSubjectListString(String subjectNamesStr) {
//workaround for https://github.com/spring-projects/spring-framework/issues/24734
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<>() {
});
}

Expand Down
12 changes: 5 additions & 7 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {

/**
* After creation topic can be invisible via API for some time.
* To workaround this, we retyring topic loading until it becomes visible.
* To workaround this, we're retrying topic loading until it becomes visible.
*/
private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
return loadTopic(c, topicName)
Expand Down Expand Up @@ -137,8 +137,7 @@ private List<InternalTopic> createList(List<String> orderedNames,
.collect(toList());
}

private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
descriptionsMap,
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription> descriptionsMap,
ReactiveAdminClient ac) {
var descriptions = descriptionsMap.values();
return ac.listOffsets(descriptions, OffsetSpec.earliest())
Expand Down Expand Up @@ -225,8 +224,7 @@ private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
.then(loadTopic(cluster, topicName)));
}

public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
Mono<TopicUpdateDTO> topicUpdate) {
public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName, Mono<TopicUpdateDTO> topicUpdate) {
return topicUpdate
.flatMap(t -> updateTopic(cl, topicName, t));
}
Expand Down Expand Up @@ -298,7 +296,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
var brokers = brokersUsage.entrySet().stream()
.sorted(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.collect(toList());
.toList();

// Iterate brokers and try to add them in assignment
// while partition replicas count != requested replication factor
Expand Down Expand Up @@ -326,7 +324,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
var brokersUsageList = brokersUsage.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
.map(Map.Entry::getKey)
.collect(toList());
.toList();

// Iterate brokers and try to remove them from assignment
// while partition replicas count != requested replication factor
Expand Down
Loading

0 comments on commit 91beb6d

Please sign in to comment.