Skip to content

Commit

Permalink
[CHECKER] add base code
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Jan 2, 2025
1 parent 6857bab commit 787b26e
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 6 deletions.
9 changes: 3 additions & 6 deletions app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.util.Map;
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.homework.BulkChecker;
import org.astraea.app.homework.BulkSender;
import org.astraea.app.checker.Checker;
import org.astraea.app.homework.Prepare;
import org.astraea.app.homework.SendYourData;
import org.astraea.app.performance.Performance;
Expand All @@ -35,10 +34,8 @@
public class App {
private static final Map<String, Class<?>> MAIN_CLASSES =
Map.of(
"bulk_sender",
BulkSender.class,
"bulk_checker",
BulkChecker.class,
"40_checker",
Checker.class,
"performance",
Performance.class,
"prepare",
Expand Down
35 changes: 35 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Changelog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class Changelog {
private List<Protocol> protocols;
private List<Config> configs;

public Map<String, Protocol> protocols() {
return protocols.stream().collect(Collectors.toMap(Protocol::name, Function.identity()));
}

public Map<String, Config> configs() {
return configs.stream().collect(Collectors.toMap(Config::name, Function.identity()));
}
}
93 changes: 93 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Checker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.astraea.app.argument.IntegerMapField;
import org.astraea.app.argument.NonEmptyStringField;
import org.astraea.app.argument.NonNegativeIntegerField;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;

public class Checker {

private static final List<Guard> GUARDS = List.of(new ProduceRpcGuard());

public static void main(String[] args) throws Exception {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) throws Exception {
try (var admin = Admin.create(Map.of("bootstrap.servers", param.bootstrapServers()))) {
for (var guard : GUARDS) {
var result = guard.run(admin, param.mBeanClientFunction(), param.readChangelog());
System.out.println(result);
}
}
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--changelog"},
description = "String: url of changelog file",
validateWith = NonEmptyStringField.class)
String changelog =
"https://raw.githubusercontent.com/opensource4you/astraea/refs/heads/main/config/kafka_changelog.json";

Changelog readChangelog() throws IOException {
try (var in = new URL(changelog).openStream()) {
return JsonConverter.defaultConverter()
.fromJson(
new String(in.readAllBytes(), StandardCharsets.UTF_8), TypeRef.of(Changelog.class));
}
}

@Parameter(
names = {"--jmx.port"},
description = "Integer: the port to query JMX for each server",
validateWith = NonNegativeIntegerField.class,
converter = NonNegativeIntegerField.class)
int jmxPort = -1;

@Parameter(
names = {"--jmx.ports"},
description =
"Map: the JMX port for each broker. For example: 1024=19999 means for the broker with id 1024, its JMX port located at 19999 port",
validateWith = IntegerMapField.class,
converter = IntegerMapField.class)
Map<Integer, Integer> jmxPorts = Map.of();

Function<Node, MBeanClient> mBeanClientFunction() {
return node -> {
int port = jmxPorts.getOrDefault(node.id(), jmxPort);
if (port < 0)
throw new IllegalArgumentException("Failed to get jmx port for broker: " + node);
return JndiClient.of(node.host(), port);
};
}
}
}
42 changes: 42 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Config.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import java.util.Optional;

public class Config {
private String name;
private Optional<String> value;
private String commit;
private String kip;

public String name() {
return name;
}

public Optional<String> value() {
return value;
}

public String commit() {
return commit;
}

public String kip() {
return kip;
}
}
28 changes: 28 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Guard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import java.util.Collection;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.astraea.common.metrics.MBeanClient;

public interface Guard {
Collection<Report> run(Admin admin, Function<Node, MBeanClient> clients, Changelog changelog)
throws Exception;
}
44 changes: 44 additions & 0 deletions app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import java.util.Collection;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.broker.NetworkMetrics;

public class ProduceRpcGuard implements Guard {
@Override
public Collection<Report> run(
Admin admin, Function<Node, MBeanClient> clients, Changelog changelog) throws Exception {
return admin.describeCluster().nodes().get().stream()
.map(
node -> {
var protocol =
changelog
.protocols()
.get(NetworkMetrics.Request.PRODUCE.metricName().toLowerCase());
if (protocol == null) return Report.empty();
var versions = NetworkMetrics.Request.PRODUCE.versions(clients.apply(node));
return Report.of(node, protocol, versions);
})
.flatMap(Report::stream)
.toList();
}
}
40 changes: 40 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

public class Protocol {
private String name;
private int base;
private String commit;
private String kip;

public int base() {
return base;
}

public String name() {
return name;
}

public String commit() {
return commit;
}

public String kip() {
return kip;
}
}
52 changes: 52 additions & 0 deletions app/src/main/java/org/astraea/app/checker/Report.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.checker;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;

public record Report(Node node, String why) {
static Report noMetrics(Node node) {
return new Report(node, "failed to get metrics from");
}

static Report of(Node node, String why) {
return new Report(node, why);
}

static Report empty() {
return new Report(null, "");
}

static Report of(Node node, Protocol protocol, Set<Integer> versions) {
var unsupportedVersions =
versions.stream().filter(v -> v < protocol.base()).collect(Collectors.toSet());
if (unsupportedVersions.isEmpty()) return empty();
return new Report(
node,
String.format(
"there are unsupported %s versions: %s due to new baseline: %s",
protocol.name(), unsupportedVersions, protocol.base()));
}

Stream<Report> stream() {
if (why.isEmpty()) return Stream.empty();
return Stream.of(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.common.EnumInfo;
Expand Down Expand Up @@ -132,6 +134,26 @@ public Histogram fetch(MBeanClient mBeanClient) {
return new Histogram(mBeanClient.bean(ALL.get(this)));
}

public Set<Integer> versions(MBeanClient mBeanClient) {
try {
var beanObjects =
mBeanClient.beans(
BeanQuery.builder()
.domainName("kafka.network")
.property("type", "RequestMetrics")
.property("request", "Produce")
.property("name", "RequestsPerSec")
.property("version", "*")
.build());
return beanObjects.stream()
.map(b -> Integer.parseInt(b.properties().get("version")))
.collect(Collectors.toSet());
} catch (NoSuchElementException ignored) {
// this is expected if the node has no such request
return Set.of();
}
}

public record Histogram(BeanObject beanObject) implements HasHistogram {

public Request type() {
Expand Down

0 comments on commit 787b26e

Please sign in to comment.