Skip to content

Commit

Permalink
KAFKA-16469: Metadata schema checker (apache#15995)
Browse files Browse the repository at this point in the history
Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones.

Reviewers: David Arthur <[email protected]>
  • Loading branch information
cmccabe authored Oct 3, 2024
1 parent 85bfdf4 commit dbd50ff
Show file tree
Hide file tree
Showing 22 changed files with 1,717 additions and 14 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.message" />
<allow pkg="org.apache.message" />
</subpackage>

Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

<!-- Generator -->
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldType).java"/>
files="(SchemaGenerator|MessageDataGenerator|FieldSpec|FieldSpecPairIterator|FieldType).java"/>
<suppress checks="NPathComplexity"
files="(MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
files="(FieldSpecPairIterator|MessageDataGenerator|FieldSpec|WorkerSinkTask).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType|FieldSpec|MessageDataGenerator|KafkaConsumerTest).java"/>
<suppress checks="MethodLength"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public FieldSpec(@JsonProperty("name") String name,
if (!this.type.isArray() && !this.type.isStruct()) {
throw new RuntimeException("Non-array or Struct field " + name + " cannot have fields");
}
// Check struct invariants
if (this.type.isStruct() || this.type.isStructArray()) {
new StructSpec(name,
versions,
Versions.NONE_STRING, // version deprecations not supported at field level
fields);
}
}

if (flexibleVersions == null || flexibleVersions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
Expand Down Expand Up @@ -165,7 +166,7 @@ public final class MessageGenerator {
/**
* The Jackson serializer we use for JSON objects.
*/
static final ObjectMapper JSON_SERDE;
public static final ObjectMapper JSON_SERDE;

static {
JSON_SERDE = new ObjectMapper();
Expand All @@ -174,6 +175,7 @@ public final class MessageGenerator {
JSON_SERDE.configure(DeserializationFeature.FAIL_ON_TRAILING_TOKENS, true);
JSON_SERDE.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
JSON_SERDE.registerModule(new Jdk8Module());
}

private static List<TypeClassGenerator> createTypeClassGenerators(String packageName,
Expand Down Expand Up @@ -272,7 +274,7 @@ public static void processDirectories(String packageName,
System.out.printf("MessageGenerator: processed %d Kafka message JSON files(s).%n", numProcessed);
}

static String capitalizeFirst(String string) {
public static String capitalizeFirst(String string) {
if (string.isEmpty()) {
return string;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Contains structure data for Kafka MessageData classes.
*/
final class StructRegistry {
public final class StructRegistry {
private final Map<String, StructInfo> structs;
private final Set<String> commonStructNames;

Expand Down Expand Up @@ -58,15 +58,15 @@ public Versions parentVersions() {
}
}

StructRegistry() {
public StructRegistry() {
this.structs = new TreeMap<>();
this.commonStructNames = new TreeSet<>();
}

/**
* Register all the structures contained a message spec.
*/
void register(MessageSpec message) {
public void register(MessageSpec message) throws Exception {
// Register common structures.
for (StructSpec struct : message.commonStructs()) {
if (!MessageGenerator.firstIsCapitalized(struct.name())) {
Expand Down Expand Up @@ -122,7 +122,7 @@ private void addStructSpecs(Versions parentVersions, List<FieldSpec> fields) {
/**
* Locate the struct corresponding to a field.
*/
StructSpec findStruct(FieldSpec field) {
public StructSpec findStruct(FieldSpec field) {
String structFieldName;
if (field.type().isArray()) {
FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
Expand All @@ -134,6 +134,10 @@ StructSpec findStruct(FieldSpec field) {
throw new RuntimeException("Field " + field.name() +
" cannot be treated as a structure.");
}
return findStruct(structFieldName);
}

public StructSpec findStruct(String structFieldName) {
StructInfo structInfo = structs.get(structFieldName);
if (structInfo == null) {
throw new RuntimeException("Unable to locate a specification for the structure " +
Expand All @@ -145,7 +149,7 @@ StructSpec findStruct(FieldSpec field) {
/**
* Return true if the field is a struct array with keys.
*/
boolean isStructArrayWithKeys(FieldSpec field) {
public boolean isStructArrayWithKeys(FieldSpec field) {
if (!field.type().isArray()) {
return false;
}
Expand Down
16 changes: 11 additions & 5 deletions generator/src/main/java/org/apache/kafka/message/StructSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,20 @@ public StructSpec(@JsonProperty("name") String name,
if (fields != null) {
// Each field should have a unique tag ID (if the field has a tag ID).
HashSet<Integer> tags = new HashSet<>();
// Each field should have a unique name.
HashSet<String> names = new HashSet<>();
for (FieldSpec field : fields) {
if (field.tag().isPresent()) {
if (tags.contains(field.tag().get())) {
field.tag().ifPresent(tag -> {
if (!tags.add(tag)) {
throw new RuntimeException("In " + name + ", field " + field.name() +
" has a duplicate tag ID " + field.tag().get() + ". All tags IDs " +
"must be unique.");
" has a duplicate tag ID " + tag + ". All tags IDs " +
"must be unique.");
}
tags.add(field.tag().get());
});
if (!names.add(field.name())) {
throw new RuntimeException("In " + name + ", field " + field.name() +
" has a duplicate name " + field.name() + ". All field names " +
"must be unique.");
}
newFields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.message.checker;

import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.MessageGenerator;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.Versions;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
* Utilities for the metadata schema checker.
*/
class CheckerUtils {
/**
* A min function defined for shorts.
*
* @param a The first short integer to compare.
* @param b The second short integer to compare.
* @return The minimum short integer.
*/
static short min(short a, short b) {
return a < b ? a : b;
}

/**
* A max function defined for shorts.
*
* @param a The first short integer to compare.
* @param b The second short integer to compare.
* @return The maximum short integer.
*/
static short max(short a, short b) {
return a > b ? a : b;
}

/**
* Validate the a field doesn't have tagged versions that are outside of the top-level flexible
* versions.
*
* @param what A description of the field.
* @param field The field to validate.
* @param topLevelFlexibleVersions The top-level flexible versions.
*/
static void validateTaggedVersions(
String what,
FieldSpec field,
Versions topLevelFlexibleVersions
) {
if (!field.flexibleVersions().isPresent()) {
if (!topLevelFlexibleVersions.contains(field.taggedVersions())) {
throw new RuntimeException("Tagged versions for " + what + " " +
field.name() + " are " + field.taggedVersions() + ", but top " +
"level flexible versions are " + topLevelFlexibleVersions);
}
}
}

/**
* Read a MessageSpec file from a path.
*
* @param schemaPath The path to read the file from.
* @return The MessageSpec.
*/
static MessageSpec readMessageSpecFromFile(String schemaPath) {
if (!Files.isReadable(Paths.get(schemaPath))) {
throw new RuntimeException("Path " + schemaPath + " does not point to " +
"a readable file.");
}
try {
return MessageGenerator.JSON_SERDE.readValue(new File(schemaPath), MessageSpec.class);
} catch (Exception e) {
throw new RuntimeException("Unable to parse file as MessageSpec: " + schemaPath, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.message.checker;

/**
* An exception thrown when a schema is evolved an invalid way.
*/
public class EvolutionException extends RuntimeException {
public EvolutionException(String message, Throwable t) {
super(message, t);
}

public EvolutionException(String message) {
super(message, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.message.checker;

import org.apache.kafka.message.FieldSpec;
import org.apache.kafka.message.MessageSpec;
import org.apache.kafka.message.StructSpec;

public class EvolutionVerifier {
private final MessageSpec topLevelMessage1;
private final MessageSpec topLevelMessage2;

public EvolutionVerifier(
MessageSpec topLevelMessage1,
MessageSpec topLevelMessage2
) {
this.topLevelMessage1 = topLevelMessage1;
this.topLevelMessage2 = topLevelMessage2;
}

public void verify() throws Exception {
verifyTopLevelMessages(topLevelMessage1, topLevelMessage2);
verifyVersionsMatchTopLevelMessage("message1", topLevelMessage1);
verifyVersionsMatchTopLevelMessage("message2", topLevelMessage2);
Unifier unifier = new Unifier(topLevelMessage1, topLevelMessage2);
unifier.unify();
}

static void verifyTopLevelMessages(MessageSpec topLevelMessage1, MessageSpec topLevelMessage2) {
if (!topLevelMessage1.apiKey().equals(topLevelMessage2.apiKey())) {
throw new EvolutionException("Initial apiKey " + topLevelMessage1.apiKey() +
" does not match final apiKey " + topLevelMessage2.apiKey());
}
if (!topLevelMessage1.type().equals(topLevelMessage2.type())) {
throw new EvolutionException("Initial type " + topLevelMessage1.type() +
" does not match final type " + topLevelMessage2.type());
}
if (!topLevelMessage2.flexibleVersions().contains(topLevelMessage1.flexibleVersions())) {
throw new EvolutionException("Initial flexibleVersions " + topLevelMessage1.flexibleVersions() +
" must be a subset of final flexibleVersions " + topLevelMessage2.flexibleVersions());
}
if (topLevelMessage2.validVersions().highest() < topLevelMessage1.validVersions().highest()) {
throw new EvolutionException("Initial maximum valid version " +
topLevelMessage1.validVersions().highest() + " must not be higher than final " +
"maximum valid version " + topLevelMessage2.validVersions().highest());
}
if (topLevelMessage2.validVersions().lowest() < topLevelMessage1.validVersions().lowest()) {
throw new EvolutionException("Initial minimum valid version " +
topLevelMessage1.validVersions().lowest() + " must not be higher than final " +
"minimum valid version " + topLevelMessage2.validVersions().lowest());
}
}

static void verifyVersionsMatchTopLevelMessage(
String what,
MessageSpec topLevelMessage
) {
for (FieldSpec field : topLevelMessage.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
}
for (StructSpec struct : topLevelMessage.commonStructs()) {
for (FieldSpec field : topLevelMessage.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
}
}
}

static void verifyVersionsMatchTopLevelMessage(
String what,
MessageSpec topLevelMessage,
FieldSpec field
) {
if (topLevelMessage.validVersions().intersect(field.versions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what + " has versions " +
field.versions() + ", but the message versions are only " +
topLevelMessage.validVersions() + ".");
}
if (!field.nullableVersions().empty()) {
if (topLevelMessage.validVersions().intersect(field.nullableVersions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has nullableVersions " + field.nullableVersions() + ", but the message " +
"versions are only " + topLevelMessage.validVersions() + ".");
}
}
if (field.tag().isPresent()) {
if (topLevelMessage.validVersions().intersect(field.taggedVersions()).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has taggedVersions " + field.taggedVersions() + ", but the message " +
"versions are only " + topLevelMessage.validVersions() + ".");
}
}
field.flexibleVersions().ifPresent(v -> {
if (topLevelMessage.validVersions().intersect(v).empty()) {
throw new EvolutionException("Field " + field.name() + " in " + what +
" has flexibleVersions " + v + ", but the message versions are only " +
topLevelMessage.validVersions() + ".");
}

});
for (FieldSpec child : field.fields()) {
verifyVersionsMatchTopLevelMessage(what, topLevelMessage, child);
}
}
}
Loading

0 comments on commit dbd50ff

Please sign in to comment.