Skip to content

快速开始

taowenwu edited this page Sep 27, 2023 · 5 revisions

1. Maven添加依赖

<dependencys>
    <dependency>
        <groupId>io.github.wendy512</groupId>
        <artifactId>stream-core</artifactId>
        <version>1.0.1</version>
    </dependency>
</dependencys>

2. 配置

在SpringBoot Yaml中配置参数

stream:
  # 数据通道
  channel:
    # 通道名称
    localQueue:
      # 通道类型,目前支持:memory
      type: memory
      # 队列容量
      capacity: 20000
  # 消息源配置
  source:
    test:
      # 消息源类型,目前支持:local、mqtt、kafka、rabbitmq
      type: local
      # 关联通道
      channel: localQueue
  # 消息目标配置
  sink:
    test:
      # 消息目标类型,目前支持:default、mqtt、kafka、rabbitmq
      type: default
      # 关联通道
      channel: localQueue
      # 目标由几个固定线程去处理
      threads: 1

3. 发送数据

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TestApplication.class)
public class TestLocal {

    // 指定哪个通道
    @Channel("localQueue")
    private ChannelProcessor channelProcessor;

    @Test
    public void testConsumer() throws Exception {
        for (int i = 0; i < 100; i++) {
            Message<String> message = MessageBuilder.withPayload(String.format("this is %s message", i)).build();
            channelProcessor.send(message);
        }

        // 保证消费完
        TimeUnit.SECONDS.sleep(5);
    }
}

在SpringBoot Yaml中配置参数

4. 订阅目标处理数据

// 订阅哪个消息目标
@Sink("test")
@Component
public class LocalConsumer implements Consumer<String> {

    @Override
    public void accept(List<Message<String>> messages) {
        messages.forEach(m -> {
            // do something
        });
    }
}
Clone this wiki locally