Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ public interface IKafkaEventData
string Topic { get; }
DateTime Timestamp { get; }
IKafkaEventDataHeaders Headers { get; }
string ConsumerGroup { get; }
}
}
18 changes: 13 additions & 5 deletions src/Microsoft.Azure.WebJobs.Extensions.Kafka/KafkaEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class KafkaEventData<TKey, TValue> : IKafkaEventData

object IKafkaEventData.Key => this.Key;

public string ConsumerGroup { get; internal set; }

public KafkaEventData()
{
this.Headers = new KafkaEventDataHeaders(false);
Expand All @@ -31,15 +33,18 @@ public KafkaEventData(TKey key, TValue value) : this()
this.Key = key;
this.Value = value;
}

public KafkaEventData(ConsumeResult<TKey, TValue> consumeResult)
public KafkaEventData(ConsumeResult<TKey, TValue> consumeResult) : this(consumeResult, null)
{
}
public KafkaEventData(ConsumeResult<TKey, TValue> consumeResult, string consumerGroup)
{
this.Key = consumeResult.Key;
this.Value = consumeResult.Value;
this.Offset = consumeResult.Offset;
this.Partition = consumeResult.Partition;
this.Timestamp = consumeResult.Message.Timestamp.UtcDateTime;
this.Topic = consumeResult.Topic;
this.ConsumerGroup = consumerGroup;
if (consumeResult.Headers?.Count > 0)
{
this.Headers = new KafkaEventDataHeaders(consumeResult.Message.Headers);
Expand All @@ -63,7 +68,9 @@ public class KafkaEventData<TValue> : IKafkaEventData

object IKafkaEventData.Key => null;

public IKafkaEventDataHeaders Headers { get; private set; }
public IKafkaEventDataHeaders Headers { get; private set; }

public string ConsumerGroup { get; internal set; }

public KafkaEventData()
{
Expand All @@ -80,7 +87,7 @@ internal KafkaEventData(KafkaEventDataHeaders headers)
this.Headers = headers;
}

internal static KafkaEventData<TValue> CreateFrom<TKey>(ConsumeResult<TKey, TValue> consumeResult)
internal static KafkaEventData<TValue> CreateFrom<TKey>(ConsumeResult<TKey, TValue> consumeResult, string consumerGroup = null)
{
KafkaEventDataHeaders headers;
if (consumeResult.Headers?.Count > 0)
Expand All @@ -98,7 +105,8 @@ internal static KafkaEventData<TValue> CreateFrom<TKey>(ConsumeResult<TKey, TVal
Offset = consumeResult.Offset,
Partition = consumeResult.Partition,
Timestamp = consumeResult.Timestamp.UtcDateTime,
Topic = consumeResult.Topic
Topic = consumeResult.Topic,
ConsumerGroup = consumerGroup
};

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ private void ProcessSubscription(object parameter)
else
{
var kafkaEventData = this.requiresKey ?
(IKafkaEventData)new KafkaEventData<TKey, TValue>(consumeResult) :
KafkaEventData<TValue>.CreateFrom(consumeResult);
(IKafkaEventData)new KafkaEventData<TKey, TValue>(consumeResult, this.consumerGroup) :
KafkaEventData<TValue>.CreateFrom(consumeResult, this.consumerGroup);

// add message to executor
// if executor pending items is full, flush it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Dictionary<string, Type> GetBindingContract(bool isSingleDispatch = true)
AddBindingContractMember(contract, nameof(KafkaEventData<TKey, TValue>.Topic), typeof(string), isSingleDispatch);
AddBindingContractMember(contract, nameof(KafkaEventData<TKey, TValue>.Timestamp), typeof(DateTime), isSingleDispatch);
AddBindingContractMember(contract, nameof(KafkaEventData<TKey, TValue>.Offset), typeof(long), isSingleDispatch);

AddBindingContractMember(contract, nameof(KafkaEventData<TKey, TValue>.ConsumerGroup), typeof(string), isSingleDispatch);
return contract;
}

Expand Down Expand Up @@ -89,12 +89,14 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, IKaf
var timestamps = new DateTime[length];
var topics = new string[length];
var keys = new object[length];
var consumerGroups = new string[length];

bindingData.Add("PartitionArray", partitions);
bindingData.Add("OffsetArray", offsets);
bindingData.Add("TimestampArray", timestamps);
bindingData.Add("TopicArray", topics);
bindingData.Add("KeyArray", keys);
bindingData.Add("ConsumerGroupArray", consumerGroups);

for (int i = 0; i < events.Length; i++)
{
Expand All @@ -103,6 +105,7 @@ internal static void AddBindingData(Dictionary<string, object> bindingData, IKaf
timestamps[i] = events[i].Timestamp;
keys[i] = events[i].Key;
topics[i] = events[i].Topic;
consumerGroups[i] = events[i].ConsumerGroup;
}
}

Expand All @@ -113,6 +116,7 @@ private static void AddBindingData(Dictionary<string, object> bindingData, IKafk
bindingData.Add(nameof(IKafkaEventData.Topic), eventData.Topic);
bindingData.Add(nameof(IKafkaEventData.Timestamp), eventData.Timestamp);
bindingData.Add(nameof(IKafkaEventData.Offset), eventData.Offset);
bindingData.Add(nameof(IKafkaEventData.ConsumerGroup), eventData.ConsumerGroup);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public void GetStaticBindingContract_ReturnsExpectedValue()
var strategy = new KafkaTriggerBindingStrategy<string, string>();
var contract = strategy.GetBindingContract();

Assert.Equal(5, contract.Count);
Assert.Equal(6, contract.Count);
Assert.Equal(typeof(object), contract["Key"]);
Assert.Equal(typeof(int), contract["Partition"]);
Assert.Equal(typeof(string), contract["Topic"]);
Assert.Equal(typeof(DateTime), contract["Timestamp"]);
Assert.Equal(typeof(long), contract["Offset"]);
Assert.Equal(typeof(string), contract["ConsumerGroup"]);
}

[Fact]
Expand All @@ -29,12 +30,13 @@ public void GetBindingContract_SingleDispatch_ReturnsExpectedValue()
var strategy = new KafkaTriggerBindingStrategy<string, string>();
var contract = strategy.GetBindingContract(true);

Assert.Equal(5, contract.Count);
Assert.Equal(6, contract.Count);
Assert.Equal(typeof(object), contract["Key"]);
Assert.Equal(typeof(int), contract["Partition"]);
Assert.Equal(typeof(string), contract["Topic"]);
Assert.Equal(typeof(DateTime), contract["Timestamp"]);
Assert.Equal(typeof(long), contract["Offset"]);
Assert.Equal(typeof(string), contract["ConsumerGroup"]);
}

[Fact]
Expand All @@ -48,6 +50,7 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event()
Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc),
Topic = "myTopic",
Value = "Nothing",
ConsumerGroup = "myConsumerGroup"
};

var strategy = new KafkaTriggerBindingStrategy<string, string>();
Expand All @@ -57,13 +60,15 @@ public void SingleDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event()
Assert.Equal(2, binding["Partition"]);
Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["Timestamp"]);
Assert.Equal("myTopic", binding["Topic"]);
Assert.Equal("myConsumerGroup", binding["ConsumerGroup"]);

// lower case too
Assert.Equal("1", binding["key"]);
Assert.Equal(100L, binding["offset"]);
Assert.Equal(2, binding["partition"]);
Assert.Equal(new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), binding["timestamp"]);
Assert.Equal("myTopic", binding["topic"]);
Assert.Equal("myConsumerGroup", binding["consumergroup"]);
}

[Fact]
Expand All @@ -79,6 +84,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event()
Timestamp = new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc),
Topic = "myTopic",
Value = "Nothing1",
ConsumerGroup = "myConsumerGroup1"
},
new KafkaEventData<string, string>()
{
Expand All @@ -88,6 +94,7 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event()
Timestamp = new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc),
Topic = "myTopic",
Value = "Nothing2",
ConsumerGroup = "myConsumerGroup2"
},
});

Expand All @@ -98,13 +105,15 @@ public void MultiDispatch_GetBindingData_Should_Create_Data_From_Kafka_Event()
Assert.Equal(new[] { 2, 2 }, binding["PartitionArray"]);
Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["TimestampArray"]);
Assert.Equal(new[] { "myTopic", "myTopic" }, binding["TopicArray"]);
Assert.Equal(new[] { "myConsumerGroup1", "myConsumerGroup2" }, binding["ConsumerGroupArray"]);

// lower case too
Assert.Equal(new[] { "1", "2" }, binding["keyArray"]);
Assert.Equal(new[] { 100L, 101L }, binding["offsetArray"]);
Assert.Equal(new[] { 2, 2 }, binding["partitionArray"]);
Assert.Equal(new[] { new DateTime(2019, 1, 10, 9, 21, 0, DateTimeKind.Utc), new DateTime(2019, 1, 10, 9, 21, 1, DateTimeKind.Utc) }, binding["timestampArray"]);
Assert.Equal(new[] { "myTopic", "myTopic" }, binding["topicArray"]);
Assert.Equal(new[] { "myConsumerGroup1", "myConsumerGroup2" }, binding["consumerGroupArray"]);
}
}
}