
Commit 8057dc1

Fixed bugs (#90)
* Fixed bugs

* More fixes
germanosin authored Aug 3, 2020
1 parent bc2efc9 commit 8057dc1
Showing 19 changed files with 201 additions and 129 deletions.
@@ -10,6 +10,10 @@ public class SimpleRecordDeserializer implements RecordDeserializer {
 
     @Override
     public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
-        return stringDeserializer.deserialize(record.topic(), record.value().get());
+        if (record.value()!=null) {
+            return stringDeserializer.deserialize(record.topic(), record.value().get());
+        } else {
+            return "empty";
+        }
     }
 }
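
Note: in Kafka, a record in a compacted topic can legitimately carry a null value (a tombstone), so record.value() may be null and the old unconditional record.value().get() threw a NullPointerException. A minimal sketch of the failure mode the guard prevents (topic name and values are illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.utils.Bytes;

    public class TombstoneExample {
        public static void main(String[] args) {
            // Kafka delivers a null value for a tombstone (key-deletion marker).
            ConsumerRecord<Bytes, Bytes> tombstone =
                    new ConsumerRecord<>("demo-topic", 0, 0L, Bytes.wrap("key".getBytes()), null);

            // Old code: tombstone.value().get() -> NullPointerException.
            // The patched deserializer returns the "empty" placeholder instead:
            Object payload = tombstone.value() != null
                    ? new String(tombstone.value().get())
                    : "empty";
            System.out.println(payload); // prints: empty
        }
    }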
@@ -1,10 +1,7 @@
 package com.provectus.kafka.ui.cluster.mapper;
 
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
@@ -19,8 +16,10 @@ public interface ClusterMapper {
     Cluster toCluster(KafkaCluster cluster);
 
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
-    BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
+    ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
+    BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
     Topic toTopic(InternalTopic topic);
     TopicDetails toTopicDetails(InternalTopic topic);
     TopicConfig toTopicConfig(InternalTopicConfig topic);
+    Replica toReplica(InternalReplica replica);
 }
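
Note: ClusterMapper is a MapStruct interface, so each added signature only declares a mapping; the implementation is generated at compile time by matching property names. A hedged approximation of what MapStruct generates for the new toReplica method (the broker property is taken from the hand-written copy in ClusterUtil.convertToTopic, removed later in this commit):

    // Approximation of the generated mapper code, not the actual output.
    public Replica toReplica(InternalReplica replica) {
        if (replica == null) {
            return null;
        }
        Replica replicaDto = new Replica();
        replicaDto.setBroker(replica.getBroker());
        return replicaDto;
    }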
@@ -10,5 +10,5 @@
 @Builder(toBuilder = true)
 public class InternalBrokerMetrics {
     private final Long segmentSize;
-    private final List<Metric> jmxMetrics;
+    private final List<Metric> metrics;
 }
@@ -6,11 +6,13 @@
 import java.util.List;
 
 @Data
-@Builder
+@Builder(toBuilder = true)
 public class InternalPartition {
     private final int partition;
     private final Integer leader;
     private final List<InternalReplica> replicas;
     private final int inSyncReplicasCount;
     private final int replicasCount;
+    private final long offsetMin;
+    private final long offsetMax;
 }
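
Note: the new offsetMin/offsetMax fields plus @Builder(toBuilder = true) are what let KafkaService.getTopicPartitions (further down in this commit) enrich an existing partition with offsets instead of building a separate DTO. A small sketch of the Lombok toBuilder pattern this enables (values are illustrative):

    // toBuilder() copies every current field into a fresh builder, so the
    // immutable value object can be rebuilt with just the offsets changed.
    InternalPartition withOffsets(InternalPartition base) {
        return base.toBuilder()
                .offsetMin(0L)
                .offsetMax(42L)
                .build();
    }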
@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.cluster.model;
 
-import com.provectus.kafka.ui.model.TopicPartitionDto;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.common.TopicPartition;
@@ -11,7 +11,7 @@
 public class KafkaCluster {
 
     private final String name;
-    private final int jmxPort;
+    private final Integer jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;
     private final String schemaRegistry;
@@ -38,17 +38,22 @@ public List<Cluster> getClusters() {
                 .collect(Collectors.toList());
     }
 
-    public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
+    public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
         return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
-                .map(KafkaCluster::getMetrics)
-                .map(s -> {
-                    var brokerMetrics = clusterMapper.toBrokerMetrics(s);
-                    brokerMetrics.setMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
-                    brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
-                    return brokerMetrics;
-                }));
+                .map( c -> c.getMetrics().getInternalBrokerMetrics())
+                .map( m -> m.get(id))
+                .map(clusterMapper::toBrokerMetrics));
     }
 
+    public Mono<ClusterMetrics> getClusterMetrics(String name) {
+        return Mono.justOrEmpty(
+                clustersStorage.getClusterByName(name)
+                        .map(KafkaCluster::getMetrics)
+                        .map(clusterMapper::toClusterMetrics)
+        );
+    }
+
 
     public List<Topic> getTopics(String name) {
         return clustersStorage.getClusterByName(name)
                 .map(c ->
@@ -60,12 +65,15 @@ public List<Topic> getTopics(String name) {
 
     public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
         return clustersStorage.getClusterByName(name)
-                .map(c -> {
-                    var topic = c.getTopics().get(topicName);
-                    return clusterMapper
-                            .toTopicDetails(topic)
-                            .partitions(kafkaService.partitionDtoList(topic, c));
-                });
+                .flatMap( c ->
+                        Optional.ofNullable(
+                                c.getTopics().get(topicName)
+                        ).map(
+                                t -> t.toBuilder().partitions(
+                                        kafkaService.getTopicPartitions(c, t)
+                                ).build()
+                        ).map(clusterMapper::toTopicDetails)
+                );
     }
 
     public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
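
Note: the getTopicDetails rewrite matters because c.getTopics().get(topicName) returns null for an unknown topic, and the old lambda passed that null straight into the mapper. Wrapping the lookup in Optional.ofNullable turns "topic not found" into an empty Optional the caller can report cleanly, instead of a NullPointerException. The pattern in isolation (lookup is a hypothetical stand-in for the topics map):

    import java.util.Map;
    import java.util.Optional;

    class NullSafeLookup {
        static Optional<String> describe(Map<String, String> lookup, String topicName) {
            // Map.get returns null for a missing key; ofNullable converts that
            // into Optional.empty(), so the downstream map() is simply skipped.
            return Optional.ofNullable(lookup.get(topicName))
                    .map(t -> "details of " + t);
        }
    }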
@@ -143,6 +151,7 @@ public Mono<Topic> updateTopic(String clusterName, String topicName, Mono<TopicF
         return clustersStorage.getClusterByName(clusterName).map(cl ->
                 topicFormData
                         .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
+                        .map(clusterMapper::toTopic)
                         .flatMap(t -> updateCluster(t, clusterName, cl))
         )
                 .orElse(Mono.empty());
@@ -161,4 +170,5 @@ public Flux<TopicMessage> getMessages(String clusterName, String topicName, Cons
                 .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
     }
+
 }
@@ -68,10 +68,12 @@ public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartition
     ) {
         return consumer.assignment().topicPartitions().stream()
                 .map(tp -> {
-                    Long currentOffset = groupOffsets.get(tp).offset();
-                    Long endOffset = endOffsets.get(tp);
+                    Long currentOffset = Optional.ofNullable(
+                            groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L);
+                    Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
                     ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
                     cd.setConsumerId(consumer.consumerId());
+                    cd.setHost(consumer.host());
                     cd.setTopic(tp.topic());
                     cd.setPartition(tp.partition());
                     cd.setCurrentOffset(currentOffset);
@@ -116,7 +118,7 @@ public static InternalTopic mapToInternalTopic(TopicDescription topicDescription
 
     int urpCount = partitions.stream()
             .flatMap(partition -> partition.getReplicas().stream())
-            .filter(InternalReplica::isInSync).mapToInt(e -> 1)
+            .filter(p -> !p.isInSync()).mapToInt(e -> 1)
             .sum();
 
     int inSyncReplicasCount = partitions.stream()
@@ -199,6 +201,10 @@ private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Ma
                 .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
                 .findFirst().orElseThrow().value();
         try {
+            final String[] parts = version.split("\\.");
+            if (parts.length>2) {
+                version = parts[0] + "." + parts[1];
+            }
             return Float.parseFloat(version.split("-")[0]) <= 2.3f
                     ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
         } catch (Exception e) {
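
Note: the added guard is needed because Kafka brokers report patch releases such as 2.3.1, and Float.parseFloat("2.3.1") throws NumberFormatException, which previously sent every such cluster into the catch branch. Trimming to major.minor first makes the comparison meaningful; a minimal check of the arithmetic:

    class VersionParseDemo {
        public static void main(String[] args) {
            String version = "2.3.1";                // Float.parseFloat("2.3.1") would throw
            String[] parts = version.split("\\.");
            if (parts.length > 2) {
                version = parts[0] + "." + parts[1]; // -> "2.3"
            }
            // Same comparison as getSupportedUpdateFeature: <= 2.3 selects the
            // legacy alterConfigs path, newer brokers use incrementalAlterConfigs.
            System.out.println(Float.parseFloat(version.split("-")[0]) <= 2.3f); // true
        }
    }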
@@ -207,24 +213,6 @@
         }
     }
 
-    public static Topic convertToTopic(InternalTopic internalTopic) {
-        Topic topic = new Topic();
-        topic.setName(internalTopic.getName());
-        List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
-            Partition partition = new Partition();
-            partition.setPartition(s.getPartition());
-            partition.setLeader(s.getLeader());
-            partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
-                Replica replica = new Replica();
-                replica.setBroker(r.getBroker());
-                return Stream.of(replica);
-            }).collect(Collectors.toList()));
-            return Stream.of(partition);
-        }).collect(Collectors.toList());
-        topic.setPartitions(partitions);
-        return topic;
-    }
-
     public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
         return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
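
Note: the hand-rolled convertToTopic above is deleted because updateTopic now returns the internal model and conversion happens at the service edge through the MapStruct mapper. A reduced sketch of the new flow (variable names are illustrative; see the ClusterService and KafkaService hunks in this commit):

    // Before: KafkaService built the REST DTO itself via ClusterUtil.convertToTopic.
    // After: it yields Mono<InternalTopic>, and ClusterService converts at the edge:
    Mono<Topic> updated = kafkaService.updateTopic(cluster, topicName, formData)
            .map(clusterMapper::toTopic);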
@@ -100,7 +100,7 @@ private void closeConnectionExceptionally(String url, JMXConnector srv) {
     public List<MetricDto> convertToMetricDto(InternalClusterMetrics internalClusterMetrics) {
         return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
                 .map(c ->
-                        c.getJmxMetrics().stream()
+                        c.getMetrics().stream()
                                 .filter(j -> isSameMetric(j.getCanonicalName()))
                                 .map(j -> j.getValue().entrySet().stream()
                                         .map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))
@@ -265,7 +265,7 @@ private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
     }
 
     @SneakyThrows
-    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
+    public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
         ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
         return getOrCreateAdminClient(cluster)
                 .flatMap(ac -> {
@@ -281,11 +281,10 @@ public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicForm
 
 
 
-    private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
+    private Mono<InternalTopic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
         return getTopicsData(ac.getAdminClient())
                 .map(s -> s.stream()
-                        .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
-                .map(ClusterUtil::convertToTopic);
+                        .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
     }
 
     private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
@@ -346,6 +345,8 @@ private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, Intern
 
     public List<Metric> getJmxMetric(String clusterName, Node node) {
         return clustersStorage.getClusterByName(clusterName)
+                .filter( c -> c.getJmxPort() != null)
+                .filter( c -> c.getJmxPort() > 0)
                 .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElse(Collections.emptyList());
     }
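
Note: with jmxPort relaxed from int to Integer in KafkaCluster, a cluster configured without JMX now carries a null port, and the two new filters make getJmxMetric return an empty list instead of attempting a connection. The guard in isolation (Cluster here is a stand-in record, not the project's KafkaCluster):

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    class JmxPortGuard {
        record Cluster(Integer jmxPort) {}  // stand-in type for illustration

        static List<String> metricsFor(Optional<Cluster> cluster) {
            return cluster
                    .filter(c -> c.jmxPort() != null)  // unset port: skip JMX entirely
                    .filter(c -> c.jmxPort() > 0)      // reject zero/negative config values
                    .map(c -> List.of("metrics via port " + c.jmxPort()))
                    .orElse(Collections.emptyList());
        }
    }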

@@ -357,7 +358,7 @@ private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics in
         return ClusterUtil.toMono(ac.describeCluster().nodes())
                 .flatMapIterable(nodes -> nodes)
                 .map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder().
-                        jmxMetrics(getJmxMetric(clusterName, broker)).build()))
+                        metrics(getJmxMetric(clusterName, broker)).build()))
                 .collectList()
                 .map(s -> internalClusterMetrics.toBuilder().internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
     }
@@ -377,22 +378,25 @@ private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics in
                 .collect(Collectors.toList())).build();
     }
 
-    public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
-        var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
-        return getTopicPartitionOffset(cluster, topicPartitions);
-    }
+    public List<InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic ) {
+        var tps = topic.getPartitions().stream()
+                .map(t -> new TopicPartition(topic.getName(), t.getPartition()))
+                .collect(Collectors.toList());
+        Map<Integer, InternalPartition> partitions =
+                topic.getPartitions().stream().collect(Collectors.toMap(
+                        InternalPartition::getPartition,
+                        tp -> tp
+                ));
 
-    private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
         try (var consumer = createConsumer(c)) {
-            final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
-            final Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
+            final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
+            final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);
 
-            return topicPartitions.stream()
-                    .map( tp -> new TopicPartitionDto()
-                            .topic(tp.topic())
-                            .partition(tp.partition())
+            return tps.stream()
+                    .map( tp -> partitions.get(tp.partition()).toBuilder()
                             .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
                             .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
+                            .build()
                     ).collect(Collectors.toList());
         } catch (Exception e) {
             return Collections.emptyList();
@@ -30,12 +30,19 @@ public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchang
     }
 
     @Override
-    public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
-        return clusterService.getBrokersMetrics(clusterName, id)
+    public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
+        return clusterService.getBrokerMetrics(clusterName, id)
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build());
     }
 
+    @Override
+    public Mono<ResponseEntity<ClusterMetrics>> getClusterMetrics(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getClusterMetrics(clusterName)
+                .map(ResponseEntity::ok)
+                .onErrorReturn(ResponseEntity.notFound().build());
+    }
+
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));