-
Notifications
You must be signed in to change notification settings - Fork 17
MessageQueue 消息队列
MornBoot支持通用API操作多种消息队列中间件,同时提供了基于注解的本地消息分发策略。
Since:v1.2.1
目前支持:
Kafka- RocketMQ
开始使用MornBoot前,请确保应用与MQ中间件已经连通
SpringBootApplication
@EnableCaching // 开启缓存
无
<!--Morn-->
<dependency>
<groupId>site.morn.boot</groupId>
<artifactId>morn-boot-rocket</artifactId>
</dependency>
<!--Rocket-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocket.version}</version>
</dependency>
使用BroadcastMessageBuilder
构建消息,
使用BroadcastMessageSendingOperations
发送消息。
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RocketSendingOperationsTest {
@Autowired
private BroadcastMessageSendingOperations sendingOperations;
@Test
public void syncSend() {
// 创建消息内容
TestUser user = new TestUser();
user.setId(2L);
user.setUsername("bar");
// 构建消息
BroadcastMessage<TestUser> message = BroadcastMessageBuilder.withPayload(user)
.setTopic("userData").setType("add").build();
// 同步发送消息
MessageResult messageResult = sendingOperations.syncSend(message);
Assert.assertNotNull(messageResult);
Assert.assertTrue(messageResult.isSuccess());
}
}
原生结果(如
SendResult
)转成MessageResult
的过程是隐式的, 可以通过注册MessageResultConverter
覆盖现有实现。
消息消费使用原生框架,示例为RocketMQ
。
使用BroadcastMessageResolvingOperations
进行本地消息分发。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "testGroup", topic = "userData")
public class UserMessageConsumer implements RocketMQListener<MessageExt> {
private final BroadcastMessageResolvingOperations<MessageExt> resolvingOperations;
public UserMessageConsumer(BroadcastMessageResolvingOperations<MessageExt> resolvingOperations) {
this.resolvingOperations = resolvingOperations;
}
@Override
public void onMessage(MessageExt message) {
resolvingOperations.syncResolve(message);
}
}
基于注解编写消息处理类和方法。
消息处理类的@MessageTopic
注解对应消息的主题Topic
,
消息处理方法的@MessageType
注解对应消息的类型Type
。
消息处理方法允许3种类型的参数,分别为消息体类型、
BroadcastMessage
、BroadcastMessageHeaders
。 参数顺序不限,消息体必选,且必须使用@Payload
注解。BroadcastMessage
和BroadcastMessageHeaders
可选。 注意:消息体会反序列化为@Payload
参数类型(即TestUser
),务必确保二者类型一致。
@Slf4j
@Component
@MessageTopic("userData")
public class UserMessageHandler {
@MessageType("add")
public void addUser(@Payload TestUser user, BroadcastMessageHeaders headers) {
log.info("Message id {}", headers.getId());
log.info("Add user: {}", user.getUsername());
}
@MessageType("update")
public void updateUser(@Payload TestUser user, BroadcastMessageHeaders headers) {
log.info("Message id {}", headers.getId());
log.info("Update user: {}", user.getUsername());
}
}
原生消息(如
MessageExt
)解析成BroadcastMessage
的过程是隐式的, 可以通过注册BroadcastMessageHeaderResolver
、BroadcastMessagePayloadResolver
覆盖现有实现。
异步发送示例。
ListenableFuture<MessageResult> future = sendingOperations.asyncSend(message);
future.addCallback(result -> log.info("Rocket send success."),
ex -> log.info("Rocket send failure."));
更多自定义配置及实现参考
MessageAutoConfiguration
、RocketAutoConfiguration
。