Skip to content

Commit

Permalink
feat: access message timestamp from message consumer context
Browse files Browse the repository at this point in the history
  • Loading branch information
joelfoliveira authored and filipeesch committed Nov 23, 2020
1 parent 2faab89 commit c0e5bff
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/KafkaFlow.Abstractions/IMessageContextConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace KafkaFlow
{
using System;
using System.Threading;

/// <summary>
Expand All @@ -17,6 +18,11 @@ public interface IMessageContextConsumer
/// </summary>
CancellationToken WorkerStopped { get; }

/// <summary>
/// Message timestamp. By default is the UTC timestamp when the message was produced
/// </summary>
DateTime MessageTimestamp { get; }

/// <summary>
/// Gets or Sets if the framework should store the current offset in the end when auto store offset is used
/// </summary>
Expand Down
36 changes: 36 additions & 0 deletions src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace KafkaFlow.UnitTests
{
using System;
using System.Threading;
using Confluent.Kafka;
using FluentAssertions;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class MessageContextConsumerTests
{
[TestMethod]
public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTimestampFromResult()
{
// Arrange
var expectedMessageTimestamp = new DateTime(2020, 1, 1, 0, 0, 0);

var consumerResult = new ConsumeResult<byte[], byte[]>
{
Message = new Message<byte[], byte[]>
{
Timestamp = new Timestamp(expectedMessageTimestamp)
}
};

var target = new MessageContextConsumer(null, "consumer", null, consumerResult, CancellationToken.None);

// Act
var messageTimestamp = target.MessageTimestamp;

// Assert
messageTimestamp.Should().Be(expectedMessageTimestamp);
}
}
}
7 changes: 5 additions & 2 deletions src/KafkaFlow/Consumers/MessageContextConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Threading;
using Confluent.Kafka;

Expand All @@ -13,7 +14,7 @@ public MessageContextConsumer(
IConsumer<byte[], byte[]> consumer,
string name,
IOffsetManager offsetManager,
ConsumeResult<byte[], byte[]> kafkaResult,
ConsumeResult<byte[], byte[]> kafkaResult,
CancellationToken workerStopped)
{
this.Name = name;
Expand All @@ -24,11 +25,13 @@ public MessageContextConsumer(
}

public string Name { get; }

public CancellationToken WorkerStopped { get; }

public bool ShouldStoreOffset { get; set; } = true;

public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime;

public void StoreOffset()
{
this.offsetManager.StoreOffset(this.kafkaResult.TopicPartitionOffset);
Expand Down

0 comments on commit c0e5bff

Please sign in to comment.