Skip to content
Carlos Miranda edited this page May 26, 2022 · 1 revision

.NET Core 2.1 and later using Hosted Service

Required Packages

Optional Packages

Usage – Simple and Forever retry policies

Simple

// Simple retry: registers the RetrySimple middleware for ExternalGatewayException,
// configured with TryTimes(3) and a computed delay between tries.
.AddMiddlewares(
    middlewares => middlewares // KafkaFlow middlewares
        .RetrySimple(
            (config) => config
                .Handle<ExternalGatewayException>() // Exceptions to be handled
                .TryTimes(3)
                .WithTimeBetweenTriesPlan((retryCount) =>
                    // Exponential backoff: the delay is 2^retryCount seconds, expressed in milliseconds.
                    TimeSpan.FromMilliseconds(Math.Pow(2, retryCount) * 1000)
                )
        )
) // closes AddMiddlewares

Forever

// Forever retry: registers the RetryForever middleware for DatabaseTimeoutException.
.AddMiddlewares(
    middlewares => middlewares // KafkaFlow middlewares
        .RetryForever(
            (config) => config
                .Handle<DatabaseTimeoutException>() // Exceptions to be handled
                // Explicit list of delays between tries (500 ms, then 1000 ms).
                // NOTE(review): presumably the last delay is reused once the list is exhausted — confirm against the library docs.
                .WithTimeBetweenTriesPlan(
                    TimeSpan.FromMilliseconds(500),
                    TimeSpan.FromMilliseconds(1000)
                )
        )
) // closes AddMiddlewares
 

Usage – Durable retry policy

Durable

// Durable retry: registers the RetryDurable middleware for NonBlockingException with
// an embedded retry cluster, a polling job, MongoDB persistence and a chained simple
// retry plan that runs before the durable retry is triggered.
//
// Each nested lambda uses its own parameter name: reusing `config` inside the outer
// `config => ...` lambda is a compile error (CS0136) on compilers older than C# 8.
.AddMiddlewares(
    middlewares => middlewares // KafkaFlow middlewares
        .RetryDurable(
            config => config
                .Handle<NonBlockingException>() // Exceptions to be handled
                .WithMessageType(typeof(TestMessage)) // Message type to be consumed
                .WithEmbeddedRetryCluster( // Retry consumer config
                    cluster,
                    retryCluster => retryCluster
                        .WithRetryTopicName("test-topic-retry")
                        .WithRetryConsumerBufferSize(4)
                        .WithRetryConsumerWorkersCount(2)
                        // NOTE(review): "Conusmer" is kept exactly as in the original sample — confirm the
                        // method name against the installed KafkaFlow.Retry version.
                        .WithRetryConusmerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
                        .WithRetryTypedHandlers(
                            handlers => handlers
                                .WithHandlerLifetime(InstanceLifetime.Transient)
                                .AddHandler<Handler>()
                        )
                        .Enabled(true)
                )
                .WithQueuePollingJobConfiguration( // Polling configuration
                    polling => polling
                        .WithId("custom_search_key")
                        .WithCronExpression("0 0/1 * 1/1 * ? *") // NOTE(review): looks like a Quartz-style "every minute" expression — confirm
                        .WithExpirationIntervalFactor(1)
                        .WithFetchSize(10)
                        .Enabled(true)
                )
                .WithMongoDbDataProvider( // Persistence configuration
                    mongoDbconnectionString,
                    mongoDbdatabaseName,
                    mongoDbretryQueueCollectionName,
                    mongoDbretryQueueItemCollectionName
                )
                .WithRetryPlanBeforeRetryDurable( // Chained simple retry before triggering durable
                    retryPlan => retryPlan
                        .TryTimes(3)
                        .WithTimeBetweenTriesPlan(
                            TimeSpan.FromMilliseconds(250),
                            TimeSpan.FromMilliseconds(500),
                            TimeSpan.FromMilliseconds(1000))
                        .ShouldPauseConsumer(false)
                )
        )
)