diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/AvroConfluentFormatOptions.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/AvroConfluentFormatOptions.java
index 42e7353d009ba..071078387356c 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/AvroConfluentFormatOptions.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/AvroConfluentFormatOptions.java
@@ -65,6 +65,13 @@ public class AvroConfluentFormatOptions {
     // Commonly used options maintained by Flink for convenience
     // --------------------------------------------------------------------------------------------
 
+    public static final ConfigOption<Boolean> AUTO_REGISTER_SCHEMAS =
+            ConfigOptions.key("auto.register.schemas")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Flag to indicate if the schema should be registered automatically.");
+
     public static final ConfigOption<String> SSL_KEYSTORE_LOCATION =
             ConfigOptions.key("ssl.keystore.location")
                     .stringType()
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
index 53be09c7475a6..d8a59846f69c3 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProvider.java
@@ -59,7 +59,8 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
     public SchemaCoder get() {
         return new ConfluentSchemaRegistryCoder(
                 this.subject,
-                new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs));
+                new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs),
+                registryConfigs);
     }
 
     @Override
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
index 7be3145314835..624ba1cccc8d9 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
@@ -24,11 +24,14 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 
+import javax.annotation.Nullable;
+
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import static java.lang.String.format;
 
@@ -36,6 +39,7 @@
 public class ConfluentSchemaRegistryCoder implements SchemaCoder {
 
     private final SchemaRegistryClient schemaRegistryClient;
+    private final Map<String, ?> registryConfigs;
     private String subject;
 
     private static final int CONFLUENT_MAGIC_BYTE = 0;
@@ -46,9 +50,24 @@ public class ConfluentSchemaRegistryCoder implements SchemaCoder {
      * @param schemaRegistryClient client to connect schema registry
      * @param subject subject of schema registry to produce
      */
-    public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
+    public ConfluentSchemaRegistryCoder(
+            String subject,
+            SchemaRegistryClient schemaRegistryClient,
+            @Nullable Map<String, ?> registryConfigs) {
         this.schemaRegistryClient = schemaRegistryClient;
         this.subject = subject;
+        this.registryConfigs = registryConfigs;
+    }
+
+    /**
+     * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
+     * schema registry.
+     *
+     * @param schemaRegistryClient client to connect schema registry
+     * @param subject subject of schema registry to produce
+     */
+    public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
+        this(subject, schemaRegistryClient, null);
     }
 
     /**
@@ -58,7 +77,7 @@ public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaR
      * @param schemaRegistryClient client to connect schema registry
      */
     public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
-        this.schemaRegistryClient = schemaRegistryClient;
+        this(null, schemaRegistryClient, null);
     }
 
     @Override
@@ -81,13 +100,28 @@ public Schema readSchema(InputStream in) throws IOException {
 
     @Override
     public void writeSchema(Schema schema, OutputStream out) throws IOException {
-        try {
-            int registeredId = schemaRegistryClient.register(subject, schema);
-            out.write(CONFLUENT_MAGIC_BYTE);
-            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
-            out.write(schemaIdBytes);
-        } catch (RestClientException e) {
-            throw new IOException("Could not register schema in registry", e);
+        int registeredId;
+        if (registerSchema()) {
+            try {
+                registeredId = schemaRegistryClient.register(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not register schema in registry", e);
+            }
+        } else {
+            try {
+                registeredId = schemaRegistryClient.getId(subject, schema);
+            } catch (RestClientException e) {
+                throw new IOException("Could not retrieve schema in registry", e);
+            }
         }
+        out.write(CONFLUENT_MAGIC_BYTE);
+        byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
+        out.write(schemaIdBytes);
+    }
+
+    private boolean registerSchema() {
+        return this.registryConfigs != null && registryConfigs.containsKey("auto.register.schemas")
+                ? Boolean.parseBoolean(String.valueOf(registryConfigs.get("auto.register.schemas")))
+                : true;
     }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index e43d5a76e0681..806ec45ff3699 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -60,6 +60,7 @@
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.AUTO_REGISTER_SCHEMAS;
 import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
 import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
@@ -186,6 +187,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(BASIC_AUTH_USER_INFO);
         options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
         options.add(BEARER_AUTH_TOKEN);
+        options.add(AUTO_REGISTER_SCHEMAS);
         return options;
     }
 
@@ -203,7 +205,8 @@ public Set<ConfigOption<?>> forwardOptions() {
                         BASIC_AUTH_CREDENTIALS_SOURCE,
                         BASIC_AUTH_USER_INFO,
                         BEARER_AUTH_CREDENTIALS_SOURCE,
-                        BEARER_AUTH_TOKEN)
+                        BEARER_AUTH_TOKEN,
+                        AUTO_REGISTER_SCHEMAS)
                 .collect(Collectors.toSet());
     }
 
@@ -237,6 +240,9 @@ public Set<ConfigOption<?>> forwardOptions() {
         formatOptions
                 .getOptional(BEARER_AUTH_TOKEN)
                 .ifPresent(v -> properties.put("bearer.auth.token", v));
+        formatOptions
+                .getOptional(AUTO_REGISTER_SCHEMAS)
+                .ifPresent(v -> properties.put("auto.register.schemas", Boolean.toString(v)));
 
         if (properties.isEmpty()) {
             return null;
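Note (not part of the patch): a minimal sketch of how the new flag is expected to behave, assuming the patched three-argument constructor above and Confluent's MockSchemaRegistryClient from kafka-schema-registry-client. The class name, subject, and record schema below are made up for illustration. With "auto.register.schemas" set to "false", writeSchema() only looks up the id of an already-registered schema; if the schema is missing from the registry it fails with "Could not retrieve schema in registry".

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder;

import java.io.ByteArrayOutputStream;
import java.util.Collections;

public class AutoRegisterSchemasSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical subject and schema, purely for illustration.
        Schema schema = SchemaBuilder.record("Order").fields().requiredString("id").endRecord();
        SchemaRegistryClient client = new MockSchemaRegistryClient();

        // Auto-registration disabled: the schema must already exist under the subject,
        // so register it up front (in production an admin or CI step would do this).
        client.register("orders-value", schema);

        ConfluentSchemaRegistryCoder coder =
                new ConfluentSchemaRegistryCoder(
                        "orders-value",
                        client,
                        Collections.singletonMap("auto.register.schemas", "false"));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.writeSchema(schema, out); // writes magic byte + id of the pre-registered schema
    }
}

From SQL, the flag would surface prefixed by the format identifier, e.g. 'avro-confluent.auto.register.schemas' = 'false' (or 'value.avro-confluent.auto.register.schemas' when configured as the Kafka connector's value format).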