
Make Schema Registry Subjects configurable #29

Open · wants to merge 12 commits into master
2 changes: 2 additions & 0 deletions README.md
@@ -78,6 +78,8 @@ Configuration Parameter | Default | Description
**transfer.message.keys** | true | Indicates whether Avro schemas from message keys in source records should be copied to the destination Registry.
**include.message.headers** | true | Indicates whether message headers from source records should be preserved after the transform.
**schema.capacity** | 100 | Capacity of schemas that can be cached in each `CachedSchemaRegistryClient`.
**key.subject.name.strategy** | TopicNameStrategy | Subject name strategy for key schemas registered in the destination Schema Registry. Possible values: TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy.
**value.subject.name.strategy** | TopicNameStrategy | Subject name strategy for value schemas registered in the destination Schema Registry. Possible values: TopicNameStrategy, RecordNameStrategy, TopicRecordNameStrategy. See the example configuration below.

## Embedded Schema Registry Client Configuration

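Note for reviewers (not part of the diff): a minimal sketch of exercising the new options programmatically, as a unit test might. The property keys come from the table above; the registry URLs and the chosen strategies are placeholder assumptions.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

// Assumes SchemaRegistryTransfer is on the classpath.
public class TransferSmtConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("src.schema.registry.url", "http://source-registry:8081");  // placeholder
        props.put("dest.schema.registry.url", "http://dest-registry:8081");   // placeholder
        props.put("key.subject.name.strategy", "TopicNameStrategy");          // the default
        props.put("value.subject.name.strategy", "TopicRecordNameStrategy");  // overriding the default

        SchemaRegistryTransfer<SourceRecord> smt = new SchemaRegistryTransfer<>();
        smt.configure(props);  // an unrecognized strategy name throws KafkaException
    }
}
```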
55 changes: 31 additions & 24 deletions pom.xml
@@ -4,7 +4,7 @@

<groupId>cricket.jmoore</groupId>
<artifactId>schema-registry-transfer-smt</artifactId>
<version>0.2.1-SNAPSHOT</version>
<version>0.2.1</version>
<name>schema-registry-transfer-smt</name>
<description>
A Kafka Connect Transform for copying Confluent Schema Registry schemas between different registries.
@@ -66,11 +66,10 @@
<maven.compiler.target>1.8</maven.compiler.target>

<slf4j.version>1.7.25</slf4j.version>
<kafka.version>2.1.0</kafka.version>
<confluent.version>5.1.0</confluent.version>
<confluent.patch.version>-cp1</confluent.patch.version>
<jackson.version>2.9.7</jackson.version>
<jackson.asl.version>1.9.13</jackson.asl.version>
<confluent.version>5.5.0</confluent.version>
Owner commented:

It should be possible to implement the feature without upgrades, IMO.

Can the version bump be a separate PR?

<jackson.version>2.10.2</jackson.version>
<avro.version>1.9.2</avro.version>
<jersey.bean.validation.version>2.30</jersey.bean.validation.version>

<spotless.version>1.20.0</spotless.version>

@@ -90,14 +89,14 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}${confluent.patch.version}</version>
<version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<version>${kafka.version}${confluent.patch.version}</version>
<version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

@@ -122,13 +121,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -137,16 +129,16 @@
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${jackson.asl.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.asl.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

@@ -178,11 +170,18 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.ext</groupId>
<artifactId>jersey-bean-validation</artifactId>
<version>${jersey.bean.validation.version}</version>
<scope>provided</scope>
</dependency>

<!-- Runtime dependencies -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
<version>${avro.version}</version>
</dependency>

<dependency>
@@ -201,8 +200,16 @@
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.ext</groupId>
<artifactId>jersey-bean-validation</artifactId>
</exclusion>
</exclusions>
</dependency>
SchemaRegistryTransfer.java
@@ -9,6 +9,7 @@
import java.util.Objects;
import java.util.Optional;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
@@ -23,10 +24,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

@SuppressWarnings("unused")
@@ -44,17 +48,19 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

public static final String SRC_PREAMBLE = "For source consumer's schema registry, ";
public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. The consumer's Schema Registry.";
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;

public static final String DEST_PREAMBLE = "For target producer's schema registry, ";
public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry.";
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
public static final String KEY_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName();
public static final String VALUE_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getSimpleName();

public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries.";
public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true;
@@ -63,7 +69,8 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

private CachedSchemaRegistryClient sourceSchemaRegistryClient;
private CachedSchemaRegistryClient destSchemaRegistryClient;
private SubjectNameStrategy<org.apache.avro.Schema> subjectNameStrategy;
private SubjectNameStrategy keySubjectNameStrategy;
private SubjectNameStrategy valueSubjectNameStrategy;
private boolean transferKeys, includeHeaders;

// caches from the source registry to the destination registry
@@ -83,8 +90,10 @@ public SchemaRegistryTransfer() {
.define(ConfigName.SCHEMA_CAPACITY, ConfigDef.Type.INT, SCHEMA_CAPACITY_CONFIG_DEFAULT, ConfigDef.Importance.LOW, SCHEMA_CAPACITY_CONFIG_DOC)
.define(ConfigName.TRANSFER_KEYS, ConfigDef.Type.BOOLEAN, TRANSFER_KEYS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, TRANSFER_KEYS_CONFIG_DOC)
.define(ConfigName.INCLUDE_HEADERS, ConfigDef.Type.BOOLEAN, INCLUDE_HEADERS_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, INCLUDE_HEADERS_CONFIG_DOC)
.define(ConfigName.KEY_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, KEY_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY_DOC)
.define(ConfigName.VALUE_SUBJECT_NAME_STRATEGY, ConfigDef.Type.STRING, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT, ConfigDef.Importance.MEDIUM, AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY_DOC)
;
// TODO: Other properties might be useful, e.g. the Subject Strategies
// TODO: Other properties might be useful
}

@Override
@@ -98,17 +107,17 @@ public void configure(Map<String, ?> props) {

List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL);
final Map<String, String> sourceProps = new HashMap<>();
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
sourceProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
sourceProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.SRC_USER_INFO)
.value());

List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
final Map<String, String> destProps = new HashMap<>();
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
destProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
destProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.DEST_USER_INFO)
.value());

@@ -121,9 +130,21 @@ public void configure(Map<String, ?> props) {
this.transferKeys = config.getBoolean(ConfigName.TRANSFER_KEYS);
this.includeHeaders = config.getBoolean(ConfigName.INCLUDE_HEADERS);

// TODO: Make the Strategy configurable, may be different for src and dest
// Strategy for the -key and -value subjects
this.subjectNameStrategy = new TopicNameStrategy();
// Strategy for the subjects
this.keySubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.KEY_SUBJECT_NAME_STRATEGY));
this.valueSubjectNameStrategy = createSubjectStrategy(config.getString(ConfigName.VALUE_SUBJECT_NAME_STRATEGY));
Owner commented on lines -124 to +135:

Sorry it took me so long to review this.

Were you not able to make the source and destination use differing strategies? Or do you think that use case would ultimately cause confusion?
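A possible shape for that follow-up, if per-registry strategies turn out to be wanted: hypothetical config keys mirroring the `src.`/`dest.` prefix convention already used for the registry URLs, not part of this PR.

```java
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;

// Hypothetical sketch only: per-registry strategy keys reusing the existing
// "src."/"dest." prefixing. Not part of this PR.
interface PerRegistryStrategyConfig {
    String SRC_KEY_SUBJECT_NAME_STRATEGY    = "src." + AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY;
    String DEST_KEY_SUBJECT_NAME_STRATEGY   = "dest." + AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY;
    String SRC_VALUE_SUBJECT_NAME_STRATEGY  = "src." + AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
    String DEST_VALUE_SUBJECT_NAME_STRATEGY = "dest." + AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
}
```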

}

private SubjectNameStrategy createSubjectStrategy(String subjectStrategyName) {
if (TopicNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
return new TopicNameStrategy();
} else if (RecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
return new RecordNameStrategy();
} else if (TopicRecordNameStrategy.class.getSimpleName().equals(subjectStrategyName)) {
return new TopicRecordNameStrategy();
} else {
throw new KafkaException("Unknown Subject strategy name: " + subjectStrategyName);
}
}
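Note for reviewers (not part of the diff): a self-contained sketch of the subject name each strategy produces, for a made-up topic "orders" carrying a record type com.example.Order. The expected outputs follow Confluent's documented behavior for these classes.

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;

public class SubjectStrategyDemo {
    public static void main(String[] args) {
        // A made-up Avro value schema for a hypothetical "orders" topic.
        ParsedSchema schema = new AvroSchema(
                "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.example\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        System.out.println(new TopicNameStrategy().subjectName("orders", false, schema));
        // -> orders-value ("orders-key" when isKey is true)
        System.out.println(new RecordNameStrategy().subjectName("orders", false, schema));
        // -> com.example.Order
        System.out.println(new TopicRecordNameStrategy().subjectName("orders", false, schema));
        // -> orders-com.example.Order
    }
}
```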

@Override
@@ -206,23 +227,22 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean

schemaAndDestId = schemaCache.get(sourceSchemaId);
if (schemaAndDestId != null) {
log.trace("Schema id {} has been seen before. Not registering with destination registry again.");
log.trace("Schema id {} has been seen before. Not registering with destination registry again.", sourceSchemaId);
} else { // cache miss
log.trace("Schema id {} has not been seen before", sourceSchemaId);
schemaAndDestId = new SchemaAndId();
try {
log.trace("Looking up schema id {} in source registry", sourceSchemaId);
// Can't do getBySubjectAndId because that requires a Schema object for the strategy
schemaAndDestId.schema = sourceSchemaRegistryClient.getById(sourceSchemaId);
schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId);
} catch (IOException | RestClientException e) {
log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e);
throw new ConnectException(e);
}

try {
log.trace("Registering schema {} to destination registry", schemaAndDestId.schema);
// It could be possible that the destination naming strategy is different from the source
String subjectName = subjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema);
String subjectName = isKey
        ? keySubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema)
        : valueSubjectNameStrategy.subjectName(topic, isKey, schemaAndDestId.schema);
schemaAndDestId.id = destSchemaRegistryClient.register(subjectName, schemaAndDestId.schema);
schemaCache.put(sourceSchemaId, schemaAndDestId);
} catch (IOException | RestClientException e) {
@@ -244,25 +264,27 @@ public void close() {
}

interface ConfigName {
String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String DEST_USER_INFO = "dest." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
String SCHEMA_CAPACITY = "schema.capacity";
String TRANSFER_KEYS = "transfer.message.keys";
String INCLUDE_HEADERS = "include.message.headers";
String VALUE_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
String KEY_SUBJECT_NAME_STRATEGY = AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY;
}

private static class SchemaAndId {
private Integer id;
private org.apache.avro.Schema schema;
private ParsedSchema schema;

SchemaAndId() {
}

SchemaAndId(int id, org.apache.avro.Schema schema) {
SchemaAndId(int id, ParsedSchema schema) {
this.id = id;
this.schema = schema;
}