Skip to content

Commit

Permalink
feat: add helper method to configure linger time in producers
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Feb 1, 2021
1 parent c0e5bff commit 95a2f8b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ public interface IProducerConfigurationBuilder
/// <returns></returns>
IProducerConfigurationBuilder WithAcks(Acks acks);

/// <summary>
/// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers.
/// A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
/// default: 0.5 (500 microseconds)
/// importance: high
/// </summary>
/// <param name="lingerMs">The time in milliseconds to wait to build the message batch</param>
/// <returns></returns>
IProducerConfigurationBuilder WithLingerMs(double lingerMs);

/// <summary>
/// Adds a handler for the Kafka producer statistics
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions src/KafkaFlow/Configuration/ProducerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal class ProducerConfigurationBuilder : IProducerConfigurationBuilder
private ProducerConfig producerConfig;
private Acks? acks;
private int statisticsInterval;
private double? lingerMs;

public ProducerConfigurationBuilder(IDependencyConfigurator dependencyConfigurator, string name)
{
Expand Down Expand Up @@ -49,6 +50,12 @@ public IProducerConfigurationBuilder WithAcks(Acks acks)
return this;
}

public IProducerConfigurationBuilder WithLingerMs(double lingerMs)
{
this.lingerMs = lingerMs;
return this;
}

public IProducerConfigurationBuilder WithStatisticsHandler(Action<string> statisticsHandler)
{
this.statisticsHandlers.Add(statisticsHandler);
Expand All @@ -64,7 +71,9 @@ public IProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsInte
public ProducerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
this.producerConfig ??= new ProducerConfig();

this.producerConfig.StatisticsIntervalMs = this.statisticsInterval;
this.producerConfig.LingerMs = this.lingerMs;

this.producerConfig.ReadSecurityInformation(clusterConfiguration);

Expand Down

0 comments on commit 95a2f8b

Please sign in to comment.