Skip to content

Commit 632db20

Browse files
committed
Enable setting compression via the KafkaAttribute
1 parent 407ec1c commit 632db20

File tree

5 files changed

+65
-1
lines changed

5 files changed

+65
-1
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ The settings exposed here are targeted to more advanced users that want to custo
322322
|LibkafkaDebug|debug|Both
323323
|MetadataMaxAgeMs|metadata.max.age.ms|Both
324324
|SocketKeepaliveEnable|socket.keepalive.enable|Both
325+
|CompressionType|compression.codec|Output
326+
|CompressionLevel|compression.level|Output
325327

326328
**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
327329
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
5+
{
6+
/// <summary>
7+
/// Defines the message compression type
8+
/// </summary>
9+
public enum MessageCompressionType
10+
{
11+
NotSet = -1,
12+
None = 0,
13+
Gzip = 1,
14+
Snappy = 2,
15+
Lz4 = 3,
16+
Zstd = 4
17+
}
18+
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
1515
[Binding]
1616
public sealed class KafkaAttribute : Attribute
1717
{
18+
private int? compressionLevel;
1819
private int? maxMessageBytes;
1920
private int? batchSize;
2021
private bool? enableIdempotence;
@@ -146,5 +147,17 @@ public KafkaAttribute()
146147
/// ssl.key.password in librdkafka
147148
/// </summary>
148149
public string SslKeyPassword { get; set; }
150+
151+
/// <summary>
152+
/// Compression level parameter for algorithm selected by configuration property <see cref="CompressionType"/>
153+
/// compression.level in librdkafka
154+
/// </summary>
155+
public int CompressionLevel { get => compressionLevel.GetValueOrDefault(-1); set => compressionLevel = value; }
156+
157+
/// <summary>
158+
/// Compression codec to use for compressing message sets.
159+
/// compression.codec in librdkafka
160+
/// </summary>
161+
public MessageCompressionType CompressionType { get; set; } = MessageCompressionType.NotSet;
149162
}
150163
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,15 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
130130
SslCaLocation = resolvedSslCaLocation,
131131
Debug = kafkaOptions?.LibkafkaDebug,
132132
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
133-
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable
133+
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
134+
CompressionLevel = entity.Attribute.CompressionLevel
134135
};
135136

137+
if (entity.Attribute.CompressionType != MessageCompressionType.NotSet)
138+
{
139+
conf.CompressionType = (CompressionType)entity.Attribute.CompressionType;
140+
}
141+
136142
if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
137143
{
138144
conf.SaslMechanism = (SaslMechanism)entity.Attribute.AuthenticationMode;

test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,5 +304,30 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
304304
Assert.Equal(sslCa.FullName, config.SslCaLocation);
305305
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
306306
}
307+
308+
[Theory]
309+
[InlineData(MessageCompressionType.NotSet, null)]
310+
[InlineData(MessageCompressionType.None, CompressionType.None)]
311+
[InlineData(MessageCompressionType.Gzip, CompressionType.Gzip)]
312+
[InlineData(MessageCompressionType.Snappy, CompressionType.Snappy)]
313+
[InlineData(MessageCompressionType.Lz4, CompressionType.Lz4)]
314+
[InlineData(MessageCompressionType.Zstd, CompressionType.Zstd)]
315+
public void GetProducerConfig_When_CompressionType_Defined_Should_Set_CompressionType(MessageCompressionType sourceType, CompressionType? targetType)
316+
{
317+
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
318+
{
319+
CompressionType = sourceType
320+
};
321+
322+
var entity = new KafkaProducerEntity()
323+
{
324+
Attribute = attribute,
325+
ValueType = typeof(ProtoUser),
326+
};
327+
328+
var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
329+
var config = factory.GetProducerConfig(entity);
330+
Assert.Equal(targetType, config.CompressionType);
331+
}
307332
}
308333
}

0 commit comments

Comments
 (0)