diff --git a/.gitignore b/.gitignore
index 973e8d52..27f696e0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,4 +36,5 @@ tools/flink
tools/flink-*
tools/releasing/release
tools/japicmp-output
-*/.idea/
\ No newline at end of file
+*/.idea/
+.java-version
diff --git a/docs/content/docs/connectors/table/formats/avro-glue.md b/docs/content/docs/connectors/table/formats/avro-glue.md
new file mode 100644
index 00000000..eab089e6
--- /dev/null
+++ b/docs/content/docs/connectors/table/formats/avro-glue.md
@@ -0,0 +1,226 @@
+---
+title: Avro Glue Schema Registry
+weight: 5
+type: docs
+aliases:
+- /dev/table/connectors/formats/avro-glue.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# AWS Glue Avro Format
+
+{{< label "Format: Serialization Schema" >}}
+{{< label "Format: Deserialization Schema" >}}
+
+The AWS Glue Schema Registry (``avro-glue``) format allows you to read records that were serialized by ``com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer`` and to write records that can in turn be read by ``com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer``.
+These records have their schemas stored out-of-band in a configured registry provided by the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas).
+
+When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured AWS Glue Schema Registry, based on the schema version id encoded in the record, while the reader schema is inferred from the table schema.
+
+When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and used to retrieve the schema id to encode with the data.
+The lookup is performed against the configured AWS Glue Schema Registry under the [schema name](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas) given in `avro-glue.schema.name`.
+Optionally, you can enable schema auto-registration, allowing the writer to register a new schema version in the schema registry directly. The new schema is accepted only if it does not violate the compatibility mode that was set when the schema was first created.
+
+The AWS Glue Avro format can only be used in conjunction with the [Apache Kafka SQL connector]({{< ref "docs/connectors/table/kafka" >}}) or the [Upsert Kafka SQL connector]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+
+Dependencies
+------------
+
+{{< sql_download_table "avro-glue" >}}
+
+How to create tables with Avro-Glue format
+--------------
+
+
+Here is an example of a table using a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as Kafka values:
+
+```sql
+CREATE TABLE user_created (
+
+ -- one column mapped to the Kafka raw UTF-8 key
+ the_kafka_key STRING,
+
+ -- a few columns mapped to the Avro fields of the Kafka value
+ id STRING,
+ name STRING,
+ email STRING
+
+) WITH (
+
+ 'connector' = 'kafka',
+ 'topic' = 'user_events_example1',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+
+ -- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
+ 'key.format' = 'raw',
+ 'key.fields' = 'the_kafka_key',
+
+ 'value.format' = 'avro-glue',
+ 'value.avro-glue.aws.region' = 'us-east-1',
+ 'value.avro-glue.registry.name' = 'my-schema-registry',
+ 'value.avro-glue.schema.name' = 'my-schema-name',
+ 'value.avro-glue.schema.autoRegistration' = 'true',
+ 'value.fields-include' = 'EXCEPT_KEY'
+)
+```
+
+You can write data into the Kafka table as follows:
+
+```sql
+INSERT INTO user_created
+SELECT
+ -- replicating the user id into a column mapped to the kafka key
+ id as the_kafka_key,
+
+ -- all values
+ id, name, email
+FROM some_table
+```
+
+Format Options
+----------------
+
+Note that the naming of the properties diverges slightly from the [AWS Glue client code](https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L20) properties, in order to match the conventions used by other Flink formats.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what format to use, here should be <code>'avro-glue'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region the Glue Schema Registry is in, such as <code>'us-east-1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The HTTP endpoint to use for AWS calls.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.registry.name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name (not the ARN) of the Glue schema registry in which to store the schemas.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.schema.name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name under which to store the schema in the registry.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.schema.autoRegistration</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether new schemas should be automatically registered rather than treated as errors. Only used when writing (serializing); ignored when reading (deserializing).</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.schema.compression</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">NONE</td>
+      <td>String</td>
+      <td>What kind of compression to use. Valid values are <code>'NONE'</code> and <code>'ZLIB'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.schema.compatibility</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">BACKWARD</td>
+      <td>String</td>
+      <td>The schema compatibility mode under which to store the schema. Valid values are
+      <code>'NONE'</code>,
+      <code>'DISABLED'</code>,
+      <code>'BACKWARD'</code>,
+      <code>'BACKWARD_ALL'</code>,
+      <code>'FORWARD'</code>,
+      <code>'FORWARD_ALL'</code>,
+      <code>'FULL'</code>, and
+      <code>'FULL_ALL'</code>.
+      Only used when schema auto-registration is enabled and the schema is registered in the first place.
+      Ignored when reading or when a new schema version is auto-registered in an existing schema.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.cache.size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">200</td>
+      <td>Integer</td>
+      <td>The size (in number of items, not bytes) of the cache managed by the Glue client code.</td>
+    </tr>
+    <tr>
+      <td><h5>avro-glue.cache.ttlMs</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">86400000</td>
+      <td>Long</td>
+      <td>The time to live (in milliseconds) for cache entries. Defaults to 1 day.</td>
+    </tr>
+    </tbody>
+</table>
+
+Note that the schema type (Generic or Specific Record) cannot be specified when using the Table API.
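+
+The record type can instead be chosen explicitly with the DataStream API. Below is a minimal sketch (imports omitted), assuming a generated Avro `SpecificRecord` class named `User` and the registry settings from the example above:
+
+```java
+Map<String, Object> configs = new HashMap<>();
+configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
+configs.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-schema-registry");
+
+// Deserialize Kafka values as the specific record type User instead of GenericRecord
+KafkaSource<User> source =
+        KafkaSource.<User>builder()
+                .setBootstrapServers("localhost:9092")
+                .setTopics("user_events_example1")
+                .setValueOnlyDeserializer(
+                        GlueSchemaRegistryAvroDeserializationSchema.forSpecific(
+                                User.class, configs))
+                .build();
+```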
+
+Schema Auto-registration
+------------------------
+
+By default, schema auto-registration is disabled. When writing to a Kafka table, new records are accepted only if a schema version that exactly matches the table schema is already registered in the Schema Registry under `registry.name` and `schema.name`. Otherwise, an exception is thrown.
+
+You can enable schema auto-registration by setting the property `avro-glue.schema.autoRegistration` to `true`.
+
+When auto-registration is enabled, Flink first checks whether a schema matching the table schema is already registered in the Schema Registry. If it is, the writer reuses its schema version id.
+If the table schema does not match any schema version already registered under the specified `registry.name` and `schema.name`, the writer tries to auto-register a new schema version.
+
+When auto-registering a new schema version, there are two different cases:
+
+1. No schema is registered under the specified `registry.name` and `schema.name`: a new schema matching the table schema is registered, and its compatibility mode is set to the value of the `schema.compatibility` property.
+2. A different schema version is already registered under the specified `registry.name` and `schema.name`: in this case the new schema version is accepted only if it does not violate the schema evolution rules defined by the compatibility mode that was set when the schema was first created.
+
+When auto-registering a new schema, the schema compatibility mode is set based on the `avro-glue.schema.compatibility` property.
+
+Note that `avro-glue.schema.compatibility` is used only when a new schema is auto-registered in the first place. When a new schema version is auto-registered in an existing schema, the compatibility mode of the schema is never changed and `avro-glue.schema.compatibility` is ignored.
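+
+For example, the following sketch enables auto-registration and sets the compatibility mode to apply if the schema does not exist yet (topic, registry, and schema names are placeholders):
+
+```sql
+CREATE TABLE user_created (
+  id STRING,
+  name STRING,
+  email STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'user_events_example2',
+  'properties.bootstrap.servers' = 'localhost:9092',
+
+  'format' = 'avro-glue',
+  'avro-glue.aws.region' = 'us-east-1',
+  'avro-glue.registry.name' = 'my-schema-registry',
+  'avro-glue.schema.name' = 'my-schema-name',
+  'avro-glue.schema.autoRegistration' = 'true',
+  'avro-glue.schema.compatibility' = 'FULL'
+)
+```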
+
+Data Type Mapping
+----------------
+
+Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and the Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet.
+See the [Apache Avro Format]({{< ref "docs/connectors/table/formats/avro" >}}#data-type-mapping) for the mapping between Avro and Flink DataTypes.
+
+In addition to the types listed there, Flink supports reading/writing nullable types. Flink maps nullable types to Avro `union(something, null)`, where `something` is the Avro type converted from the Flink type.
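+
+For illustration, a nullable Flink column such as `email STRING` is mapped to a union of the converted Avro type and `null` in the derived schema:
+
+```json
+{"name": "email", "type": ["string", "null"]}
+```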
+
+You can refer to [Avro Specification](https://avro.apache.org/docs/current/spec.html) for more information about Avro types.
\ No newline at end of file
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
index 35a1977c..5265a68e 100644
--- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
@@ -39,6 +39,18 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-avro</artifactId>
@@ -50,7 +62,7 @@ under the License.
             <version>${project.version}</version>
         </dependency>
-
+
         <dependency>
             <groupId>software.amazon.glue</groupId>
             <artifactId>schema-registry-serde</artifactId>
             <version>${glue.schema.registry.version}</version>
@@ -61,8 +73,36 @@ under the License.
             <version>${glue.schema.registry.version}</version>
         </dependency>
-
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-architecture-tests-test</artifactId>
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java
new file mode 100644
index 00000000..0fac1a8a
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+import java.time.Duration;
+
+/** Options for AWS Glue Schema Registry Avro format. */
+@PublicEvolving
+public class AvroGlueFormatOptions {
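+    // In table DDL these keys are resolved under the format's 'avro-glue.' prefix,
+    // e.g. 'avro-glue.aws.region' (or 'value.avro-glue.aws.region' for a value format).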
+    public static final ConfigOption<String> AWS_REGION =
+ ConfigOptions.key("aws.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("AWS region");
+
+    public static final ConfigOption<String> AWS_ENDPOINT =
+ ConfigOptions.key("aws.endpoint").stringType().noDefaultValue();
+
+    public static final ConfigOption<Integer> CACHE_SIZE =
+ ConfigOptions.key("cache.size")
+ .intType()
+ .defaultValue(200)
+ .withDescription("Cache maximum size in *items*. Defaults to 200");
+
+    public static final ConfigOption<Long> CACHE_TTL_MS =
+ ConfigOptions.key("cache.ttlMs")
+ .longType()
+ .defaultValue(Duration.ofDays(1L).toMillis())
+ .withDescription("Cache TTL in milliseconds. Defaults to 1 day");
+
+    public static final ConfigOption<String> REGISTRY_NAME =
+ ConfigOptions.key("registry.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Registry name");
+
+    public static final ConfigOption<Boolean> SCHEMA_AUTO_REGISTRATION =
+ ConfigOptions.key("schema.autoRegistration")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether auto-registration is enabled. Defaults to false.");
+
+    public static final ConfigOption<Compatibility> SCHEMA_COMPATIBILITY =
+ ConfigOptions.key("schema.compatibility")
+ .enumType(Compatibility.class)
+ .defaultValue(AWSSchemaRegistryConstants.DEFAULT_COMPATIBILITY_SETTING);
+
+    public static final ConfigOption<AWSSchemaRegistryConstants.COMPRESSION> SCHEMA_COMPRESSION =
+ ConfigOptions.key("schema.compression")
+ .enumType(AWSSchemaRegistryConstants.COMPRESSION.class)
+ .defaultValue(AWSSchemaRegistryConstants.COMPRESSION.NONE)
+ .withDescription("Compression type");
+
+    public static final ConfigOption<String> SCHEMA_NAME =
+ ConfigOptions.key("schema.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The Schema name under which to register the schema used by this format during serialization.");
+
+    public static final ConfigOption<AvroRecordType> SCHEMA_TYPE =
+ ConfigOptions.key("schema.type")
+ .enumType(AvroRecordType.class)
+ .defaultValue(AvroRecordType.GENERIC_RECORD)
+ .withDescription("Record type");
+
+ private AvroGlueFormatOptions() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
new file mode 100644
index 00000000..e4a250be
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_ENDPOINT;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_REGION;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_SIZE;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_TTL_MS;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.REGISTRY_NAME;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_AUTO_REGISTRATION;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_COMPATIBILITY;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_COMPRESSION;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_NAME;
+import static org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_TYPE;
+
+/**
+ * Table format factory for providing configured instances of AWS Glue Schema Registry Avro to
+ * RowData {@link SerializationSchema} and {@link DeserializationSchema}.
+ */
+@Internal
+public class GlueSchemaRegistryAvroFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "avro-glue";
+
+ @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+        final Map<String, Object> configMap = buildConfigMap(formatOptions);
+
+        return new DecodingFormat<DeserializationSchema<RowData>>() {
+ @Override
+            public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType producedDataType) {
+ final RowType rowType = (RowType) producedDataType.getLogicalType();
+                final TypeInformation<RowData> rowDataTypeInfo =
+ context.createTypeInformation(producedDataType);
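+                // Derive the Avro reader schema from the table's row type; the writer
+                // schema is fetched from the Glue registry via the id encoded in each record.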
+ final org.apache.avro.Schema avroSchema =
+ AvroSchemaConverter.convertToSchema(rowType);
+                final GlueSchemaRegistryAvroDeserializationSchema<GenericRecord> nestedSchema =
+ GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+ avroSchema, configMap);
+ final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter =
+ AvroToRowDataConverters.createRowConverter(rowType);
+ return new AvroRowDataDeserializationSchema(
+ nestedSchema, runtimeConverter, rowDataTypeInfo);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+        return new EncodingFormat<SerializationSchema<RowData>>() {
+ @Override
+            public SerializationSchema<RowData> createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType consumedDataType) {
+ final RowType rowType = (RowType) consumedDataType.getLogicalType();
+ final org.apache.avro.Schema avroSchema =
+ AvroSchemaConverter.convertToSchema(rowType);
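+                // SCHEMA_NAME is the Glue "transport name": the schema name used for
+                // registry lookup and, if enabled, auto-registration on write.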
+ final String transportName = formatOptions.get(SCHEMA_NAME);
+                final Map<String, Object> configMap = buildConfigMap(formatOptions);
+                final GlueSchemaRegistryAvroSerializationSchema<GenericRecord> nestedSchema =
+ GlueSchemaRegistryAvroSerializationSchema.forGeneric(
+ avroSchema, transportName, configMap);
+ final RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter =
+ RowDataToAvroConverters.createConverter(rowType);
+ return new AvroRowDataSerializationSchema(rowType, nestedSchema, runtimeConverter);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> result = new HashSet<>();
+ result.add(REGISTRY_NAME);
+ result.add(AWS_REGION);
+ result.add(SCHEMA_NAME);
+ return result;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> result = new HashSet<>();
+ result.add(AWS_ENDPOINT);
+ result.add(CACHE_SIZE);
+ result.add(CACHE_TTL_MS);
+ result.add(SCHEMA_AUTO_REGISTRATION);
+ result.add(SCHEMA_COMPATIBILITY);
+ result.add(SCHEMA_COMPRESSION);
+ result.add(SCHEMA_TYPE);
+ return result;
+ }
+
+    private Map<String, Object> buildConfigMap(ReadableConfig formatOptions) {
+        final Map<String, Object> properties = new HashMap<>();
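+        // Translate the Flink-style option keys into the configuration keys
+        // expected by the AWS Glue Schema Registry client.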
+
+ formatOptions
+ .getOptional(AWS_REGION)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_REGION, v));
+ formatOptions
+ .getOptional(AWS_ENDPOINT)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, v));
+ formatOptions
+ .getOptional(CACHE_SIZE)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v));
+ formatOptions
+ .getOptional(CACHE_TTL_MS)
+ .ifPresent(
+ v ->
+ properties.put(
+ AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
+ formatOptions
+ .getOptional(REGISTRY_NAME)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v));
+ formatOptions
+ .getOptional(SCHEMA_AUTO_REGISTRATION)
+ .ifPresent(
+ v ->
+ properties.put(
+ AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING,
+ v));
+ formatOptions
+ .getOptional(SCHEMA_COMPATIBILITY)
+ .ifPresent(
+ v -> properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v));
+ formatOptions
+ .getOptional(SCHEMA_COMPRESSION)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v));
+ formatOptions
+ .getOptional(SCHEMA_TYPE)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, v));
+ return properties;
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java
index d4c24f03..c20e616f 100644
--- a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java
@@ -22,6 +22,7 @@
import org.apache.flink.formats.avro.SchemaCoder;
import java.util.Map;
+import java.util.Objects;
/** Provider for {@link GlueSchemaRegistryAvroSchemaCoder}. */
@PublicEvolving
@@ -50,4 +51,30 @@ public GlueSchemaRegistryAvroSchemaCoderProvider(Map<String, Object> configs) {
public GlueSchemaRegistryAvroSchemaCoder get() {
return new GlueSchemaRegistryAvroSchemaCoder(transportName, configs);
}
+
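+    // Value-based equality (rather than identity) so that (de)serialization schemas
+    // built around identical provider configurations compare equal, e.g. in tests.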
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+        GlueSchemaRegistryAvroSchemaCoderProvider that =
+                (GlueSchemaRegistryAvroSchemaCoderProvider) o;
+        return Objects.equals(transportName, that.transportName)
+                && Objects.equals(configs, that.configs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(transportName, configs);
+ }
}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..28a2da31
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroFormatFactory
\ No newline at end of file
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java
new file mode 100644
index 00000000..d39f0a16
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryFormatFactoryTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link GlueSchemaRegistryAvroFormatFactory}. */
+public class GlueSchemaRegistryFormatFactoryTest {
+
+ private static final ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.INT()),
+ Column.physical("c", DataTypes.BOOLEAN()));
+
+ private static final RowType ROW_TYPE =
+ (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
+
+ private static final String SCHEMA_NAME = "test-subject";
+ private static final String REGISTRY_NAME = "test-registry-name";
+ private static final String REGION = "us-middle-1";
+    private static final Map<String, Object> REGISTRY_CONFIG =
+ Map.of(
+ AWSSchemaRegistryConstants.REGISTRY_NAME, REGISTRY_NAME,
+ AWSSchemaRegistryConstants.AWS_REGION, REGION);
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testDeserializationSchema() {
+ final AvroRowDataDeserializationSchema expectedDeser =
+ new AvroRowDataDeserializationSchema(
+ GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+ AvroSchemaConverter.convertToSchema(ROW_TYPE), REGISTRY_CONFIG),
+ AvroToRowDataConverters.createRowConverter(ROW_TYPE),
+ InternalTypeInfo.of(ROW_TYPE));
+
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions());
+ assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+ scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+ assertEquals(expectedDeser, actualDeser);
+ }
+
+ @Test
+ public void testSerializationSchema() {
+ final AvroRowDataSerializationSchema expectedSer =
+ new AvroRowDataSerializationSchema(
+ ROW_TYPE,
+ GlueSchemaRegistryAvroSerializationSchema.forGeneric(
+ AvroSchemaConverter.convertToSchema(ROW_TYPE),
+ SCHEMA_NAME,
+ REGISTRY_CONFIG),
+ RowDataToAvroConverters.createConverter(ROW_TYPE));
+
+ final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions());
+ assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+ sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+ assertEquals(expectedSer, actualSer);
+ }
+
+ @Test
+ public void testMissingSubjectForSink() {
+ thrown.expect(ValidationException.class);
+        final Map<String, String> options =
+ getModifiedOptions(opts -> opts.remove("avro-glue.schema.name"));
+
+ createTableSink(SCHEMA, options);
+ }
+
+ /**
+ * Returns the full options modified by the given consumer {@code optionModifier}.
+ *
+ * @param optionModifier Consumer to modify the options
+ */
+    private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {