File tree Expand file tree Collapse file tree 3 files changed +23
-6
lines changed
src/Microsoft.Azure.WebJobs.Extensions.Kafka Expand file tree Collapse file tree 3 files changed +23
-6
lines changed Original file line number Diff line number Diff line change @@ -219,6 +219,22 @@ public int ChannelFullRetryIntervalInMs
219
219
}
220
220
}
221
221
222
+ /// <summary>
223
+ /// Default compression level parameter for algorithm selected by configuration property <see cref="CompressionType"/>
224
+ /// Can be overridden by setting <see cref="KafkaAttribute.CompressionLevel"/>
225
+ ///
226
+ /// compression.level in librdkafka
227
+ /// </summary>
228
+ public int ? CompressionLevel { get ; set ; }
229
+
230
+ /// <summary>
231
+ /// Default compression codec to use for compressing message sets.
232
+ /// Can be overridden by setting <see cref="KafkaAttribute.CompressionType"/>
233
+ ///
234
+ /// compression.codec in librdkafka
235
+ /// </summary>
236
+ public MessageCompressionType ? CompressionType { get ; set ; }
237
+
222
238
public string Format ( )
223
239
{
224
240
var serializerSettings = new JsonSerializerSettings ( )
Original file line number Diff line number Diff line change @@ -15,7 +15,8 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
15
15
[ Binding ]
16
16
public sealed class KafkaAttribute : Attribute
17
17
{
18
- private int ? compressionLevel ;
18
+ internal int ? DefinedCompressionLevel ;
19
+ internal MessageCompressionType ? DefinedCompressionType ;
19
20
private int ? maxMessageBytes ;
20
21
private int ? batchSize ;
21
22
private bool ? enableIdempotence ;
@@ -152,12 +153,12 @@ public KafkaAttribute()
152
153
/// Compression level parameter for algorithm selected by configuration property <see cref="CompressionType"/>
153
154
/// compression.level in librdkafka
154
155
/// </summary>
155
- public int CompressionLevel { get => compressionLevel . GetValueOrDefault ( - 1 ) ; set => compressionLevel = value ; }
156
+ public int CompressionLevel { set => DefinedCompressionLevel = value ; }
156
157
157
158
/// <summary>
158
159
/// Compression codec to use for compressing message sets.
159
160
/// compression.codec in librdkafka
160
161
/// </summary>
161
- public MessageCompressionType CompressionType { get ; set ; } = MessageCompressionType . NotSet ;
162
+ public MessageCompressionType CompressionType { set => DefinedCompressionType = value ; }
162
163
}
163
164
}
Original file line number Diff line number Diff line change @@ -131,12 +131,12 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
131
131
Debug = kafkaOptions ? . LibkafkaDebug ,
132
132
MetadataMaxAgeMs = kafkaOptions ? . MetadataMaxAgeMs ,
133
133
SocketKeepaliveEnable = kafkaOptions ? . SocketKeepaliveEnable ,
134
- CompressionLevel = entity . Attribute . CompressionLevel
134
+ CompressionLevel = entity . Attribute . DefinedCompressionLevel ?? kafkaOptions . CompressionLevel ?? - 1
135
135
} ;
136
136
137
- if ( entity . Attribute . CompressionType != MessageCompressionType . NotSet )
137
+ if ( ( entity . Attribute . DefinedCompressionType ?? kafkaOptions . CompressionType ) != MessageCompressionType . NotSet )
138
138
{
139
- conf . CompressionType = ( CompressionType ) entity . Attribute . CompressionType ;
139
+ conf . CompressionType = ( CompressionType ) ( entity . Attribute . DefinedCompressionType ?? kafkaOptions . CompressionType ) ;
140
140
}
141
141
142
142
if ( entity . Attribute . AuthenticationMode != BrokerAuthenticationMode . NotSet )
You can’t perform that action at this time.
0 commit comments