消息队列的消息幂等性,主要是由 MQ 重试机制引起的。
我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
举个例子,一个消息 M 发送到了消息中间件,消息投递到了消费程序 A,A 接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。然而这种可靠的特性导致,消息可能被多次地投递。
再举个例子,程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
假如我们的逻辑是这样的:
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
那么在并下情况下可能会有问题(当业务还没处理完,此时并发的请求进来了,则会穿透掉检查的挡板),所以要考虑原子性问题。
解决的办法在我们上一篇文章中也有介绍到,即使用:
-
悲观锁(
SELECT ... FOR UPDATE
) -
乐观锁
但无论是 select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。
我们以 RocketMQ 作为消息中间件为例, RocketMQ 的 文档 中描述道:
RocketMQ
无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId 一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
根据文档的描述我们得到了第一种解决方案,即通过去重表
的方法,这点在上一篇文章 《幂等解决方案集合(一)》 中已有所描述,需要注意的是:
在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
具体来说,我们可以这样做:在数据库中增加一个消息消费记录表,把业务操作和这个消息插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。
-
开启事务
-
插入消息表(处理好主键冲突的问题)
-
更新订单表(原消费逻辑)
-
提交事务
但是这里有它的局限性
-
消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。
-
数据库的数据必须是在一个库,跨库无法解决
Exactly-Once 投递语义
以下引用自阿里云的文档:
Exactly-Once
是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
Exactly-Once
语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。
业界对于
Exactly-Once
投递语义存在很大的争议,很多人会拿出“FLP 不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的 Exactly-Once 语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。
如果您要实现一条消息的消费结果只能在业务系统中生效一次,您需要解决的只是如何保证同一条消息的消费幂等问题。消息队列
RocketMQ
版的Exactly-Once
语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。
典型使用场景
在电商系统中,上游实时计算模块发布商品价格变更的信息,异步通知到下游商品管理模块进行价格变更。此时,需要保证每一条信息的消费幂等,即重复的价格变更信息只会生效一次,这样便不会发生价格多次重复修改的情况,确保实现了消息消费的幂等。
操作步骤
1 添加依赖
消息队列 RocketMQ 版的 ExactlyOnceConsumer 在客户端 SDK ons-client-ext-1.8.4.Final 中发布,若要使用 Exactly-Once 投递语义,需在应用中依赖该 SDK。另外,ExactlyOnceConsumer 基于 Spring 实现了通过注解@MQTransaction 开启 Exactly-Once 消费的方式,因此还需要在应用中增加 Spring 3.0 以上版本的依赖。
完整的依赖内容如下所示。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client-ext</artifactId>
<version>1.8.4.Final</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring-version}</version>
</dependency>
2 在用于存储消息消费结果的数据库中创建 transaction_record 表 (注意:存储消息消费结果的数据库系统必须支持本地事务)
若要使用消息队列 RocketMQ 版的 Exactly-Once 投递语义,您需要在业务处理结果持久化的数据库中创建一张 transaction_record 表,保证此表与存储业务处理结果的表在同一个数据库内,且该数据库支持本地事务。目前,消息队列 RocketMQ 版的 Exactly-Once 投递语义支持您的业务访问 MySQL 和 SQLServer 两种类型的数据源。这两种类型的数据源下 transaction_record 表的建表语句如以下所示 (mysql)。
CREATE TABLE `transaction_record` (
`consumer_group` varchar(128) NOT NULL DEFAULT '',
`message_id` varchar(255) NOT NULL DEFAULT '',
`topic_name` varchar(255) NOT NULL DEFAULT '',
`ctime` bigint(20) NOT NULL,
`queue_id` int(11) NOT NULL,
`offset` bigint(20) NOT NULL,
`broker_name` varchar(255) NOT NULL DEFAULT '',
`id` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`),
UNIQUE KEY `message_id_unique_key` (`message_id`),
KEY `ctime_key` (`ctime`),
KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3 在消息生产端使用 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性设置打开 Exactly-Once 投递语义
/**
* TestExactlyOnceProducer 启动
* 通过 PropertyKeyConst.EXACTLYONCE_DELIVERY 开启 exactly-once 投递语义。
*/
public class TestExactlyOnceProducer {
public static void main(String[] args) {
Properties producerProperties = new Properties();
producerProperties.setProperty(PropertyKeyConst.GROUP_ID,"{GROUP_ID}");
producerProperties.setProperty(PropertyKeyConst.AccessKey,"{accessKey}");
producerProperties.setProperty(PropertyKeyConst.SecretKey,"{secretKey}");
producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR,"{NAMESRV_ADDR}");
producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY,"true");
Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
producer.start();
System.out.println("Producer Started");
for (int i = 0; i < 10; i++) {
Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
try {
SendResult sendResult = producer.send(message);
assert sendResult != null;
System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
} catch (ONSClientException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为避免消息丢失,建议缓存该消息然后进行重试。
}
}
producer.shutdown();
}
}
4 在消息消费端创建 ExactlyOnceConsumer,并开启 Exactly-Once 的消费模式。
使用消息队列 RocketMQ 版的 Exactly-Once 投递语义进行消费时,消费端需要使用 ExactlyOnceONSFactory 调用 createExactlyOnceConsumer 接口创建 ExactlyOnceConsumer,然后通过使用 ExactlyOnceConsumer 进行 Exactly-Once 模式的消费。
在使用 ExactlyOnceConsumer 时,需要注意以下两点:
-
创建 ExactlyOnceConsumer 时,可以通过设置 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性打开或者关闭 Exactly-Once 投递语义。ExactlyOnceConsumer 默认打开 Exactly-Once 投递语义。
-
使用 ExactlyOnceConsumer 消费时,在消息监听器 MessageListener 的 consume 方法中,您的业务处理逻辑需要使用 MQDataSource 对数据库的进行读写操作。
您可以选择以下任一方式在消费端开启 Exactly-Once 投递语义:
-
以非 Spring 方式开启 Exactly-Once 投递语义
-
MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性
-
MessageListener 中通过 Springboot 注解方式实现开启 Exactly-Once 投递语义
-
MessageListener 中通过 MyBatis 方式实现 Exactly-Once 投递语义
以下示例为:MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性,其他示例代码,请参考阿里云文档:https://help.aliyun.com/document_detail/102777.html
/**
* TestExactlyOnceListener 实现。
* 实现了一个事务中对多个业务表进行更新的场景,保证事务内的操作有且仅有一次生效。
*/
public class SimpleTxListener implements MessageListener {
private MQDataSource dataSource;
public SimpleTxListener() {
this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
}
@Override
public Action consume(Message message, ConsumeContext context) {
Connection connection = null;
Statement statement = null;
try {
/**
* 在此处对消费到的消息 message 做业务计算处理,使用 MQDataSource 将处理结果持久化到数据库系统。
* 此范例演示了在一个事务内对多个表进行更新的业务场景,Exactly-Once 投递语义保证事务内的操作有且仅有一次。
* 实际的业务处理按照:接收消息->业务处理->结果持久化的流程来设计。
*/
connection = dataSource.getConnection();
connection.setAutoCommit(false);
String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)",
new String(message.getBody()), message.getMsgID(), System.currentTimeMillis());
String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"", "GID_TEST");
statement = connection.createStatement();
statement.execute(insertSql);
statement.execute(updateSql);
connection.commit();
System.out.println("consume message :" + message.getMsgID());
return Action.CommitMessage;
} catch (Throwable e) {
try {
connection.rollback();
} catch (Exception e1) {
}
System.out.println("consume message fail");
return Action.ReconsumeLater;
} finally {
if (statement != null) {
try {
statement.close();
} catch (Exception e) { }
}
if (connection != null) {
try {
connection.close();
} catch (Exception e) { }
}
}
}
}
总结
你可以看到,这种方式是阿里云做了封装,但理论上跟上一个方案 基于消息表+本地事务 是一样的。
最后这种方案其实思路跟 基于消息表+本地事务 类似,只不过判断是否重复消息用 Redis 分布式锁实现,关于具体实现步骤也跟我们上一篇文章类似,请参考上一篇文章。
当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。 数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入 死信队列
,最终进入到数据库中,接着设置定时 job 查询这些数据,进行手动补偿。