Mm2 updates #26

Open: wants to merge 8 commits into base: master
pom.xml (55 changes: 31 additions & 24 deletions)
@@ -4,7 +4,7 @@

<groupId>cricket.jmoore</groupId>
<artifactId>schema-registry-transfer-smt</artifactId>
- <version>0.2.1-SNAPSHOT</version>
+ <version>0.2.1</version>
<name>schema-registry-transfer-smt</name>
<description>
A Kafka Connect Transform for copying Confluent Schema Registry schemas between different registries.
@@ -66,11 +66,10 @@
<maven.compiler.target>1.8</maven.compiler.target>

<slf4j.version>1.7.25</slf4j.version>
- <kafka.version>2.1.0</kafka.version>
- <confluent.version>5.1.0</confluent.version>
- <confluent.patch.version>-cp1</confluent.patch.version>
- <jackson.version>2.9.7</jackson.version>
- <jackson.asl.version>1.9.13</jackson.asl.version>
+ <confluent.version>5.5.0</confluent.version>
+ <jackson.version>2.10.2</jackson.version>
+ <avro.version>1.9.2</avro.version>
+ <jersey.bean.validation.version>2.30</jersey.bean.validation.version>

<spotless.version>1.20.0</spotless.version>

@@ -90,14 +89,14 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>${kafka.version}${confluent.patch.version}</version>
+ <version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
- <version>${kafka.version}${confluent.patch.version}</version>
+ <version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

@@ -122,13 +121,6 @@
</exclusions>
</dependency>

- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.10</version>
- <scope>provided</scope>
- </dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -137,16 +129,16 @@
</dependency>

<dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>${jackson.asl.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson.asl.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

@@ -178,11 +170,18 @@
<scope>provided</scope>
</dependency>

+ <dependency>
+ <groupId>org.glassfish.jersey.ext</groupId>
+ <artifactId>jersey-bean-validation</artifactId>
+ <version>${jersey.bean.validation.version}</version>
+ <scope>provided</scope>
+ </dependency>

<!-- Runtime dependencies -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.8.1</version>
+ <version>${avro.version}</version>
</dependency>

<dependency>
@@ -201,8 +200,16 @@
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.ext</groupId>
+ <artifactId>jersey-bean-validation</artifactId>
+ </exclusion>
</exclusions>
</dependency>
SchemaRegistryTransfer.java
@@ -23,9 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
- import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+ import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

@@ -44,17 +45,17 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

public static final String SRC_PREAMBLE = "For source consumer's schema registry, ";
public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. The consumer's Schema Registry.";
- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
- public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
- public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
- public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
+ public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
+ public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
+ public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;

public static final String DEST_PREAMBLE = "For target producer's schema registry, ";
public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry.";
- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
- public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
- public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
- public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
+ public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
+ public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
+ public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;

public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries.";
public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true;
@@ -63,7 +64,7 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

private CachedSchemaRegistryClient sourceSchemaRegistryClient;
private CachedSchemaRegistryClient destSchemaRegistryClient;
- private SubjectNameStrategy<org.apache.avro.Schema> subjectNameStrategy;
+ private SubjectNameStrategy subjectNameStrategy;
private boolean transferKeys, includeHeaders;

// caches from the source registry to the destination registry
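
Note (reviewer sketch, not part of the diff): dropping the type parameter reflects the 5.5.x API, where `SubjectNameStrategy` is no longer generic over the Avro `Schema` type and instead accepts the format-agnostic `ParsedSchema`. A minimal sketch of the call shape this field now compiles against (class and topic names here are illustrative):

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

class SubjectNameSketch {
    // 5.5.x: subjectName() takes a ParsedSchema, so one raw strategy
    // instance covers Avro, JSON Schema, and Protobuf alike.
    static String subjectFor(String topic, boolean isKey, ParsedSchema schema) {
        SubjectNameStrategy strategy = new TopicNameStrategy();
        return strategy.subjectName(topic, isKey, schema); // e.g. "orders-value"
    }
}
```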
@@ -98,17 +99,17 @@ public void configure(Map<String, ?> props) {

List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL);
final Map<String, String> sourceProps = new HashMap<>();
- sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
+ sourceProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
- sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
+ sourceProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.SRC_USER_INFO)
.value());

List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
final Map<String, String> destProps = new HashMap<>();
- destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
+ destProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
- destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
+ destProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.DEST_USER_INFO)
.value());

@@ -206,14 +207,14 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean

schemaAndDestId = schemaCache.get(sourceSchemaId);
if (schemaAndDestId != null) {
log.trace("Schema id {} has been seen before. Not registering with destination registry again.");
log.trace("Schema id {} has been seen before. Not registering with destination registry again.", sourceSchemaId);
} else { // cache miss
log.trace("Schema id {} has not been seen before", sourceSchemaId);
schemaAndDestId = new SchemaAndId();
try {
log.trace("Looking up schema id {} in source registry", sourceSchemaId);
// Can't do getBySubjectAndId because that requires a Schema object for the strategy
- schemaAndDestId.schema = sourceSchemaRegistryClient.getById(sourceSchemaId);
+ schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId);
} catch (IOException | RestClientException e) {
log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e);
throw new ConnectException(e);
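
Note (reviewer sketch, not part of the diff): `getById(int)` returned an `org.apache.avro.Schema`; its 5.5.x replacement `getSchemaById(int)` returns a `ParsedSchema`, which is what lets `SchemaAndId` below drop the Avro-specific type. A minimal sketch of the lookup, assuming a reachable source registry:

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

class SchemaLookupSketch {
    static ParsedSchema fetch(CachedSchemaRegistryClient client, int id) throws Exception {
        // 5.5.x lookup: returns a ParsedSchema (AvroSchema, JsonSchema, or
        // ProtobufSchema) rather than a raw org.apache.avro.Schema.
        return client.getSchemaById(id);
    }
}
```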
@@ -244,25 +245,25 @@ public void close() {
}

interface ConfigName {
- String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
- String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
- String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
- String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
- String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
- String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
+ String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
+ String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
+ String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
+ String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
+ String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
+ String DEST_USER_INFO = "dest." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
String SCHEMA_CAPACITY = "schema.capacity";
String TRANSFER_KEYS = "transfer.message.keys";
String INCLUDE_HEADERS = "include.message.headers";
}

private static class SchemaAndId {
private Integer id;
- private org.apache.avro.Schema schema;
+ private ParsedSchema schema;

SchemaAndId() {
}

- SchemaAndId(int id, org.apache.avro.Schema schema) {
+ SchemaAndId(int id, ParsedSchema schema) {
this.id = id;
this.schema = schema;
}
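
Note (reviewer sketch, not part of the diff): for reference, wiring the transform through `configure()` with the `ConfigName` keys above resolves to the properties below; URLs and capacity are placeholders, and the optional src./dest. basic-auth keys are omitted:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

class ConfigureSketch {
    static void wire(SchemaRegistryTransfer<SourceRecord> smt) {
        Map<String, String> props = new HashMap<>();
        props.put("src.schema.registry.url", "http://source-registry:8081");  // SRC_SCHEMA_REGISTRY_URL
        props.put("dest.schema.registry.url", "http://dest-registry:8081");   // DEST_SCHEMA_REGISTRY_URL
        props.put("transfer.message.keys", "true");                           // TRANSFER_KEYS
        props.put("include.message.headers", "true");                         // INCLUDE_HEADERS
        props.put("schema.capacity", "100");                                  // SCHEMA_CAPACITY
        smt.configure(props);
    }
}
```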
SchemaRegistryMock.java
@@ -16,6 +16,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import io.confluent.kafka.schemaregistry.ParsedSchema;
+ import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
@@ -152,22 +154,22 @@ public void beforeEach(final ExtensionContext context) {
.willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName())));
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(CONFIG_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.getConfigHandler.getName())));
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+"))
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+/(?:fetchMaxId=false)"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
}

- public int registerSchema(final String topic, boolean isKey, final Schema schema) {
+ public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema) {
return this.registerSchema(topic, isKey, schema, new TopicNameStrategy());
}

- public int registerSchema(final String topic, boolean isKey, final Schema schema, SubjectNameStrategy<Schema> strategy) {
+ public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema, SubjectNameStrategy strategy) {
return this.register(strategy.subjectName(topic, isKey, schema), schema);
}

- private int register(final String subject, final Schema schema) {
+ private int register(final String subject, final ParsedSchema schema) {
try {
final int id = this.schemaRegistryClient.register(subject, schema);
- this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id))
+ this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id + "?fetchMaxId=false"))
.willReturn(ResponseDefinitionBuilder.okForJson(new SchemaString(schema.toString()))));
log.debug("Registered schema {}", id);
return id;
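
Note (reviewer sketch, not part of the diff): the query string matters because the 5.5.x client fetches schemas as `GET /schemas/ids/{id}?fetchMaxId=false`, so a stub registered for the bare path no longer matches. A sketch of the matching rule, assuming WireMock's stock URL matchers and the registry's standard path:

```java
import com.github.tomakehurst.wiremock.client.MappingBuilder;
import com.github.tomakehurst.wiremock.client.WireMock;

class SchemaByIdStubSketch {
    // urlEqualTo compares the full URL including the query string, so the
    // ?fetchMaxId=false suffix must be spelled out for 5.5.x clients.
    static MappingBuilder schemaByIdStub(int id) {
        return WireMock.get(WireMock.urlEqualTo("/schemas/ids/" + id + "?fetchMaxId=false"));
    }
}
```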
@@ -242,8 +244,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit
final FileSource files, final Parameters parameters) {
try {
final int id = SchemaRegistryMock.this.register(getSubject(request),
- new Schema.Parser()
- .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()));
+ new AvroSchema(new Schema.Parser()
+ .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())));
final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
registerSchemaResponse.setId(id);
return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse);
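
Note (reviewer sketch, not part of the diff): `AvroSchema` is the 5.5.x `ParsedSchema` implementation for Avro, so wrapping the parsed `org.apache.avro.Schema` is all the mock needs to keep registering Avro payloads. A minimal sketch with a throwaway record schema:

```java
import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;

class AvroWrapSketch {
    static ParsedSchema toParsedSchema(String avroJson) {
        // e.g. avroJson = "{\"type\":\"record\",\"name\":\"T\",\"fields\":[]}"
        Schema avro = new Schema.Parser().parse(avroJson);
        return new AvroSchema(avro); // format-agnostic wrapper over the Avro schema
    }
}
```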
@@ -279,7 +281,8 @@ private class GetVersionHandler extends SubjectsVersioHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
- String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
+ String versionStrFull = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
+ String versionStr = versionStrFull.substring(0, versionStrFull.indexOf("?"));
SchemaMetadata metadata;
if (versionStr.equals("latest")) {
metadata = SchemaRegistryMock.this.getSubjectVersion(getSubject(request), versionStr);