Skip to content

Serializer Middleware

Carlos Miranda edited this page May 26, 2022 · 1 revision

Kafka.Flow Commonly Used

KafkaFlow-Serializer

Avro Sample

// Sample extension-method holder that wires one KafkaFlow producer and one consumer
// using the Schema Registry Avro serializer together with the RetryDurable middleware.
// NOTE: this is an excerpt; the RetryDurable configuration is elided ("...") and the
// closing braces of the method and class are not shown.
internal static class KafkaClusterConfigurationBuilder
{
        /// <summary>
        /// Adds an Avro producer and an Avro consumer (with durable retry) to the given cluster builder.
        /// </summary>
        /// <param name="cluster">Cluster configuration builder the producer and consumer are added to.</param>
        /// <param name="mongoDbConnectionString">Not used in the visible part of the sample; presumably consumed by the elided retry configuration (TODO confirm).</param>
        /// <param name="mongoDbDatabaseName">Not used in the visible part of the sample; presumably consumed by the elided retry configuration (TODO confirm).</param>
        /// <param name="mongoDbRetryQueueCollectionName">Not used in the visible part of the sample; presumably consumed by the elided retry configuration (TODO confirm).</param>
        /// <param name="mongoDbRetryQueueItemCollectionName">Not used in the visible part of the sample; presumably consumed by the elided retry configuration (TODO confirm).</param>
        internal static IClusterConfigurationBuilder SetupRetryDurableMongoAvroDb(
            this IClusterConfigurationBuilder cluster,
            string mongoDbConnectionString,
            string mongoDbDatabaseName,
            string mongoDbRetryQueueCollectionName,
            string mongoDbRetryQueueItemCollectionName)
        {
           // Producer: publishes to the same topic the consumer below subscribes to.
           cluster.AddProducer(
                    "kafka-flow-retry-durable-mongodb-avro-producer",
                    producer => producer
                        .DefaultTopic("sample-kafka-flow-retry-durable-mongodb-avro-topic")
                        .WithCompression(Confluent.Kafka.CompressionType.Gzip)
                        .AddMiddlewares(
                            middlewares => middlewares
                                // Avro serializer configured with the TopicRecord subject name strategy.
                                .AddSchemaRegistryAvroSerializer(
                                    new AvroSerializerConfig
                                    {
                                        SubjectNameStrategy = SubjectNameStrategy.TopicRecord
                                    })
                        )
                        .WithAcks(Acks.All)
              )
              // Consumer: the Avro serializer middleware is registered before RetryDurable.
              .AddConsumer(
                    consumer => consumer
                        .Topic("sample-kafka-flow-retry-durable-mongodb-avro-topic")
                        .WithGroupId("sample-consumer-kafka-flow-retry-durable-mongodb-avro")
                        .WithName("kafka-flow-retry-durable-mongodb-avro-consumer")
                        .WithBufferSize(10)
                        .WithWorkersCount(20)
                        .WithAutoOffsetReset(AutoOffsetReset.Latest)
                        .AddMiddlewares(
                            middlewares => middlewares
                                .AddSchemaRegistryAvroSerializer()
                                .RetryDurable(
                                    configure => configure
                                        .Handle<RetryDurableTestException>()
                                        // NOTE(review): presumably the message type the retry middleware
                                        // serializes/deserializes — confirm against the KafkaFlow.Retry docs.
                                        .WithMessageType(typeof(AvroLogMessage))
                                        // JSON settings use a custom contract resolver
                                        // (WritablePropertiesOnlyResolver) instead of the default one.
                                        .WithMessageSerializeSettings(new JsonSerializerSettings
                                        {
                                            ContractResolver = new WritablePropertiesOnlyResolver()
                                        })
                                        ...
                );

Contract Resolver

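The DefaultContractResolver is the default resolver used by the Json.NET serializer. It provides many avenues of extensibility in the form of virtual methods that can be overridden. The WritablePropertiesOnlyResolver below overrides CreateProperties so that only writable properties are included; it is the resolver passed to WithMessageSerializeSettings in the Avro sample above.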

/// <summary>
/// Contract resolver that exposes only the properties the base resolver marks as writable.
/// </summary>
internal class WritablePropertiesOnlyResolver : DefaultContractResolver
{
        /// <summary>
        /// Builds the property list for <paramref name="type"/> via the base resolver and
        /// keeps only the entries whose <c>Writable</c> flag is set.
        /// </summary>
        /// <param name="type">Type whose properties are being resolved.</param>
        /// <param name="memberSerialization">Member serialization mode forwarded to the base resolver.</param>
        /// <returns>A new list containing the writable properties, in the order the base resolver produced them.</returns>
        protected override IList<JsonProperty> CreateProperties(Type type, MemberSerialization memberSerialization)
        {
            var writableProperties = new List<JsonProperty>();

            foreach (JsonProperty candidate in base.CreateProperties(type, memberSerialization))
            {
                if (candidate.Writable)
                {
                    writableProperties.Add(candidate);
                }
            }

            return writableProperties;
        }
}
Clone this wiki locally