Skip to content

Commit 0e70574

Browse files
committed
Fallback of producer options
1 parent db8b961 commit 0e70574

File tree

5 files changed

+243
-39
lines changed

5 files changed

+243
-39
lines changed

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1414
/// <summary>
1515
/// Configuration for Kafka Web Jobs extension
1616
/// </summary>
17-
public class KafkaOptions : IOptionsFormatter
17+
public class KafkaOptions : IOptionsFormatter, IProducerOptions
1818
{
1919
/// <summary>
2020
/// The initial time to wait before reconnecting to a broker after the connection has been closed. The time is increased exponentially until `reconnect.backoff.max.ms` is reached. -25% to +50% jitter is applied to each reconnect backoff. A value of 0 disables the backoff and reconnects immediately.
@@ -142,21 +142,21 @@ public int MaxBatchSize
142142
/// A comma-separated list of debug contexts to enable: all,generic,broker,topic,metadata,producer,queue,msg,protocol,cgrp,security,fetch
143143
/// Librdkafka: debug
144144
/// </summary>
145-
public string LibkafkaDebug { get; set; } = null;
145+
public string LibkafkaDebug { get; set; }
146146

147147
// <summary>
148148
// Metadata cache max age.
149149
// https://github.com/Azure/azure-functions-kafka-extension/issues/187
150150
// default: 180000
151151
// </summary>
152-
public int? MetadataMaxAgeMs { get; set; } = 180000;
152+
public int? MetadataMaxAgeMs { get; set; }
153153

154154
// <summary>
155155
// Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets
156156
// https://github.com/Azure/azure-functions-kafka-extension/issues/187
157157
// default: true
158158
// </summary>
159-
public bool? SocketKeepaliveEnable { get; set; } = true;
159+
public bool? SocketKeepaliveEnable { get; set; }
160160

161161
int subscriberIntervalInSeconds = 1;
162162
/// <summary>
@@ -235,6 +235,35 @@ public int ChannelFullRetryIntervalInMs
235235
/// </summary>
236236
public MessageCompressionType? CompressionType { get; set; }
237237

238+
public BrokerAuthenticationMode? AuthenticationMode => throw new NotImplementedException();
239+
240+
public int? BatchSize { get; set; }
241+
242+
public bool? EnableIdempotence { get; set; }
243+
244+
public int? MessageTimeoutMs { get; set; }
245+
246+
public int? MaxMessageBytes { get; set; }
247+
248+
public int? MaxRetries { get; set; }
249+
250+
public BrokerProtocol? Protocol { get; set; }
251+
252+
public int? RequestTimeoutMs { get; set; }
253+
254+
public string SslCaLocation { get; set; }
255+
256+
public string SslCertificateLocation { get; set; }
257+
258+
public string SslKeyLocation { get; set; }
259+
260+
public string SslKeyPassword { get; set; }
261+
262+
[JsonIgnore]
263+
public string Password { get; set; }
264+
265+
public string Username { get; set; }
266+
238267
public string Format()
239268
{
240269
var serializerSettings = new JsonSerializerSettings()
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using Avro.Specific;
6+
using Confluent.Kafka;
7+
using Microsoft.Azure.WebJobs.Description;
8+
9+
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
10+
{
11+
12+
internal interface IProducerOptions
13+
{
14+
BrokerAuthenticationMode? AuthenticationMode { get; }
15+
int? BatchSize { get; }
16+
int? CompressionLevel { get; }
17+
MessageCompressionType? CompressionType { get; }
18+
bool? EnableIdempotence { get; }
19+
int? MessageTimeoutMs { get; }
20+
int? MaxMessageBytes { get; }
21+
int? MaxRetries { get; }
22+
int? MetadataMaxAgeMs { get; }
23+
string Password { get; }
24+
BrokerProtocol? Protocol { get; }
25+
int? RequestTimeoutMs { get; }
26+
bool? SocketKeepaliveEnable { get; }
27+
string SslCaLocation { get; }
28+
string SslCertificateLocation { get; }
29+
string SslKeyLocation { get; }
30+
string SslKeyPassword { get; }
31+
string Username { get; }
32+
}
33+
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,20 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1313
/// </summary>
1414
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
1515
[Binding]
16-
public sealed class KafkaAttribute : Attribute
16+
public sealed class KafkaAttribute : Attribute, IProducerOptions
1717
{
18-
internal int? DefinedCompressionLevel;
19-
internal MessageCompressionType? DefinedCompressionType;
18+
private BrokerAuthenticationMode? authenticationMode;
19+
private BrokerProtocol? protocol;
20+
private int? compressionLevel;
21+
private MessageCompressionType? compressionType;
2022
private int? maxMessageBytes;
23+
private int? metadataMaxAgeMs;
2124
private int? batchSize;
2225
private bool? enableIdempotence;
2326
private int? messageTimeoutMs;
2427
private int? requestTimeoutMs;
2528
private int? maxRetries;
29+
private bool? socketKeepAliveEnabled;
2630

2731
/// <summary>
2832
/// Initialize a new instance of the <see cref="KafkaAttribute"/>
@@ -63,7 +67,14 @@ public KafkaAttribute()
6367
/// <summary>
6468
/// Gets or sets the Maximum transmit message size. Default: 1MB
6569
/// </summary>
66-
public int MaxMessageBytes { get => maxMessageBytes.GetValueOrDefault(1000000); set => maxMessageBytes = value; }
70+
public int MaxMessageBytes { get => maxMessageBytes.GetValueOrDefault(); set => maxMessageBytes = value; }
71+
72+
// <summary>
73+
// Metadata cache max age.
74+
// https://github.com/Azure/azure-functions-kafka-extension/issues/187
75+
// default: 180000
76+
// </summary>
77+
public int MetadataMaxAgeMs { get => metadataMaxAgeMs.GetValueOrDefault(); set => metadataMaxAgeMs = value; }
6778

6879
/// <summary>
6980
/// Maximum number of messages batched in one MessageSet. default: 10000
@@ -78,18 +89,18 @@ public KafkaAttribute()
7889
/// <summary>
7990
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
8091
/// </summary>
81-
public int MessageTimeoutMs { get => messageTimeoutMs.GetValueOrDefault(300000); set => messageTimeoutMs = value; }
92+
public int MessageTimeoutMs { get => messageTimeoutMs.GetValueOrDefault(); set => messageTimeoutMs = value; }
8293

8394
/// <summary>
8495
/// The ack timeout of the producer request in milliseconds. default: 5000
8596
/// </summary>
86-
public int RequestTimeoutMs { get => requestTimeoutMs.GetValueOrDefault(5000); set => requestTimeoutMs = value; }
97+
public int RequestTimeoutMs { get => requestTimeoutMs.GetValueOrDefault(); set => requestTimeoutMs = value; }
8798

8899
/// <summary>
89100
/// How many times to retry sending a failing Message. **Note:** default: 2
90101
/// </summary>
91102
/// <remarks>Retrying may cause reordering unless <c>EnableIdempotence</c> is set to <c>true</c>.</remarks>
92-
public int MaxRetries { get => maxRetries.GetValueOrDefault(2); set => maxRetries = value; }
103+
public int MaxRetries { get => maxRetries.GetValueOrDefault(); set => maxRetries = value; }
93104

94105
/// <summary>
95106
/// SASL mechanism to use for authentication.
@@ -98,7 +109,7 @@ public KafkaAttribute()
98109
///
99110
/// sasl.mechanism in librdkafka
100111
/// </summary>
101-
public BrokerAuthenticationMode AuthenticationMode { get; set; } = BrokerAuthenticationMode.NotSet;
112+
public BrokerAuthenticationMode AuthenticationMode { get => authenticationMode.GetValueOrDefault(); set => authenticationMode = value; }
102113

103114
/// <summary>
104115
/// SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms
@@ -122,7 +133,7 @@ public KafkaAttribute()
122133
///
123134
/// security.protocol in librdkafka
124135
/// </summary>
125-
public BrokerProtocol Protocol { get; set; } = BrokerProtocol.NotSet;
136+
public BrokerProtocol Protocol { get => protocol.GetValueOrDefault(); set => protocol = value; }
126137

127138
/// <summary>
128139
/// Path to client's private key (PEM) used for authentication.
@@ -153,12 +164,44 @@ public KafkaAttribute()
153164
/// Compression level parameter for algorithm selected by configuration property <see cref="CompressionType"/>
154165
/// compression.level in librdkafka
155166
/// </summary>
156-
public int CompressionLevel { set => DefinedCompressionLevel = value; }
167+
public int CompressionLevel { get => compressionLevel.GetValueOrDefault(); set => compressionLevel = value; }
157168

158169
/// <summary>
159170
/// Compression codec to use for compressing message sets.
160171
/// compression.codec in librdkafka
161172
/// </summary>
162-
public MessageCompressionType CompressionType { set => DefinedCompressionType = value; }
173+
public MessageCompressionType CompressionType { get => compressionType.GetValueOrDefault(); set => compressionType = value; }
174+
175+
public bool SocketKeepAliveEnabled { get => socketKeepAliveEnabled.GetValueOrDefault(); set => socketKeepAliveEnabled = value; }
176+
177+
int? IProducerOptions.BatchSize => batchSize;
178+
int? IProducerOptions.CompressionLevel => compressionLevel;
179+
MessageCompressionType? IProducerOptions.CompressionType => compressionType;
180+
181+
bool? IProducerOptions.EnableIdempotence => enableIdempotence;
182+
183+
184+
int? IProducerOptions.MessageTimeoutMs => messageTimeoutMs;
185+
int? IProducerOptions.MaxMessageBytes => maxMessageBytes;
186+
187+
int? IProducerOptions.MaxRetries => maxRetries;
188+
189+
int? IProducerOptions.RequestTimeoutMs => requestTimeoutMs;
190+
191+
BrokerAuthenticationMode? IProducerOptions.AuthenticationMode => authenticationMode;
192+
193+
int? IProducerOptions.MetadataMaxAgeMs => metadataMaxAgeMs;
194+
195+
BrokerProtocol? IProducerOptions.Protocol => protocol;
196+
197+
bool? IProducerOptions.SocketKeepaliveEnable => socketKeepAliveEnabled;
198+
199+
string IProducerOptions.SslCaLocation => SslCaLocation;
200+
201+
string IProducerOptions.SslCertificateLocation => SslCertificateLocation;
202+
203+
string IProducerOptions.SslKeyLocation => SslKeyLocation;
204+
205+
string IProducerOptions.SslKeyPassword => SslKeyPassword;
163206
}
164207
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -99,57 +99,85 @@ private IKafkaProducer Create(Handle producerBaseHandle, KafkaProducerEntity ent
9999

100100
public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
101101
{
102-
if (!AzureFunctionsFileHelper.TryGetValidFilePath(entity.Attribute.SslCertificateLocation, out var resolvedSslCertificationLocation))
102+
var kafkaOptions = this.config.Get<KafkaOptions>();
103+
104+
var mergedOptions = new MergedProducerOptions(kafkaOptions, entity.Attribute);
105+
106+
if (!AzureFunctionsFileHelper.TryGetValidFilePath(mergedOptions.SslCertificateLocation, out var resolvedSslCertificationLocation))
103107
{
104-
resolvedSslCertificationLocation = entity.Attribute.SslCertificateLocation;
108+
resolvedSslCertificationLocation = mergedOptions.SslCertificateLocation;
105109
}
106110

107-
if (!AzureFunctionsFileHelper.TryGetValidFilePath(entity.Attribute.SslCaLocation, out var resolvedSslCaLocation))
111+
if (!AzureFunctionsFileHelper.TryGetValidFilePath(mergedOptions.SslCaLocation, out var resolvedSslCaLocation))
108112
{
109-
resolvedSslCaLocation = entity.Attribute.SslCaLocation;
113+
resolvedSslCaLocation = mergedOptions.SslCaLocation;
110114
}
111115

112-
if (!AzureFunctionsFileHelper.TryGetValidFilePath(entity.Attribute.SslKeyLocation, out var resolvedSslKeyLocation))
116+
if (!AzureFunctionsFileHelper.TryGetValidFilePath(mergedOptions.SslKeyLocation, out var resolvedSslKeyLocation))
113117
{
114-
resolvedSslKeyLocation = entity.Attribute.SslKeyLocation;
118+
resolvedSslKeyLocation = mergedOptions.SslKeyLocation;
115119
}
116-
var kafkaOptions = this.config.Get<KafkaOptions>();
120+
117121
var conf = new ProducerConfig()
118122
{
119123
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
120-
BatchNumMessages = entity.Attribute.BatchSize,
121-
EnableIdempotence = entity.Attribute.EnableIdempotence,
122-
MessageSendMaxRetries = entity.Attribute.MaxRetries,
123-
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
124-
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,
125-
SaslPassword = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.Password),
126-
SaslUsername = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.Username),
124+
BatchNumMessages = mergedOptions.BatchSize,
125+
EnableIdempotence = mergedOptions.EnableIdempotence,
126+
MessageMaxBytes = mergedOptions.MaxMessageBytes,
127+
MessageSendMaxRetries = mergedOptions.MaxRetries,
128+
MessageTimeoutMs = mergedOptions.MessageTimeoutMs,
129+
RequestTimeoutMs = mergedOptions.RequestTimeoutMs,
130+
SaslPassword = this.config.ResolveSecureSetting(nameResolver, mergedOptions.Password),
131+
SaslUsername = this.config.ResolveSecureSetting(nameResolver, mergedOptions.Username),
127132
SslKeyLocation = resolvedSslKeyLocation,
128-
SslKeyPassword = entity.Attribute.SslKeyPassword,
133+
SslKeyPassword = mergedOptions.SslKeyPassword,
129134
SslCertificateLocation = resolvedSslCertificationLocation,
130135
SslCaLocation = resolvedSslCaLocation,
131136
Debug = kafkaOptions?.LibkafkaDebug,
132-
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
133-
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
134-
CompressionLevel = entity.Attribute.DefinedCompressionLevel ?? kafkaOptions.CompressionLevel ?? -1
137+
MetadataMaxAgeMs = mergedOptions.MetadataMaxAgeMs,
138+
SocketKeepaliveEnable = mergedOptions.SocketKeepaliveEnable,
139+
CompressionLevel = mergedOptions.CompressionLevel
135140
};
136141

137-
if ((entity.Attribute.DefinedCompressionType ?? kafkaOptions.CompressionType) != MessageCompressionType.NotSet)
142+
if (mergedOptions.CompressionType != MessageCompressionType.NotSet)
138143
{
139-
conf.CompressionType = (CompressionType)(entity.Attribute.DefinedCompressionType ?? kafkaOptions.CompressionType);
144+
conf.CompressionType = (CompressionType)mergedOptions.CompressionType;
140145
}
141146

142-
if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
147+
if (mergedOptions.AuthenticationMode != BrokerAuthenticationMode.NotSet)
143148
{
144-
conf.SaslMechanism = (SaslMechanism)entity.Attribute.AuthenticationMode;
149+
conf.SaslMechanism = (SaslMechanism)mergedOptions.AuthenticationMode;
145150
}
146151

147-
if (entity.Attribute.Protocol != BrokerProtocol.NotSet)
152+
if (mergedOptions.Protocol != BrokerProtocol.NotSet)
148153
{
149-
conf.SecurityProtocol = (SecurityProtocol)entity.Attribute.Protocol;
154+
conf.SecurityProtocol = (SecurityProtocol)mergedOptions.Protocol;
150155
}
151156

152157
return conf;
153158
}
154159
}
160+
161+
public class Fallback<T>
162+
{
163+
private readonly T[] providers;
164+
165+
public Fallback(params T[] providers)
166+
{
167+
this.providers = providers;
168+
}
169+
170+
public TValue GetValue<TValue>(Func<T, TValue> valueGetter, TValue defaultValue)
171+
{
172+
foreach (var provider in providers)
173+
{
174+
var value = valueGetter(provider);
175+
if (value != null)
176+
{
177+
return value;
178+
}
179+
}
180+
return defaultValue;
181+
}
182+
}
155183
}

0 commit comments

Comments
 (0)