diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs index e1ba90a9..c66a61e3 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/IKafkaEventData.cs @@ -14,5 +14,6 @@ public interface IKafkaEventData string Topic { get; } DateTime Timestamp { get; } IKafkaEventDataHeaders Headers { get; } + string ConsumerGroup { get; } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs index 9febc97b..426ffd5a 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs @@ -21,6 +21,8 @@ public class KafkaEventData : IKafkaEventData object IKafkaEventData.Key => this.Key; + public string ConsumerGroup { get; internal set; } + public KafkaEventData() { this.Headers = new KafkaEventDataHeaders(false); @@ -31,8 +33,10 @@ public KafkaEventData(TKey key, TValue value) : this() this.Key = key; this.Value = value; } - - public KafkaEventData(ConsumeResult consumeResult) + public KafkaEventData(ConsumeResult consumeResult) : this(consumeResult, null) + { + } + public KafkaEventData(ConsumeResult consumeResult, string consumerGroup) { this.Key = consumeResult.Key; this.Value = consumeResult.Value; @@ -40,6 +44,7 @@ public KafkaEventData(ConsumeResult consumeResult) this.Partition = consumeResult.Partition; this.Timestamp = consumeResult.Message.Timestamp.UtcDateTime; this.Topic = consumeResult.Topic; + this.ConsumerGroup = consumerGroup; if (consumeResult.Headers?.Count > 0) { this.Headers = new KafkaEventDataHeaders(consumeResult.Message.Headers); @@ -63,7 +68,9 @@ public class KafkaEventData : IKafkaEventData object IKafkaEventData.Key => null; - public IKafkaEventDataHeaders Headers { get; private set; } + public IKafkaEventDataHeaders Headers { get; private set; } + + public string ConsumerGroup { get; internal set; } public KafkaEventData() { @@ -80,7 +87,7 @@ internal KafkaEventData(KafkaEventDataHeaders headers) this.Headers = headers; } - internal static KafkaEventData CreateFrom(ConsumeResult consumeResult) + internal static KafkaEventData CreateFrom(ConsumeResult consumeResult, string consumerGroup = null) { KafkaEventDataHeaders headers; if (consumeResult.Headers?.Count > 0) @@ -98,7 +105,8 @@ internal static KafkaEventData CreateFrom(ConsumeResult(consumeResult) : - KafkaEventData.CreateFrom(consumeResult); + (IKafkaEventData)new KafkaEventData(consumeResult, this.consumerGroup) : + KafkaEventData.CreateFrom(consumeResult, this.consumerGroup); // add message to executor // if executor pending items is full, flush it diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerBindingStrategy.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerBindingStrategy.cs index d0c48e25..06ce15f6 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerBindingStrategy.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerBindingStrategy.cs @@ -48,7 +48,7 @@ public Dictionary GetBindingContract(bool isSingleDispatch = true) AddBindingContractMember(contract, nameof(KafkaEventData.Topic), typeof(string), isSingleDispatch); AddBindingContractMember(contract, nameof(KafkaEventData.Timestamp), typeof(DateTime), isSingleDispatch); AddBindingContractMember(contract, nameof(KafkaEventData.Offset), typeof(long), isSingleDispatch); - + AddBindingContractMember(contract, nameof(KafkaEventData.ConsumerGroup), typeof(string), isSingleDispatch); return contract; } @@ -89,12 +89,14 @@ internal static void AddBindingData(Dictionary bindingData, IKaf var timestamps = new DateTime[length]; var topics = new string[length]; var keys = new object[length]; + var consumerGroups = new string[length]; bindingData.Add("PartitionArray", partitions); bindingData.Add("OffsetArray", offsets); bindingData.Add("TimestampArray", timestamps); bindingData.Add("TopicArray", topics); bindingData.Add("KeyArray", keys); + bindingData.Add("ConsumerGroupArray", consumerGroups); for (int i = 0; i < events.Length; i++) { @@ -103,6 +105,7 @@ internal static void AddBindingData(Dictionary bindingData, IKaf timestamps[i] = events[i].Timestamp; keys[i] = events[i].Key; topics[i] = events[i].Topic; + consumerGroups[i] = events[i].ConsumerGroup; } } @@ -113,6 +116,7 @@ private static void AddBindingData(Dictionary bindingData, IKafk bindingData.Add(nameof(IKafkaEventData.Topic), eventData.Topic); bindingData.Add(nameof(IKafkaEventData.Timestamp), eventData.Timestamp); bindingData.Add(nameof(IKafkaEventData.Offset), eventData.Offset); + bindingData.Add(nameof(IKafkaEventData.ConsumerGroup), eventData.ConsumerGroup); } } } \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs index 2be4ef39..10bae5af 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaTriggerBindingStrategyTest.cs @@ -15,12 +15,13 @@ public void GetStaticBindingContract_ReturnsExpectedValue() var strategy = new KafkaTriggerBindingStrategy(); var contract = strategy.GetBindingContract(); - Assert.Equal(5, contract.Count); + Assert.Equal(6, contract.Count); Assert.Equal(typeof(object), contract["Key"]); Assert.Equal(typeof(int), contract["Partition"]); Assert.Equal(typeof(string), contract["Topic"]); Assert.Equal(typeof(DateTime), contract["Timestamp"]); Assert.Equal(typeof(long), contract["Offset"]); + Assert.Equal(typeof(string), contract["ConsumerGroup"]); } [Fact] @@ -29,12 +30,13 @@ public void GetBindingContract_SingleDispatch_ReturnsExpectedValue() var strategy = new KafkaTriggerBindingStrategy(); var contract = strategy.GetBindingContract(true); - Assert.Equal(5, contract.Count); + Assert.Equal(6, contract.Count); Assert.Equal(typeof(object), contract["Key"]); Assert.Equal(typeof(int), contract["Partition"]); Assert.Equal(typeof(string), contract["Topic"]); Assert.Equal(typeof(DateTime), contract["Timestamp"]); Assert.Equal(typeof(long), contract["Offset"]); + Assert.Equal(typeof(string), contract["ConsumerGroup"]); } [Fact] @@ -48,6 +50,7 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), Topic = "myTopic", Value = "Nothing", + ConsumerGroup = "myConsumerGroup" }; var strategy = new KafkaTriggerBindingStrategy(); @@ -57,6 +60,7 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal(2, binding["Partition"]); Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["Timestamp"]); Assert.Equal("myTopic", binding["Topic"]); + Assert.Equal("myConsumerGroup", binding["ConsumerGroup"]); // lower case too Assert.Equal("1", binding["key"]); @@ -64,6 +68,7 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal(2, binding["partition"]); Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["timestamp"]); Assert.Equal("myTopic", binding["topic"]); + Assert.Equal("myConsumerGroup", binding["consumergroup"]); } [Fact] @@ -79,6 +84,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), Topic = "myTopic", Value = "Nothing1", + ConsumerGroup = "myConsumerGroup1" }, new KafkaEventData() { @@ -88,6 +94,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Timestamp = new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc), Topic = "myTopic", Value = "Nothing2", + ConsumerGroup = "myConsumerGroup2" }, }); @@ -98,6 +105,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal(new[] { 2, 2 }, binding["PartitionArray"]); Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["TimestampArray"]); Assert.Equal(new[] { "myTopic", "myTopic" }, binding["TopicArray"]); + Assert.Equal(new[] { "myConsumerGroup1", "myConsumerGroup2" }, binding["ConsumerGroupArray"]); // lower case too Assert.Equal(new[] { "1", "2" }, binding["keyArray"]); @@ -105,6 +113,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event() Assert.Equal(new[] { 2, 2 }, binding["partitionArray"]); Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["timestampArray"]); Assert.Equal(new[] { "myTopic", "myTopic" }, binding["topicArray"]); + Assert.Equal(new[] { "myConsumerGroup1", "myConsumerGroup2" }, binding["consumerGroupArray"]); } } }