-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKafkaConsumer.cs
46 lines (37 loc) · 1.25 KB
/
KafkaConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using Confluent.Kafka;
using Microsoft.Extensions.Options;
namespace WebAPI;
/// <summary>
/// Hosted background service that consumes messages from the configured Kafka topic,
/// logs each message's key and value, and commits the message's offset.
/// </summary>
public class KafkaConsumer : BackgroundService
{
    private readonly ILogger<KafkaConsumer> _log;
    private readonly IConsumer<string, string> _consumer;
    private readonly KafkaConfigOptions _options;

    /// <summary>
    /// Builds the underlying Kafka consumer and subscribes it to the configured topic.
    /// </summary>
    /// <param name="log">Logger used for consumed messages and consumer errors.</param>
    /// <param name="options">
    /// Kafka settings; only the value current at construction time is read, later
    /// configuration changes are not observed.
    /// </param>
    public KafkaConsumer(ILogger<KafkaConsumer> log, IOptionsMonitor<KafkaConfigOptions> options)
    {
        _log = log;
        _options = options.CurrentValue;

        var config = new ConsumerConfig
        {
            BootstrapServers = _options.Bootstrap,
            GroupId = _options.GroupId,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed explicitly after each message is handled, so the
            // client's background auto-commit is turned off to avoid committing early.
            EnableAutoCommit = false
        };

        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe(_options.Topic);
    }

    /// <summary>
    /// Starts the consume loop on a dedicated thread and returns the task tracking it.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    /// <returns>A task that completes when the consume loop has exited.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        // Consume() blocks, so the loop gets its own long-running thread instead of
        // occupying a thread-pool thread for the lifetime of the application.
        Task.Factory.StartNew(
            () => ConsumeLoop(stoppingToken),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

    /// <summary>
    /// Consumes, logs and commits messages until cancellation is requested or a fatal
    /// Kafka error occurs, then closes the consumer so it leaves its group promptly.
    /// </summary>
    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = _consumer.Consume(stoppingToken);
                    if (result?.Message is null)
                    {
                        // Nothing to handle (e.g. a non-message event); poll again.
                        continue;
                    }

                    _log.LogInformation("{Key} - {Value}", result.Message.Key, result.Message.Value);
                    _consumer.Commit(result);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown: Consume was interrupted by the stopping token.
                    break;
                }
                catch (KafkaException ex) when (!ex.Error.IsFatal)
                {
                    // Non-fatal consume/commit errors are logged and the loop keeps going;
                    // fatal errors fall through and fault the service.
                    _log.LogError(ex, "Kafka consume or commit failed: {Reason}", ex.Error.Reason);
                }
            }
        }
        finally
        {
            try
            {
                // Close on the consuming thread so the group coordinator is notified
                // immediately rather than after the session timeout.
                _consumer.Close();
            }
            catch (KafkaException ex)
            {
                _log.LogWarning(ex, "Kafka consumer did not close cleanly: {Reason}", ex.Error.Reason);
            }
        }
    }

    /// <summary>
    /// Releases the Kafka consumer and then the base service's resources.
    /// </summary>
    public override void Dispose()
    {
        try
        {
            _consumer.Dispose();
        }
        finally
        {
            // Always let the base class clean up, even if the consumer fails to dispose.
            base.Dispose();
        }

        GC.SuppressFinalize(this);
    }
}