From e0d45b3118fb72fbe4bf8bbcb348855bcad273eb Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Thu, 4 Jun 2020 08:51:53 +0200 Subject: [PATCH 1/9] dependency updates - CP v.5.5.0, Kafka v.2.5.0 --- pom.xml | 74 +++++++++++++++++++++++---------------------------------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/pom.xml b/pom.xml index 9b5dd29..80dc2b0 100644 --- a/pom.xml +++ b/pom.xml @@ -66,11 +66,10 @@ 1.8 1.7.25 - 2.1.0 - 5.1.0 - -cp1 - 2.9.7 - 1.9.13 + 2.5.0 + 5.5.0 + 2.10.2 + 1.9.2 1.19.0 @@ -90,42 +89,14 @@ org.apache.kafka kafka-clients - ${kafka.version}${confluent.patch.version} + ${kafka.version} provided org.apache.kafka connect-transforms - ${kafka.version}${confluent.patch.version} - provided - - - - org.apache.zookeeper - zookeeper - 3.4.13 - provided - - - log4j - log4j - - - io.netty - netty - - - org.slf4j - slf4j-log4j12 - - - - - - com.101tec - zkclient - 0.10 + ${kafka.version} provided @@ -137,16 +108,16 @@ - org.codehaus.jackson - jackson-core-asl - ${jackson.asl.version} + com.fasterxml.jackson.core + jackson-core + ${jackson.version} provided - org.codehaus.jackson - jackson-mapper-asl - ${jackson.asl.version} + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} provided @@ -178,11 +149,18 @@ provided + + org.glassfish.jersey.ext + jersey-bean-validation + 2.30 + provided + + org.apache.avro avro - 1.8.1 + ${avro.version} @@ -201,8 +179,16 @@ kafka-clients - com.fasterxml.jackson.core - jackson-databind + io.swagger + swagger-annotations + + + io.swagger + swagger-core + + + org.glassfish.jersey.ext + jersey-bean-validation From db6bcaa0257513e091506da0391c5f2de83e45ed Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Thu, 4 Jun 2020 09:00:17 +0200 Subject: [PATCH 2/9] alignment with kafka v.2.5.0 and latest Schema Registry - ParsedSchema, AvroSchema --- .../transforms/SchemaRegistryTransfer.java | 9 +-- .../transforms/SchemaRegistryMock.java | 19 +++--- .../connect/transforms/TransformTest.java | 68 ++++++++++--------- 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java index 2cf9e11..c45d9a1 100644 --- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java +++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; @@ -63,7 +64,7 @@ public class SchemaRegistryTransfer> implements Trans private CachedSchemaRegistryClient sourceSchemaRegistryClient; private CachedSchemaRegistryClient destSchemaRegistryClient; - private SubjectNameStrategy subjectNameStrategy; + private SubjectNameStrategy subjectNameStrategy; private boolean transferKeys, includeHeaders; // caches from the source registry to the destination registry @@ -213,7 +214,7 @@ protected Optional copySchema(ByteBuffer buffer, String topic, boolean try { log.trace("Looking up schema id {} in source registry", sourceSchemaId); // Can't do getBySubjectAndId because that requires a Schema object for the strategy - schemaAndDestId.schema = 
sourceSchemaRegistryClient.getById(sourceSchemaId); + schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId); } catch (IOException | RestClientException e) { log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e); throw new ConnectException(e); @@ -257,12 +258,12 @@ interface ConfigName { private static class SchemaAndId { private Integer id; - private org.apache.avro.Schema schema; + private ParsedSchema schema; SchemaAndId() { } - SchemaAndId(int id, org.apache.avro.Schema schema) { + SchemaAndId(int id, ParsedSchema schema) { this.id = id; this.schema = schema; } diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java index 1ff74ec..0374de1 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryMock.java @@ -16,6 +16,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; @@ -152,22 +154,22 @@ public void beforeEach(final ExtensionContext context) { .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(CONFIG_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.getConfigHandler.getName()))); - this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) + this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+/(?:fetchMaxId=false)")) .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); } - public int registerSchema(final String topic, boolean isKey, final Schema schema) { + public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema) { return this.registerSchema(topic, isKey, schema, new TopicNameStrategy()); } - public int registerSchema(final String topic, boolean isKey, final Schema schema, SubjectNameStrategy strategy) { + public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema, SubjectNameStrategy strategy) { return this.register(strategy.subjectName(topic, isKey, schema), schema); } - private int register(final String subject, final Schema schema) { + private int register(final String subject, final ParsedSchema schema) { try { final int id = this.schemaRegistryClient.register(subject, schema); - this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id)) + this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id + "?fetchMaxId=false")) .willReturn(ResponseDefinitionBuilder.okForJson(new SchemaString(schema.toString())))); log.debug("Registered schema {}", id); return id; @@ -242,8 +244,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit final FileSource files, final Parameters parameters) { try { final int id = SchemaRegistryMock.this.register(getSubject(request), - new Schema.Parser() - .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())); + new AvroSchema(new Schema.Parser() + 
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()))); final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); registerSchemaResponse.setId(id); return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse); @@ -279,7 +281,8 @@ private class GetVersionHandler extends SubjectsVersioHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { - String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); + String versionStrFull = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); + String versionStr = versionStrFull.substring(0, versionStrFull.indexOf("?")); SchemaMetadata metadata; if (versionStr.equals("latest")) { metadata = SchemaRegistryMock.this.getSubjectVersion(getSubject(request), versionStr); diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java index 97b3a9a..b8e81ca 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java @@ -36,6 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; @@ -59,19 +61,19 @@ private enum ExplicitAuthType { private static final byte MAGIC_BYTE = (byte) 0x0; public static final int ID_SIZE = Integer.SIZE / Byte.SIZE; private static final int AVRO_CONTENT_OFFSET = 1 + ID_SIZE; - public static final org.apache.avro.Schema INT_SCHEMA = org.apache.avro.Schema.create(INT); - public static final org.apache.avro.Schema STRING_SCHEMA = org.apache.avro.Schema.create(STRING); - public static final org.apache.avro.Schema BOOLEAN_SCHEMA = org.apache.avro.Schema.create(BOOLEAN); - public static final org.apache.avro.Schema NAME_SCHEMA = SchemaBuilder.record("FullName") + public static final AvroSchema INT_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(INT)); + public static final AvroSchema STRING_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(STRING)); + public static final AvroSchema BOOLEAN_SCHEMA = new AvroSchema(org.apache.avro.Schema.create(BOOLEAN)); + public static final AvroSchema NAME_SCHEMA = new AvroSchema(SchemaBuilder.record("FullName") .namespace("cricket.jmoore.kafka.connect.transforms").fields() .requiredString("first") .requiredString("last") - .endRecord(); - public static final org.apache.avro.Schema NAME_SCHEMA_ALIASED = SchemaBuilder.record("FullName") + .endRecord()); + public static final AvroSchema NAME_SCHEMA_ALIASED = new AvroSchema(SchemaBuilder.record("FullName") .namespace("cricket.jmoore.kafka.connect.transforms").fields() .requiredString("first") .name("surname").aliases("last").type().stringType().noDefault() - .endRecord(); + .endRecord()); @RegisterExtension final SchemaRegistryMock sourceSchemaRegistry = @@ -188,9 +190,9 @@ private void passSimpleMessage() throws IOException { final int sourceValId = sourceSchemaRegistry.registerSchema(TOPIC, false, STRING_SCHEMA); final ByteArrayOutputStream keyOut = - encodeAvroObject(STRING_SCHEMA, sourceKeyId, 
HELLO_WORLD_VALUE); + encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceKeyId, HELLO_WORLD_VALUE); final ByteArrayOutputStream valOut = - encodeAvroObject(STRING_SCHEMA, sourceValId, HELLO_WORLD_VALUE); + encodeAvroObject(STRING_SCHEMA.rawSchema(), sourceValId, HELLO_WORLD_VALUE); final ConnectRecord record = createRecord(keyOut.toByteArray(), valOut.toByteArray()); @@ -387,7 +389,7 @@ public void testKeySchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), true, INT_SCHEMA); // Create new schema for source registry - org.apache.avro.Schema schema = STRING_SCHEMA; + AvroSchema schema = STRING_SCHEMA; log.info("Registering schema in source registry"); int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, true, schema); final String subject = TOPIC + "-key"; @@ -403,7 +405,7 @@ public void testKeySchemaTransfer() { } try { - ByteArrayOutputStream out = encodeAvroObject(schema, sourceId, "hello, world"); + ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world"); ConnectRecord record = createRecord(Schema.OPTIONAL_BYTES_SCHEMA, out.toByteArray(), null, null); @@ -431,8 +433,8 @@ public void testKeySchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schema is the same - org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId); - org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema()); + ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId); + ParsedSchema destSchema = new AvroSchema(metadata.getSchema()); assertEquals(schema, sourceSchema, "source server returned same schema"); assertEquals(schema, destSchema, "destination server returned same schema"); assertEquals(sourceSchema, destSchema, "both servers' schemas match"); @@ -450,7 +452,7 @@ public void testValueSchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, INT_SCHEMA); // Create new schema for source registry - org.apache.avro.Schema schema = STRING_SCHEMA; + AvroSchema schema = STRING_SCHEMA; log.info("Registering schema in source registry"); int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema); final String subject = TOPIC + "-value"; @@ -469,7 +471,7 @@ public void testValueSchemaTransfer() { ConnectRecord appliedRecord = null; int destinationId = -1; try { - ByteArrayOutputStream out = encodeAvroObject(schema, sourceId, "hello, world"); + ByteArrayOutputStream out = encodeAvroObject(schema.rawSchema(), sourceId, "hello, world"); value = out.toByteArray(); ConnectRecord record = createRecord(null, value); @@ -500,8 +502,8 @@ public void testValueSchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schema is the same - org.apache.avro.Schema sourceSchema = sourceClient.getById(sourceId); - org.apache.avro.Schema destSchema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema()); + ParsedSchema sourceSchema = sourceClient.getSchemaById(sourceId); + ParsedSchema destSchema = new AvroSchema(metadata.getSchema()); assertEquals(schema, sourceSchema, "source server returned same schema"); assertEquals(schema, destSchema, "destination server returned same schema"); assertEquals(sourceSchema, destSchema, "both servers' schemas match"); @@ -531,8 +533,8 @@ public void testKeyValueSchemaTransfer() { destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA); // Create new schemas 
for source registry - org.apache.avro.Schema keySchema = INT_SCHEMA; - org.apache.avro.Schema valueSchema = STRING_SCHEMA; + AvroSchema keySchema = INT_SCHEMA; + AvroSchema valueSchema = STRING_SCHEMA; log.info("Registering schemas in source registry"); int sourceKeyId = sourceSchemaRegistry.registerSchema(TOPIC, true, keySchema); final String keySubject = TOPIC + "-key"; @@ -559,8 +561,8 @@ public void testKeyValueSchemaTransfer() { int destinationKeyId = -1; int destinationValueId = -1; try { - ByteArrayOutputStream keyStream = encodeAvroObject(keySchema, sourceKeyId, AVRO_CONTENT_OFFSET); - ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema, sourceValueId, "hello, world"); + ByteArrayOutputStream keyStream = encodeAvroObject(keySchema.rawSchema(), sourceKeyId, AVRO_CONTENT_OFFSET); + ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema.rawSchema(), sourceValueId, "hello, world"); key = keyStream.toByteArray(); value = valueStream.toByteArray(); @@ -601,13 +603,13 @@ public void testKeyValueSchemaTransfer() { "destination id should be different and higher since that registry already had schemas"); // Verify the schemas are the same - org.apache.avro.Schema sourceKeySchema = sourceClient.getById(sourceKeyId); - org.apache.avro.Schema destKeySchema = new org.apache.avro.Schema.Parser().parse(keyMetadata.getSchema()); + ParsedSchema sourceKeySchema = sourceClient.getSchemaById(sourceKeyId); + ParsedSchema destKeySchema = new AvroSchema(keyMetadata.getSchema()); assertEquals(destKeySchema, sourceKeySchema, "source server returned same key schema"); assertEquals(keySchema, destKeySchema, "destination server returned same key schema"); assertEquals(sourceKeySchema, destKeySchema, "both servers' key schemas match"); - org.apache.avro.Schema sourceValueSchema = sourceClient.getById(sourceValueId); - org.apache.avro.Schema destValueSchema = new org.apache.avro.Schema.Parser().parse(valueMetadata.getSchema()); + ParsedSchema sourceValueSchema = sourceClient.getSchemaById(sourceValueId); + ParsedSchema destValueSchema = new AvroSchema(valueMetadata.getSchema()); assertEquals(destValueSchema, sourceValueSchema, "source server returned same value schema"); assertEquals(valueSchema, destValueSchema, "destination server returned same value schema"); assertEquals(sourceValueSchema, destValueSchema, "both servers' value schemas match"); @@ -679,20 +681,20 @@ public void testEvolvingValueSchemaTransfer() { } try { - GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA) + GenericData.Record record1 = new GenericRecordBuilder(NAME_SCHEMA.rawSchema()) .set("first", "fname") .set("last", "lname") .build(); - ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA, sourceId, record1); + ByteArrayOutputStream out = encodeAvroObject(NAME_SCHEMA.rawSchema(), sourceId, record1); byte[] value = out.toByteArray(); ConnectRecord record = createRecord(null, value); - GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED) + GenericData.Record record2 = new GenericRecordBuilder(NAME_SCHEMA_ALIASED.rawSchema()) .set("first", "fname") .set("surname", "lname") .build(); - out = encodeAvroObject(NAME_SCHEMA_ALIASED, nextSourceId, record2); + out = encodeAvroObject(NAME_SCHEMA_ALIASED.rawSchema(), nextSourceId, record2); byte[] nextValue = out.toByteArray(); ConnectRecord nextRecord = createRecord(null, nextValue); @@ -736,8 +738,8 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { log.info("Registering schema in source registry"); // TODO: Figure out 
what these should be, where if order is flipped, destination will not accept - org.apache.avro.Schema schema = null; - org.apache.avro.Schema nextSchema = null; + AvroSchema schema = null; + AvroSchema nextSchema = null; int sourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, schema); int nextSourceId = sourceSchemaRegistry.registerSchema(TOPIC, false, nextSchema); @@ -757,12 +759,12 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { try { // TODO: Depending on schemas above, then build Avro records for them // ensure second id is encoded first - ByteArrayOutputStream out = encodeAvroObject(nextSchema, nextSourceId, null); + ByteArrayOutputStream out = encodeAvroObject(nextSchema.rawSchema(), nextSourceId, null); byte[] value = out.toByteArray(); ConnectRecord record = createRecord(null, value); - out = encodeAvroObject(schema, sourceId, null); + out = encodeAvroObject(schema.rawSchema(), sourceId, null); byte[] nextValue = out.toByteArray(); ConnectRecord nextRecord = createRecord(null, nextValue); From 6d125c55e8958d0dcdcb278527e890eca735f234 Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Thu, 4 Jun 2020 13:46:30 +0200 Subject: [PATCH 3/9] replaced deprecated AbstractKafkaAvroSerDeConfig with AbstractKafkaSchemaSerDeConfig --- .../transforms/SchemaRegistryTransfer.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java index c45d9a1..a52ec11 100644 --- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java +++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java @@ -9,6 +9,7 @@ import java.util.Objects; import java.util.Optional; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; @@ -26,7 +27,6 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.subject.TopicNameStrategy; import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy; @@ -45,17 +45,17 @@ public class SchemaRegistryTransfer> implements Trans public static final String SRC_PREAMBLE = "For source consumer's schema registry, "; public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. 
The consumer's Schema Registry."; - public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; - public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; - public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; - public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; + public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; + public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; + public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; + public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; public static final String DEST_PREAMBLE = "For target producer's schema registry, "; public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry."; - public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; - public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; - public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; - public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; + public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC; + public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT; + public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC; + public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT; public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries."; public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true; @@ -99,17 +99,17 @@ public void configure(Map props) { List sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL); final Map sourceProps = new HashMap<>(); - sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, + sourceProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE)); - sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, + sourceProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.SRC_USER_INFO) .value()); List destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL); final Map destProps = new HashMap<>(); - destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, + 
destProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE)); - destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, + destProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, config.getPassword(ConfigName.DEST_USER_INFO) .value()); @@ -207,7 +207,7 @@ protected Optional copySchema(ByteBuffer buffer, String topic, boolean schemaAndDestId = schemaCache.get(sourceSchemaId); if (schemaAndDestId != null) { - log.trace("Schema id {} has been seen before. Not registering with destination registry again."); + log.trace("Schema id {} has been seen before. Not registering with destination registry again.", sourceSchemaId); } else { // cache miss log.trace("Schema id {} has not been seen before", sourceSchemaId); schemaAndDestId = new SchemaAndId(); @@ -245,12 +245,12 @@ public void close() { } interface ConfigName { - String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; - String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG; - String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; - String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; - String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG; + String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; + String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; + String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE; + String DEST_USER_INFO = "dest." 
+ AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG; String SCHEMA_CAPACITY = "schema.capacity"; String TRANSFER_KEYS = "transfer.message.keys"; String INCLUDE_HEADERS = "include.message.headers"; From ebac2a8ed84c9dda1240e421d905593c63b13ebd Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Thu, 4 Jun 2020 13:50:46 +0200 Subject: [PATCH 4/9] imports ordering --- .../jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java index a52ec11..0e04751 100644 --- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java +++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.Optional; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; @@ -27,6 +26,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.subject.TopicNameStrategy; import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy; From 4549bbfbaacc29ec6473295296e8dc9b655bb70c Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Sun, 7 Jun 2020 11:52:09 +0200 Subject: [PATCH 5/9] Removed kafka-clients version, using confluent-version instead. Added ZK lib in provided scope. Added property for jersey.bean.validation version --- pom.xml | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 80dc2b0..1871195 100644 --- a/pom.xml +++ b/pom.xml @@ -66,10 +66,10 @@ 1.8 1.7.25 - 2.5.0 5.5.0 2.10.2 1.9.2 + 2.30 1.19.0 @@ -89,17 +89,38 @@ org.apache.kafka kafka-clients - ${kafka.version} + ${confluent.version}-ccs provided org.apache.kafka connect-transforms - ${kafka.version} + ${confluent.version}-ccs provided + + org.apache.zookeeper + zookeeper + 3.4.13 + provided + + + log4j + log4j + + + io.netty + netty + + + org.slf4j + slf4j-log4j12 + + + + com.fasterxml.jackson.core jackson-databind @@ -152,7 +173,7 @@ org.glassfish.jersey.ext jersey-bean-validation - 2.30 + ${jersey.bean.validation.version} provided From 2080a90c446ee902e20e59c90b696545dd85aedc Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Tue, 9 Jun 2020 08:56:08 +0200 Subject: [PATCH 6/9] bumped version to 0.2.1 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1871195..ba9ae1e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ cricket.jmoore schema-registry-transfer-smt - 0.2.1-SNAPSHOT + 0.2.1 schema-registry-transfer-smt A Kafka Connect Transform for copying Confluent Schema Registry schemas between different registries. 
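[Editorial aside, not part of the patch series: the one API change driving patches 2 and 3 above is that Confluent Platform 5.5 deprecates the Avro-specific `SchemaRegistryClient.getById` in favor of `getSchemaById`, which returns a format-agnostic `ParsedSchema`; Avro schemas are now wrapped in `AvroSchema` before registration. A minimal sketch of the new call sequence — the registry URL, cache capacity, schema id, and subject name are placeholders, not values from these patches:]

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class ParsedSchemaSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL and cache capacity.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Pre-5.5 code called client.getById(id) and received an org.apache.avro.Schema;
        // the 5.5 replacement returns the schema-type-agnostic ParsedSchema wrapper.
        ParsedSchema fetched = client.getSchemaById(1);

        // Registration also accepts a ParsedSchema, so Avro schemas are wrapped first.
        ParsedSchema toRegister = new AvroSchema("{\"type\": \"string\"}");
        int newId = client.register("example-topic-value", toRegister);

        System.out.println("registered id " + newId + " / fetched " + fetched.canonicalString());
    }
}
```

[This is the same shape `SchemaRegistryTransfer.copySchema` relies on after these patches: the `ParsedSchema` fetched from the source registry is handed directly to `register` on the destination client.]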
From 1ebc64455ebf131e7682b28b9f8134bb59bade32 Mon Sep 17 00:00:00 2001
From: Ivan Turcinovic
Date: Tue, 9 Jun 2020 09:22:55 +0200
Subject: [PATCH 7/9] implementation of configurable key & value subject name
 strategies for destination Schema Registry

---
 .../transforms/SchemaRegistryTransfer.java | 35 +++++++++++++++----
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java
index 0e04751..6e608cd 100644
--- a/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java
+++ b/src/main/java/cricket/jmoore/kafka/connect/transforms/SchemaRegistryTransfer.java
@@ -9,6 +9,7 @@
 import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
@@ -27,7 +28,9 @@
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.subject.RecordNameStrategy;
 import io.confluent.kafka.serializers.subject.TopicNameStrategy;
+import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
 import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
 
 @SuppressWarnings("unused")
@@ -56,6 +59,8 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans
     public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
     public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
     public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
+    public static final String KEY_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName();
+    public static final String VALUE_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName();
 
     public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries.";
     public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true;
@@ -64,7 +69,8 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans
     private CachedSchemaRegistryClient sourceSchemaRegistryClient;
     private CachedSchemaRegistryClient destSchemaRegistryClient;
-    private SubjectNameStrategy subjectNameStrategy;
+    private SubjectNameStrategy keySubjectNameStrategy;
+    private SubjectNameStrategy valueSubjectNameStrategy;
     private boolean transferKeys, includeHeaders;
 
     // caches from the source registry to the destination registry
@@ -84,8 +90,10 @@ public SchemaRegistryTransfer() {
                 .define(ConfigName.SCHEMA_CAPACITY, ConfigDef.Type.INT, SCHEMA_CAPACITY_CONFIG_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_CAPACITY_CONFIG_DOC)
                 .define(ConfigName.TRANSFER_KEYS, ConfigDef.Type.BOOLEAN, TRANSFER_KEYS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, TRANSFER_KEYS_CONFIG_DOC)
                 .define(ConfigName.INCLUDE_HEADERS, ConfigDef.Type.BOOLEAN, INCLUDE_HEADERS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, INCLUDE_HEADERS_CONFIG_DOC)
+                .define(ConfigName.KEY_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, KEY_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY_DOC)
+                .define(ConfigName.VALUE_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY_DOC)
                 ;
-        // TODO: Other properties might be useful, e.g. the Subject Strategies
+        // TODO: Other properties might be useful
     }
 
     @Override
@@ -122,9 +130,21 @@ public void configure(Map<String, ?> props) {
         this.transferKeys = config.getBoolean(ConfigName.TRANSFER_KEYS);
         this.includeHeaders = config.getBoolean(ConfigName.INCLUDE_HEADERS);
 
-        // TODO: Make the Strategy configurable, may be different for src and dest
-        // Strategy for the -key and -value subjects
-        this.subjectNameStrategy = new TopicNameStrategy();
+        // Strategy for the subjects
+        this.keySubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.KEY_SUBJECT_NAME_STRATEGY));
+        this.valueSubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.VALUE_SUBJECT_NAME_STRATEGY));
+    }
+
+    private SubjectNameStrategy createSubjectStrategy(String subjectStrategyName) {
+        if (TopicNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
+            return new TopicNameStrategy();
+        } else if (RecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
+            return new RecordNameStrategy();
+        } else if (TopicRecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
+            return new TopicRecordNameStrategy();
+        } else {
+            throw new KafkaException("Unknown Subject strategy name: " + subjectStrategyName);
+        }
     }
 
     @Override
@@ -222,8 +242,7 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean
             try {
                 log.trace("Registering schema {} to destination registry", schemaAndDestId.schema);
 
-                // It could be possible that the destination naming strategy is different from the source
-                String subjectName = subjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema);
+                String subjectName = isKey ?
keySubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema) : valueSubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema); schemaAndDestId.id = destSchemaRegistryClient.register(subjectName, schemaAndDestId.schema); schemaCache.put(sourceSchemaId, schemaAndDestId); } catch (IOException | RestClientException e) { @@ -254,6 +273,8 @@ interface ConfigName { String SCHEMA_CAPACITY = "schema.capacity"; String TRANSFER_KEYS = "transfer.message.keys"; String INCLUDE_HEADERS = "include.message.headers"; + String VALUE_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY; + String KEY_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY; } private static class SchemaAndId { From 8f91e8411eea33ec4cf4c4f3c7050e7728e639b0 Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Tue, 9 Jun 2020 09:24:33 +0200 Subject: [PATCH 8/9] tests for key & value subject name strategies - RecordNameSubjectStrategy and TopicRecordNameSubjectStrategy --- .../connect/transforms/TransformTest.java | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java index b8e81ca..edc45de 100644 --- a/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java +++ b/src/test/java/cricket/jmoore/kafka/connect/transforms/TransformTest.java @@ -42,6 +42,8 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.NonRecordContainer; +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; @SuppressWarnings("unchecked") public class TransformTest { @@ -74,6 +76,11 @@ private enum ExplicitAuthType { .requiredString("first") .name("surname").aliases("last").type().stringType().noDefault() .endRecord()); + public static final AvroSchema KEY_RECORD_SCHEMA = new AvroSchema(SchemaBuilder.record("Company") + .namespace("cricket.jmoore.kafka.connect.transforms").fields() + .requiredString("name") + .requiredString("address") + .endRecord()); @RegisterExtension final SchemaRegistryMock sourceSchemaRegistry = @@ -102,6 +109,13 @@ private Map getRequiredTransformConfigs() { return configs; } + private void configure(boolean copyKeys, String valueSubjectNameStrategy, String keySubjectNameStrategy) { + smtConfiguration.put(ConfigName.TRANSFER_KEYS, copyKeys); + smtConfiguration.put(ConfigName.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy); + smtConfiguration.put(ConfigName.KEY_SUBJECT_NAME_STRATEGY, keySubjectNameStrategy); + smt.configure(smtConfiguration); + } + private void configure(boolean copyKeys) { smtConfiguration.put(ConfigName.TRANSFER_KEYS, copyKeys); smt.configure(smtConfiguration); @@ -790,4 +804,143 @@ public void testIncompatibleEvolvingValueSchemaTransfer() { fail(e); } } + + @Test + public void testRecordNameSubjectStrategy() { + configure(true, RecordNameStrategy.class.getSimpleName(), RecordNameStrategy.class.getSimpleName()); + + // Create bogus schema in destination so that source and destination ids differ + log.info("Registering schema in destination registry"); + destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA); + + // Create new schemas for source registry + AvroSchema keySchema = KEY_RECORD_SCHEMA; + 
AvroSchema valueSchema = NAME_SCHEMA; + + //key subject in destination schema registry is record name, NOT default topicName-key + String destKeySubject = keySchema.name(); + //value subject in destination schema registry is record name, NOT default topicName-value + String destValueSubject = valueSchema.name(); + + registerAndApplyTransformation(keySchema, valueSchema, destKeySubject, destValueSubject); + } + + @Test + public void testTopicRecordNameSubjectStrategy() { + configure(true, TopicRecordNameStrategy.class.getSimpleName(), TopicRecordNameStrategy.class.getSimpleName()); + + // Create bogus schema in destination so that source and destination ids differ + log.info("Registering schema in destination registry"); + destSchemaRegistry.registerSchema(UUID.randomUUID().toString(), false, BOOLEAN_SCHEMA); + + // Create new schemas for source registry + AvroSchema keySchema = KEY_RECORD_SCHEMA; + AvroSchema valueSchema = NAME_SCHEMA; + + //key subject in destination schema registry is topic-record name, NOT default topicName-key + String destKeySubject = TOPIC + "-" + keySchema.name(); + //value subject in destination schema registry is topic-record name, NOT default topicName-value + String destValueSubject = TOPIC + "-" + valueSchema.name(); + + registerAndApplyTransformation(keySchema, valueSchema, destKeySubject, destValueSubject); + } + + private void registerAndApplyTransformation(AvroSchema keySchema, AvroSchema valueSchema, String destKeySubject, String destValueSubject) { + log.info("Registering schemas in source registry"); + int sourceKeyId = sourceSchemaRegistry.registerSchema(TOPIC, true, keySchema); + final String keySubject = TOPIC + "-key"; + assertEquals(1, sourceKeyId, "An empty registry starts at id=1"); + int sourceValueId = sourceSchemaRegistry.registerSchema(TOPIC, false, valueSchema); + final String valueSubject = TOPIC + "-value"; + assertEquals(2, sourceValueId, "unique schema ids monotonically increase"); + + SchemaRegistryClient sourceClient = sourceSchemaRegistry.getSchemaRegistryClient(); + int numSourceKeyVersions = 0; + int numSourceValueVersions = 0; + try { + numSourceKeyVersions = sourceClient.getAllVersions(keySubject).size(); + assertEquals(1, numSourceKeyVersions, "the source registry subject contains the pre-registered key schema"); + numSourceValueVersions = sourceClient.getAllVersions(valueSubject).size(); + assertEquals(1, numSourceValueVersions, "the source registry subject contains the pre-registered value schema"); + } catch (IOException | RestClientException e) { + fail(e); + } + + byte[] key; + byte[] value; + ConnectRecord appliedRecord; + int destinationKeyId; + int destinationValueId; + try { + + GenericData.Record keyRecord = new GenericRecordBuilder(KEY_RECORD_SCHEMA.rawSchema()) + .set("name", "cName") + .set("address", "cAddress") + .build(); + + GenericData.Record valueRecord = new GenericRecordBuilder(NAME_SCHEMA.rawSchema()) + .set("first", "fname") + .set("last", "lname") + .build(); + + ByteArrayOutputStream keyStream = encodeAvroObject(keySchema.rawSchema(), sourceKeyId, keyRecord); + ByteArrayOutputStream valueStream = encodeAvroObject(valueSchema.rawSchema(), sourceValueId, valueRecord); + + key = keyStream.toByteArray(); + value = valueStream.toByteArray(); + ConnectRecord record = createRecord(key, value); + + // check the destination has no versions for this subject + SchemaRegistryClient destClient = destSchemaRegistry.getSchemaRegistryClient(); + List destKeyVersions = destClient.getAllVersions(destKeySubject); + 
assertTrue(destKeyVersions.isEmpty(), "the destination registry starts empty"); + List destValueVersions = destClient.getAllVersions(destValueSubject); + assertTrue(destValueVersions.isEmpty(), "the destination registry starts empty"); + + // The transform will pass for key and value with byte schemas + log.info("applying transformation"); + appliedRecord = assertDoesNotThrow(() -> smt.apply(record)); + + assertEquals(record.keySchema(), appliedRecord.keySchema(), "key schema unchanged"); + assertEquals(record.valueSchema(), appliedRecord.valueSchema(), "value schema unchanged"); + + // check the value schema was copied, and the destination now has some version + destKeyVersions = destClient.getAllVersions(destKeySubject); + assertEquals(numSourceKeyVersions, destKeyVersions.size(), + "source and destination registries have the same amount of schemas for the key subject"); + destValueVersions = destClient.getAllVersions(destValueSubject); + assertEquals(numSourceValueVersions, destValueVersions.size(), + "source and destination registries have the same amount of schemas for the value subject"); + + // Verify that the ids for the source and destination are different + SchemaMetadata keyMetadata = destClient.getSchemaMetadata(destKeySubject, destKeyVersions.get(0)); + destinationKeyId = keyMetadata.getId(); + log.debug("source_keyId={} ; dest_keyId={}", sourceKeyId, destinationKeyId); + assertTrue(sourceKeyId < destinationKeyId, + "destination id should be different and higher since that registry already had schemas"); + SchemaMetadata valueMetadata = destClient.getSchemaMetadata(destValueSubject, destValueVersions.get(0)); + destinationValueId = valueMetadata.getId(); + log.debug("source_valueId={} ; dest_valueId={}", sourceValueId, destinationValueId); + assertTrue(sourceValueId < destinationValueId, + "destination id should be different and higher since that registry already had schemas"); + + // Verify the schemas are the same + ParsedSchema sourceKeySchema = sourceClient.getSchemaById(sourceKeyId); + ParsedSchema destKeySchema = new AvroSchema(keyMetadata.getSchema()); + + assertEquals(destKeySchema, sourceKeySchema, "source server returned same key schema"); + assertEquals(keySchema, destKeySchema, "destination server returned same key schema"); + assertEquals(sourceKeySchema, destKeySchema, "both servers' key schemas match"); + ParsedSchema sourceValueSchema = sourceClient.getSchemaById(sourceValueId); + ParsedSchema destValueSchema = new AvroSchema(valueMetadata.getSchema()); + assertEquals(destValueSchema, sourceValueSchema, "source server returned same value schema"); + assertEquals(valueSchema, destValueSchema, "destination server returned same value schema"); + assertEquals(sourceValueSchema, destValueSchema, "both servers' value schemas match"); + + } catch (IOException | RestClientException e) { + fail(e); + } + } + + } From 149ee953d1d5af30ea551b6f32ea26b00e00b903 Mon Sep 17 00:00:00 2001 From: Ivan Turcinovic Date: Tue, 9 Jun 2020 09:46:15 +0200 Subject: [PATCH 9/9] updated README.md with description of key.subject.name.strategy and value.subject.name.strategy parameters --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 9d2361e..f3b717c 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,8 @@ Configuration Parameter | Default | Description **transfer.message.keys** | true | Indicates whether Avro schemas from message keys in source records should be copied to the destination Registry. 
 **include.message.headers** | true | Indicates whether message headers from source records should be preserved after the transform.
 **schema.capacity** | 100 | Capacity of schemas that can be cached in each `CachedSchemaRegistryClient`
+**key.subject.name.strategy** | TopicNameStrategy | Subject name strategy used when registering key schemas in the destination Schema Registry. Possible values: `TopicNameStrategy`, `RecordNameStrategy`, `TopicRecordNameStrategy`
+**value.subject.name.strategy** | TopicNameStrategy | Subject name strategy used when registering value schemas in the destination Schema Registry. Possible values: `TopicNameStrategy`, `RecordNameStrategy`, `TopicRecordNameStrategy`
 
 ## Embedded Schema Registry Client Configuration
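[Editorial aside, not part of the patch series: the `src.`- and `dest.`-prefixed settings are passed through to the two embedded `CachedSchemaRegistryClient` instances that the transform builds in `configure()`. For orientation, a hypothetical Kafka Connect connector snippet wiring the transform together — the `AvroSchemaTransfer` alias, hostnames, and credentials are illustrative placeholders, not values from this repository:]

```properties
# Hypothetical connector configuration; alias, hosts, and credentials are examples only.
transforms=AvroSchemaTransfer
transforms.AvroSchemaTransfer.type=cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
transforms.AvroSchemaTransfer.src.schema.registry.url=http://source-registry:8081
transforms.AvroSchemaTransfer.dest.schema.registry.url=http://dest-registry:8081
# Optional basic auth for the source registry client
transforms.AvroSchemaTransfer.src.basic.auth.credentials.source=USER_INFO
transforms.AvroSchemaTransfer.src.basic.auth.user.info=user:password
# Subject name strategies applied when registering in the destination registry
transforms.AvroSchemaTransfer.key.subject.name.strategy=TopicNameStrategy
transforms.AvroSchemaTransfer.value.subject.name.strategy=TopicRecordNameStrategy
```

[With `TopicRecordNameStrategy`, a value schema for the `FullName` record on topic `t` is registered under the subject `t-cricket.jmoore.kafka.connect.transforms.FullName` rather than the default `t-value`, which is exactly what `testTopicRecordNameSubjectStrategy` in patch 8 asserts.]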