Skip to content

Commit

Permalink
[FLINK-33045] format avro-confluent - disable auto-registering schema…
Browse files Browse the repository at this point in the history
… in registry
  • Loading branch information
parisni committed Sep 26, 2024
1 parent fb50473 commit 5c768a1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public class AvroConfluentFormatOptions {
// Commonly used options maintained by Flink for convenience
// --------------------------------------------------------------------------------------------

public static final ConfigOption<Boolean> AUTO_REGISTER_SCHEMAS =
ConfigOptions.key("auto.register.schemas")
.booleanType()
.defaultValue(true)
.withDescription(
"Flag to indicate if the schema should be registered automatically.");

public static final ConfigOption<String> SSL_KEYSTORE_LOCATION =
ConfigOptions.key("ssl.keystore.location")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
public SchemaCoder get() {
return new ConfluentSchemaRegistryCoder(
this.subject,
new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs));
new CachedSchemaRegistryClient(url, identityMapCapacity, registryConfigs),
registryConfigs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;

import javax.annotation.Nullable;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import static java.lang.String.format;

/** Reads and Writes schema using Confluent Schema Registry protocol. */
public class ConfluentSchemaRegistryCoder implements SchemaCoder {

private final SchemaRegistryClient schemaRegistryClient;
private final Map<String, ?> registryConfigs;
private String subject;
private static final int CONFLUENT_MAGIC_BYTE = 0;

Expand All @@ -46,9 +50,24 @@ public class ConfluentSchemaRegistryCoder implements SchemaCoder {
* @param schemaRegistryClient client to connect schema registry
* @param subject subject of schema registry to produce
*/
public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
public ConfluentSchemaRegistryCoder(
String subject,
SchemaRegistryClient schemaRegistryClient,
@Nullable Map<String, ?> registryConfigs) {
this.schemaRegistryClient = schemaRegistryClient;
this.subject = subject;
this.registryConfigs = registryConfigs;
}

/**
* Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
* schema registry.
*
* @param schemaRegistryClient client to connect schema registry
* @param subject subject of schema registry to produce
*/
public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaRegistryClient) {
this(subject, schemaRegistryClient, null);
}

/**
Expand All @@ -58,7 +77,7 @@ public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaR
* @param schemaRegistryClient client to connect schema registry
*/
public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
this(null, schemaRegistryClient, null);
}

@Override
Expand All @@ -81,13 +100,28 @@ public Schema readSchema(InputStream in) throws IOException {

@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
try {
int registeredId = schemaRegistryClient.register(subject, schema);
out.write(CONFLUENT_MAGIC_BYTE);
byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
out.write(schemaIdBytes);
} catch (RestClientException e) {
throw new IOException("Could not register schema in registry", e);
int registeredId;
if (registerSchema()) {
try {
registeredId = schemaRegistryClient.register(subject, schema);
} catch (RestClientException e) {
throw new IOException("Could not register schema in registry", e);
}
} else {
try {
registeredId = schemaRegistryClient.getId(subject, schema);
} catch (RestClientException e) {
throw new IOException("Could not retrieve schema in registry", e);
}
}
out.write(CONFLUENT_MAGIC_BYTE);
byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
out.write(schemaIdBytes);
}

private boolean registerSchema() {
return this.registryConfigs != null && registryConfigs.containsKey("auto.register.schemas")
? Boolean.parseBoolean((String) registryConfigs.get("auto.register.schemas"))
: false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.AUTO_REGISTER_SCHEMAS;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
Expand Down Expand Up @@ -186,6 +187,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(BASIC_AUTH_USER_INFO);
options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
options.add(BEARER_AUTH_TOKEN);
options.add(AUTO_REGISTER_SCHEMAS);
return options;
}

Expand All @@ -203,7 +205,8 @@ public Set<ConfigOption<?>> forwardOptions() {
BASIC_AUTH_CREDENTIALS_SOURCE,
BASIC_AUTH_USER_INFO,
BEARER_AUTH_CREDENTIALS_SOURCE,
BEARER_AUTH_TOKEN)
BEARER_AUTH_TOKEN,
AUTO_REGISTER_SCHEMAS)
.collect(Collectors.toSet());
}

Expand Down Expand Up @@ -237,6 +240,9 @@ public Set<ConfigOption<?>> forwardOptions() {
formatOptions
.getOptional(BEARER_AUTH_TOKEN)
.ifPresent(v -> properties.put("bearer.auth.token", v));
formatOptions
.getOptional(AUTO_REGISTER_SCHEMAS)
.ifPresent(v -> properties.put("auto.register.schemas", Boolean.toString(v)));

if (properties.isEmpty()) {
return null;
Expand Down

0 comments on commit 5c768a1

Please sign in to comment.