-
Notifications
You must be signed in to change notification settings - Fork 92
使用Kafka
taowenwu edited this page Sep 27, 2023
·
4 revisions
<dependencys>
<dependency>
<groupId>io.github.wendy512</groupId>
<artifactId>stream-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.github.wendy512</groupId>
<artifactId>stream-kafka</artifactId>
<version>1.0.1</version>
</dependency>
</dependencys>
在SpringBoot Yaml中配置参数
stream:
# 数据通道
channel:
# 接受通道名称
sourceQueue:
# 通道类型,目前支持:memory
type: memory
# 队列容量
capacity: 20000
# 发送通道名称
sinkQueue:
type: memory
# 队列容量
capacity: 20000
# 消息源配置
source:
test:
# 消息源类型,目前支持:local、mqtt、kafka、rabbitmq
type: kafka
# 关联通道
channel: sourceQueue
# 配置Kafka
config:
"[bootstrap.servers]": 127.0.0.1:9092
"[group.id]": test1
topic: stream
# 接受本地通道的数据,然后发送到Kafka Broker
test2:
type: local
channel: sinkQueue
# 消息目标配置
sink:
# 接受处理
test:
# 消息目标类型,目前支持:default、mqtt、kafka、rabbitmq
type: default
# 关联通道
channel: sourceQueue
# 目标由几个固定线程去处理
threads: 1
# 发送处理
test2:
type: kafka
channel: sinkQueue
threads: 1
# 配置MQTT
config:
"[bootstrap.servers]": 127.0.0.1:9092
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TestApplication.class)
public class TestKafkaSink {
// 指定哪个通道
@Channel("sinkQueue")
private ChannelProcessor channelProcessor;
@Test
public void testSink() throws Exception {
for (int i = 0; i < 100; i++) {
Message<String> message = MessageBuilder.withPayload(String.format("this is %s message", i)).setHeader("topic", "stream").build();
channelProcessor.send(message);
}
TimeUnit.SECONDS.sleep(5);
}
}
在SpringBoot Yaml中配置参数
// 订阅哪个消息目标
@Sink("test")
@Component
public class MqttConsumer implements Consumer<String> {
@Override
public void accept(List<Message<String>> messages) {
messages.forEach(m -> {
// do something
});
}
}