diff --git a/README.md b/README.md index 09506f7..f60677e 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,8 @@ Configuration Parameter | Default | Description **transfer.message.keys** | true | Indicates whether Avro schemas from message keys in source records should be copied to the destination Registry. **include.message.headers** | true | Indicates whether message headers from source records should be preserved after the transform. **schema.capacity** | 100 | Capacity of schemas that can be cached in each `CachedSchemaRegistryClient` +**key.subject.name.strategy** | TopicNameStrategy | Subject name strategy used when registering key schemas in the destination Schema Registry. Possible values: TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy +**value.subject.name.strategy** | TopicNameStrategy | Subject name strategy used when registering value schemas in the destination Schema Registry. Possible values: TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy ## Embedded Schema Registry Client Configuration diff --git a/pom.xml b/pom.xml index 86547c4..d02bd73 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ <groupId>cricket.jmoore</groupId> <artifactId>schema-registry-transfer-smt</artifactId> - <version>0.2.1-SNAPSHOT</version> + <version>0.2.1</version> <name>schema-registry-transfer-smt</name> <description> A Kafka Connect Transform for copying Confluent Schema Registry schemas between different registries. @@ -66,11 +66,10 @@ <maven.compiler.target>1.8</maven.compiler.target> <slf4j.version>1.7.25</slf4j.version> - <kafka.version>2.1.0</kafka.version> - <confluent.version>5.1.0</confluent.version> - <confluent.patch.version>-cp1</confluent.patch.version> - <jackson.version>2.9.7</jackson.version> - <jackson.asl.version>1.9.13</jackson.asl.version> + <confluent.version>5.5.0</confluent.version> + <jackson.version>2.10.2</jackson.version> + <avro.version>1.9.2</avro.version> + <jersey.bean.validation.version>2.30</jersey.bean.validation.version> <spotless.version>1.20.0</spotless.version> @@ -90,14 +89,14 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>${kafka.version}${confluent.patch.version}</version> + <version>${confluent.version}-ccs</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-transforms</artifactId> - <version>${kafka.version}${confluent.patch.version}</version> + <version>${confluent.version}-ccs</version> <scope>provided</scope> </dependency> @@ -122,13 +121,6 @@ </exclusions> </dependency> - <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>0.10</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> @@ -137,16 +129,16 @@ </dependency> <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - <version>${jackson.asl.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> <scope>provided</scope> </dependency> <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - <version>${jackson.asl.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> <scope>provided</scope> </dependency> @@ -178,11 +170,18 @@ <scope>provided</scope> </dependency> + <dependency> +
<groupId>org.glassfish.jersey.ext</groupId> + <artifactId>jersey-bean-validation</artifactId> + <version>${jersey.bean.validation.version}</version> + <scope>provided</scope> + </dependency> + <!-- Runtime dependencies --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> - <version>1.8.1</version> + <version>${avro.version}</version> </dependency> <dependency> @@ -201,8 +200,16 @@ <artifactId>kafka-clients</artifactId> </exclusion> <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> + <groupId>io.swagger</groupId> + <artifactId>swagger-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>io.swagger</groupId> + <artifactId>swagger-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish.jersey.ext</groupId> + <artifactId>jersey-bean-validation</artifactId> </exclusion> </exclusions> </dependency> diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java index 2cf9e11..6e608cd 100644 --- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java +++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; @@ -23,10 +24,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.subject.RecordNameStrategy; import io.confluent.kafka.serializers.subject.TopicNameStrategy; +import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy; @SuppressWarnings("unused") @@ -44,17 +48,19 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans public static final String SRC_PREAMBLE = "For source consumer's schema registry, "; public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. 
The consumer's Schema Registry."; - public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; - public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; - public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; - public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; + public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; + public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; + public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; + public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; public static final String DEST_PREAMBLE = "For target producer's schema registry, "; public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry."; - public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; - public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; - public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; - public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; + public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; + public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; + public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; + public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; + public static final String KEY_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName(); + public static final String VALUE_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName(); public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries."; public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true; @@ -63,7 +69,8 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans private CachedSchemaRegistryClient sourceSchemaRegistryClient; private CachedSchemaRegistryClient destSchemaRegistryClient; - private SubjectNameStrategy<org.apache.avro.Schema> subjectNameStrategy; + private SubjectNameStrategy keySubjectNameStrategy; + private SubjectNameStrategy valueSubjectNameStrategy; private boolean transferKeys, includeHeaders; // caches from the source registry to the destination registry @@ -83,8 +90,10 @@ public SchemaRegistryTransfer() { .define(ConfigName.SCHEMA_CAPACITY, 
ConfigDef.Type.INT, SCHEMA_CAPACITY_CONFIG_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_CAPACITY_CONFIG_DOC) .define(ConfigName.TRANSFER_KEYS, ConfigDef.Type.BOOLEAN, TRANSFER_KEYS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, TRANSFER_KEYS_CONFIG_DOC) .define(ConfigName.INCLUDE_HEADERS, ConfigDef.Type.BOOLEAN, INCLUDE_HEADERS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, INCLUDE_HEADERS_CONFIG_DOC) + .define(ConfigName.KEY_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, KEY_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY_DOC) + .define(ConfigName.VALUE_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY_DOC) ; - // TODO: Other properties might be useful, e.g. the Subject Strategies + // TODO: Other properties might be useful } @Override @@ -98,17 +107,17 @@ public void configure(Map<String, ?> props) { List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL); final Map<String, String> sourceProps = new HashMap<>(); - sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, + sourceProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE)); - sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, + sourceProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.SRC_USER_INFO) .value()); List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL); final Map<String, String> destProps = new HashMap<>(); - destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, + destProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE)); - destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, + destProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.DEST_USER_INFO) .value()); @@ -121,9 +130,21 @@ public void configure(Map<String, ?> props) { this.transferKeys = config.getBoolean(ConfigName.TRANSFER_KEYS); this.includeHeaders = config.getBoolean(ConfigName.INCLUDE_HEADERS); - // TODO: Make the Strategy configurable, may be different for src and dest - // Strategy for the -key and -value subjects - this.subjectNameStrategy = new TopicNameStrategy(); + // Subject name strategies for keys and values + this.keySubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.KEY_SUBJECT_NAME_STRATEGY)); + this.valueSubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.VALUE_SUBJECT_NAME_STRATEGY)); + } + + private SubjectNameStrategy createSubjectStrategy(String subjectStrategyName) { + if (TopicNameStrategy.class.getSimpleName().equals(subjectStrategyName)) { + return new TopicNameStrategy(); + } else if (RecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) { + return new RecordNameStrategy(); + } else if (TopicRecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) { + return new TopicRecordNameStrategy(); + } else { + throw new KafkaException("Unknown subject name strategy: " + subjectStrategyName); + } } @Override @@ -206,14 +227,14 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean schemaAndDestId = schemaCache.get(sourceSchemaId); if (schemaAndDestId != null) { - log.trace("Schema id {} has been seen before.
Not registering with destination registry again."); + log.trace("Schema id {} has been seen before. Not registering with destination registry again.", sourceSchemaId); } else { // cache miss log.trace("Schema id {} has not been seen before", sourceSchemaId); schemaAndDestId = new SchemaAndId(); try { log.trace("Looking up schema id {} in source registry", sourceSchemaId); // Can't do getBySubjectAndId because that requires a Schema object for the strategy - schemaAndDestId.schema = sourceSchemaRegistryClient.getById(sourceSchemaId); + schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId); } catch (IOException | RestClientException e) { log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e); throw new ConnectException(e); @@ -221,8 +242,7 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean try { log.trace("Registering schema {} to destination registry", schemaAndDestId.schema); - // It could be possible that the destination naming strategy is different from the source - String subjectName = subjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema); + String subjectName = isKey ? keySubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema) : valueSubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema); schemaAndDestId.id = destSchemaRegistryClient.register(subjectName, schemaAndDestId.schema); schemaCache.put(sourceSchemaId, schemaAndDestId); } catch (IOException | RestClientException e) { @@ -244,25 +264,27 @@ public void close() { } interface ConfigName { - String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; - String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG; - String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; - String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG; + String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; + String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; + String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; + String DEST_USER_INFO = "dest." 
+ AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; String SCHEMA_CAPACITY = "schema.capacity"; String TRANSFER_KEYS = "transfer.message.keys"; String INCLUDE_HEADERS = "include.message.headers"; + String VALUE_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY; + String KEY_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY; } private static class SchemaAndId { private Integer id; - private org.apache.avro.Schema schema; + private ParsedSchema schema; SchemaAndId() { } - SchemaAndId(int id, org.apache.avro.Schema schema) { + SchemaAndId(int id, ParsedSchema schema) { this.id = id; this.schema = schema; } diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java index 1ff74ec..0374de1 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java @@ -16,6 +16,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; @@ -152,22 +154,22 @@ public void beforeEach(final ExtensionContext context) { .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(CONFIG_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.getConfigHandler.getName()))); - this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) + this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+/(?:fetchMaxId=false)")) .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); } - public int registerSchema(final String topic, boolean isKey, final Schema schema) { + public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema) { return this.registerSchema(topic, isKey, schema, new TopicNameStrategy()); } - public int registerSchema(final String topic, boolean isKey, final Schema schema, SubjectNameStrategy<Schema> strategy) { + public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema, SubjectNameStrategy strategy) { return this.register(strategy.subjectName(topic, isKey, schema), schema); } - private int register(final String subject, final Schema schema) { + private int register(final String subject, final ParsedSchema schema) { try { final int id = this.schemaRegistryClient.register(subject, schema); - this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id)) + this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id + "?fetchMaxId=false")) .willReturn(ResponseDefinitionBuilder.okForJson(new SchemaString(schema.toString())))); log.debug("Registered schema {}", id); return id; @@ -242,8 +244,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit final FileSource files, final Parameters parameters) { try { final int id = SchemaRegistryMock.this.register(getSubject(request), - new Schema.Parser() - .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())); + new AvroSchema(new Schema.Parser() + 
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()))); final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); registerSchemaResponse.setId(id); return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse); @@ -279,7 +281,8 @@ private class GetVersionHandler extends SubjectsVersioHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { - String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); + String versionStrFull = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); + String versionStr = versionStrFull.contains("?") ? versionStrFull.substring(0, versionStrFull.indexOf('?')) : versionStrFull; SchemaMetadata metadata; if (versionStr.equals("latest")) { metadata = SchemaRegistryMock.this.getSubjectVersion(getSubject(request), versionStr); diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java index 97b3a9a..edc45de 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java @@ -36,10 +36,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.NonRecordContainer; +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; @SuppressWarnings("unchecked") public class TransformTest { @@ -59,19 +63,24 @@ private enum ExplicitAuthType { private static final byte MAGIC_BYTE = (byte) 0x0; public static final int ID_SIZE = Integer.SIZE / Byte.SIZE; private static final int AVRO_CONTENT_OFFSET = 1 + ID_SIZE; - public static final org.apache.avro.Schema INT_SCHEMA = org.apache.avro.Schema.create(INT); - public static final org.apache.avro.Schema STRING_SCHEMA = org.apache.avro.Schema.create(STRING); - public static final org.apache.avro.Schema BOOLEAN_SCHEMA = org.apache.avro.Schema.create(BOOLEAN); - public static final org.apache.avro.Schema NAME_SCHEMA = SchemaBuilder.record("FullName") + public static final AvroSchema INT_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(INT)); + public static final AvroSchema STRING_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(STRING)); + public static final AvroSchema BOOLEAN_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(BOOLEAN)); + public static final AvroSchema NAME_SCHEMA = new AvroSchema(SchemaBuilder.record("FullName") .namespace("cricket.jmoore.kafka.connect.transforms").fields() .requiredString("first") .requiredString("last") - .endRecord(); + .endRecord()); - public static final org.apache.avro.Schema NAME_SCHEMA_ALIASED = SchemaBuilder.record("FullName") + public static final AvroSchema NAME_SCHEMA_ALIASED = new AvroSchema(SchemaBuilder.record("FullName") .namespace("cricket.jmoore.kafka.connect.transforms").fields() .requiredString("first") .name("surname").aliases("last").type().stringType().noDefault() - .endRecord(); + .endRecord()); + public static final AvroSchema KEY_RECORD_SCHEMA = new
AvroSchema(SchemaBuilder.record("Company") + .namespace("cricket.jmoore.kafka.connect.transforms").fields() + .requiredString("name") + .requiredString("address") + .endRecord()); @RegisterExtension final SchemaRegistryMock sourceSchemaRegistry = @@ -100,6 +109,13 @@ private Map<String, Object> getRequiredTransformConfigs() { return configs; } + private void configure(boolean copyKeys, String valueSubjectNameStrategy, String keySubjectNameStrategy) { + smtConfiguration.put(ConfigName.TRANSFER_KEYS, copyKeys); + smtConfiguration.put(ConfigName.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy); + smtConfiguration.put(ConfigName.KEY_SUBJECT_NAME_STRATEGY, keySubjectNameStrategy); + smt.configure(smtConfiguration); + } + private void configure(boolean copyKeys) { smtConfiguration.put(ConfigName.TRANSFER_KEYS, copyKeys); smt.configure(smtConfiguration); @@ -188,9 +204,9 @@ private void passSimpleMessage() throws IOException { final int sourceValId = sourceSchemaRegistry.registerSchema(TOPIC, false, STRING_SCHEMA); final ByteArrayOutputStream keyOut = - encodeAvroObject(STRING_SCHEMA, sourceKeyId, HELLO_WORLD_VALUE); + encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceKeyId, HELLO_WORLD_VALUE); final ByteArrayOutputStream valOut = - encodeAvroObject(STRING_SCHEMA, sourceValId, HELLO_WORLD_VALUE); + encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceValId, HELLO_WORLD_VALUE); final ConnectRecord record = createRecord(keyOut.toByteArray(), valOut.toByteArray()); @@ -387,7 +403,7 @@ public void testKeySchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), true, INT_SCHEMA); // Create new schema for source registry - org.apache.avro.Schema schema = STRING_SCHEMA; + AvroSchema schema = STRING_SCHEMA; log.info("Registering schema in source registry"); int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, true, schema); final String subject = TOPIC + "-key"; @@ -403,7 +419,7 @@ public void testKeySchemaTransfer() { } try { - ByteArrayOutputStream out = encodeAvroObject(schema, sourceId, "hello, world"); + ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world"); ConnectRecord record = createRecord(Schema.OPTIONAL_BYTES_SCHEMA, out.toByteArray(), null, null); @@ -431,8 +447,8 @@ public void testKeySchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schema is the same - org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId); - org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema()); + ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId); + ParsedSchema destSchema = new AvroSchema(metadata.getSchema()); assertEquals(schema, sourceSchema, "source server returned same schema"); assertEquals(schema, destSchema, "destination server returned same schema"); assertEquals(sourceSchema, destSchema, "both servers' schemas match"); @@ -450,7 +466,7 @@ public void testValueSchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, INT_SCHEMA); // Create new schema for source registry - org.apache.avro.Schema schema = STRING_SCHEMA; + AvroSchema schema = STRING_SCHEMA; log.info("Registering schema in source registry"); int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema); final String subject = TOPIC + "-value"; @@ -469,7 +485,7 @@ public void testValueSchemaTransfer() { ConnectRecord appliedRecord = null; int destinationId = -1; try { - ByteArrayOutputStream 
out = encodeAvroObject(schema, sourceId, "hello, world"); + ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world"); value = out.toByteArray(); ConnectRecord record = createRecord(null, value); @@ -500,8 +516,8 @@ public void testValueSchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schema is the same - org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId); - org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema()); + ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId); + ParsedSchema destSchema = new AvroSchema(metadata.getSchema()); assertEquals(schema, sourceSchema, "source server returned same schema"); assertEquals(schema, destSchema, "destination server returned same schema"); assertEquals(sourceSchema, destSchema, "both servers' schemas match"); @@ -531,8 +547,8 @@ public void testKeyValueSchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA); // Create new schemas for source registry - org.apache.avro.Schema keySchema = INT_SCHEMA; - org.apache.avro.Schema valueSchema = STRING_SCHEMA; + AvroSchema keySchema = INT_SCHEMA; + AvroSchema valueSchema = STRING_SCHEMA; log.info("Registering schemas in source registry"); int sourceKeyId = sourceSchemaRegistry.registerSchema(TOPIC, true, keySchema); final String keySubject = TOPIC + "-key"; @@ -559,8 +575,8 @@ public void testKeyValueSchemaTransfer() { int destinationKeyId = -1; int destinationValueId = -1; try { - ByteArrayOutputStream keyStream = encodeAvroObject(keySchema, sourceKeyId, AVRO_CONTENT_OFFSET); - ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema, sourceValueId, "hello, world"); + ByteArrayOutputStream keyStream = encodeAvroObject(keySchema.rawSchema(), sourceKeyId, AVRO_CONTENT_OFFSET); + ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema.rawSchema(), sourceValueId, "hello, world"); key = keyStream.toByteArray(); value = valueStream.toByteArray(); @@ -601,13 +617,13 @@ public void testKeyValueSchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schemas are the same - org.apache.avro.Schema sourceKeySchema = sourceClient.getById(sourceKeyId); - org.apache.avro.Schema destKeySchema = new org.apache.avro.Schema.Parser().parse(keyMetadata.getSchema()); + ParsedSchema sourceKeySchema = sourceClient.getSchemaById(sourceKeyId); + ParsedSchema destKeySchema = new AvroSchema(keyMetadata.getSchema()); assertEquals(destKeySchema, sourceKeySchema, "source server returned same key schema"); assertEquals(keySchema, destKeySchema, "destination server returned same key schema"); assertEquals(sourceKeySchema, destKeySchema, "both servers' key schemas match"); - org.apache.avro.Schema sourceValueSchema = sourceClient.getById(sourceValueId); - org.apache.avro.Schema destValueSchema = new org.apache.avro.Schema.Parser().parse(valueMetadata.getSchema()); + ParsedSchema sourceValueSchema = sourceClient.getSchemaById(sourceValueId); + ParsedSchema destValueSchema = new AvroSchema(valueMetadata.getSchema()); assertEquals(destValueSchema, sourceValueSchema, "source server returned same value schema"); assertEquals(valueSchema, destValueSchema, "destination server returned same value schema"); assertEquals(sourceValueSchema, destValueSchema, "both servers' value schemas match"); @@ -679,20 +695,20 @@ public void 
testEvolvingValueSchemaTransfer() { } try { - GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA) + GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA.rawSchema()) .set("first", "fname") .set("last", "lname") .build(); - ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA, sourceId, record1); + ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA.rawSchema(), sourceId, record1); byte[] value = out.toByteArray(); ConnectRecord record = createRecord(null, value); - GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED) + GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED.rawSchema()) .set("first", "fname") .set("surname", "lname") .build(); - out = encodeAvroObject(NAME_SCHEMA_ALIASED, nextSourceId, record2); + out = encodeAvroObject(NAME_SCHEMA_ALIASED.rawSchema(), nextSourceId, record2); byte[] nextValue = out.toByteArray(); ConnectRecord nextRecord = createRecord(null, nextValue); @@ -736,8 +752,8 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { log.info("Registering schema in source registry"); // TODO: Figure out what these should be, where if order is flipped, destination will not accept - org.apache.avro.Schema schema = null; - org.apache.avro.Schema nextSchema = null; + AvroSchema schema = null; + AvroSchema nextSchema = null; int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema); int nextSourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, nextSchema); @@ -757,12 +773,12 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { try { // TODO: Depending on schemas above, then build Avro records for them // ensure second id is encoded first - ByteArrayOutputStream out = encodeAvroObject(nextSchema, nextSourceId, null); + ByteArrayOutputStream out = encodeAvroObject(nextSchema.rawSchema(), nextSourceId, null); byte[] value = out.toByteArray(); ConnectRecord record = createRecord(null, value); - out = encodeAvroObject(schema, sourceId, null); + out = encodeAvroObject(schema.rawSchema(), sourceId, null); byte[] nextValue = out.toByteArray(); ConnectRecord nextRecord = createRecord(null, nextValue); @@ -788,4 +804,143 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { fail(e); } } + + @Test + public void testRecordNameSubjectStrategy() { + configure(true, RecordNameStrategy.class.getSimpleName(), RecordNameStrategy.class.getSimpleName()); + + // Create bogus schema in destination so that source and destination ids differ + log.info("Registering schema in destination registry"); + destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA); + + // Create new schemas for source registry + AvroSchema keySchema = KEY_RECORD_SCHEMA; + AvroSchema valueSchema = NAME_SCHEMA; + + //key subject in destination schema registry is record name, NOT default topicName-key + String destKeySubject = keySchema.name(); + //value subject in destination schema registry is record name, NOT default topicName-value + String destValueSubject = valueSchema.name(); + + registerAndApplyTransformation(keySchema, valueSchema, destKeySubject, destValueSubject); + } + + @Test + public void testTopicRecordNameSubjectStrategy() { + configure(true, TopicRecordNameStrategy.class.getSimpleName(), TopicRecordNameStrategy.class.getSimpleName()); + + // Create bogus schema in destination so that source and destination ids differ + log.info("Registering schema in destination registry"); + destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, 
BOOLEAN_SCHEMA); + + // Create new schemas for source registry + AvroSchema keySchema = KEY_RECORD_SCHEMA; + AvroSchema valueSchema = NAME_SCHEMA; + + //key subject in destination schema registry is topic-record name, NOT default topicName-key + String destKeySubject = TOPIC + "-" + keySchema.name(); + //value subject in destination schema registry is topic-record name, NOT default topicName-value + String destValueSubject = TOPIC + "-" + valueSchema.name(); + + registerAndApplyTransformation(keySchema, valueSchema, destKeySubject, destValueSubject); + } + + private void registerAndApplyTransformation(AvroSchema keySchema, AvroSchema valueSchema, String destKeySubject, String destValueSubject) { + log.info("Registering schemas in source registry"); + int sourceKeyId = sourceSchemaRegistry.registerSchema(TOPIC, true, keySchema); + final String keySubject = TOPIC + "-key"; + assertEquals(1, sourceKeyId, "An empty registry starts at id=1"); + int sourceValueId = sourceSchemaRegistry.registerSchema(TOPIC, false, valueSchema); + final String valueSubject = TOPIC + "-value"; + assertEquals(2, sourceValueId, "unique schema ids monotonically increase"); + + SchemaRegistryClient sourceClient = sourceSchemaRegistry.getSchemaRegistryClient(); + int numSourceKeyVersions = 0; + int numSourceValueVersions = 0; + try { + numSourceKeyVersions = sourceClient.getAllVersions(keySubject).size(); + assertEquals(1, numSourceKeyVersions, "the source registry subject contains the pre-registered key schema"); + numSourceValueVersions = sourceClient.getAllVersions(valueSubject).size(); + assertEquals(1, numSourceValueVersions, "the source registry subject contains the pre-registered value schema"); + } catch (IOException | RestClientException e) { + fail(e); + } + + byte[] key; + byte[] value; + ConnectRecord appliedRecord; + int destinationKeyId; + int destinationValueId; + try { + + GenericData.Record keyRecord = new GenericRecordBuilder(KEY_RECORD_SCHEMA.rawSchema()) + .set("name", "cName") + .set("address", "cAddress") + .build(); + + GenericData.Record valueRecord = new GenericRecordBuilder(NAME_SCHEMA.rawSchema()) + .set("first", "fname") + .set("last", "lname") + .build(); + + ByteArrayOutputStream keyStream = encodeAvroObject(keySchema.rawSchema(), sourceKeyId, keyRecord); + ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema.rawSchema(), sourceValueId, valueRecord); + + key = keyStream.toByteArray(); + value = valueStream.toByteArray(); + ConnectRecord record = createRecord(key, value); + + // check the destination has no versions for this subject + SchemaRegistryClient destClient = destSchemaRegistry.getSchemaRegistryClient(); + List<Integer> destKeyVersions = destClient.getAllVersions(destKeySubject); + assertTrue(destKeyVersions.isEmpty(), "the destination registry starts empty"); + List<Integer> destValueVersions = destClient.getAllVersions(destValueSubject); + assertTrue(destValueVersions.isEmpty(), "the destination registry starts empty"); + + // The transform will pass for key and value with byte schemas + log.info("applying transformation"); + appliedRecord = assertDoesNotThrow(() -> smt.apply(record)); + + assertEquals(record.keySchema(), appliedRecord.keySchema(), "key schema unchanged"); + assertEquals(record.valueSchema(), appliedRecord.valueSchema(), "value schema unchanged"); + + // check the value schema was copied, and the destination now has some version + destKeyVersions = destClient.getAllVersions(destKeySubject); + assertEquals(numSourceKeyVersions, destKeyVersions.size(), 
+ "source and destination registries have the same amount of schemas for the key subject"); + destValueVersions = destClient.getAllVersions(destValueSubject); + assertEquals(numSourceValueVersions, destValueVersions.size(), + "source and destination registries have the same amount of schemas for the value subject"); + + // Verify that the ids for the source and destination are different + SchemaMetadata keyMetadata = destClient.getSchemaMetadata(destKeySubject, destKeyVersions.get(0)); + destinationKeyId = keyMetadata.getId(); + log.debug("source_keyId={} ; dest_keyId={}", sourceKeyId, destinationKeyId); + assertTrue(sourceKeyId < destinationKeyId, + "destination id should be different and higher since that registry already had schemas"); + SchemaMetadata valueMetadata = destClient.getSchemaMetadata(destValueSubject, destValueVersions.get(0)); + destinationValueId = valueMetadata.getId(); + log.debug("source_valueId={} ; dest_valueId={}", sourceValueId, destinationValueId); + assertTrue(sourceValueId < destinationValueId, + "destination id should be different and higher since that registry already had schemas"); + + // Verify the schemas are the same + ParsedSchema sourceKeySchema = sourceClient.getSchemaById(sourceKeyId); + ParsedSchema destKeySchema = new AvroSchema(keyMetadata.getSchema()); + + assertEquals(destKeySchema, sourceKeySchema, "source server returned same key schema"); + assertEquals(keySchema, destKeySchema, "destination server returned same key schema"); + assertEquals(sourceKeySchema, destKeySchema, "both servers' key schemas match"); + ParsedSchema sourceValueSchema = sourceClient.getSchemaById(sourceValueId); + ParsedSchema destValueSchema = new AvroSchema(valueMetadata.getSchema()); + assertEquals(destValueSchema, sourceValueSchema, "source server returned same value schema"); + assertEquals(valueSchema, destValueSchema, "destination server returned same value schema"); + assertEquals(sourceValueSchema, destValueSchema, "both servers' value schemas match"); + + } catch (IOException | RestClientException e) { + fail(e); + } + } + + }