写好发消息代码的艺术:业务团队实践指南

原文标题:怎么在业务团队写好发消息的代码?

原文作者:阿里云开发者

冷月清谈:

**发消息的时机**

在处理业务代码时,对于发送消息的时机主要有三种选择:

  1. **持久化前发消息:**优点是失败率较低,但缺点是如果业务持久化失败,可能导致发送了不存在订单的消息。
  2. **持久化中发消息:**与持久化前发消息类似,但此时事务尚未提交,存在发送消息导致事务无法提交的风险。
  3. **持久化后发消息:**确保了订单已成功创建,但存在偶发性丢消息的风险,因为可能在发送消息时遇到网络错误或服务器重启。

事务消息

为了解决持久化后发消息的丢消息问题,可以考虑使用事务消息。事务消息在业务持久化前先发送一条半消息,然后在持久化结束后提交真正的消息。如果持久化失败,半消息将被回滚。这可以有效解决丢消息问题,但需要注意:

  1. 事务消息的回查实现需要仔细设计,以避免因数据库故障导致的消息重复发送或丢失。
  2. 发送消息的超时时间需要设定得足够长,以防止因网络问题导致的事务无法提交。

消息表

除了事务消息外,还可以使用消息表来实现消息的可靠性。消息表与业务数据表存储在同一个数据库中,在业务操作过程中将消息记录写入消息表,然后通过扫描任务异步处理消息。此方案的优点是消息与业务操作的一致性保障较好,但会增加数据库的写入和读取压力,并需要实现消息表的扫描任务和消息状态管理。

其他注意事项

除了发消息的时机和方式之外,还需要考虑以下问题:

  1. **弱依赖消息:**是否可以接受偶尔的丢消息,如果是,如何补偿?弱依赖消息的处理需要根据具体业务场景仔细评估。
  2. **发送消息的超时时间:**需要根据业务特点和网络环境设置合适的超时时间,避免因超时导致的大面积消息发送失败。
  3. **消息消费方的幂等性:**消息消费方(通常是其他系统)需要做好幂等处理,以确保消息即使被重复消费也不会导致业务问题。
  4. **消息重投:**需要设计消息重投策略,以处理发送消息失败的情况,但要注意避免过度重投导致系统压力过大。



怜星夜思:


1、发消息的时机选择中,是否存在通用的最佳实践?
2、弱依赖消息的处理方法有哪些?
3、发消息的超时时间该如何设置?

原文内容

阿里妹导读


作者认为其实是没有最佳实践的,大多数时候要根据自己的业务情况做取舍。同时,真的发生问题的时候,事前做好容错设计才是确保稳定性的银弹。

遇到的问题

做技术的同学,尤其是业务开发同学都是经常和消息打交道的,大家也都喜欢研究像MetaQ这种消息中间件的一些实现代码。作为一曾经的业务开发同学(目前在负责稳定性),深知要在业务团队写好发消息的代码,也绝非易事。

曾经我是交易订单团队的一名开发,我遇到了下面的一个问题:

try {
transactionTemplate.start();
// 位置1
orderManager.createOrder(order);
// 位置2
messageProducer.send(buildOrderCreatedMsg(order)); // 发送订单创建成功的消息。
transactionTemplate.commit();
// 位置3
} catch (Exception e) {
transactionTemplate.rollback();
}

我需要发送订单创建成功的消息,目前我是在位置2上面发送的消息,不过我在纠结,我为什么不是在位置1或者位置3上发送消息呢?

谁才是完美的答案

如果这段代码在运行过程中没有任何意外的行为,DB操作总是很快成功,那么看起来在位置1、位置2、位置3上发送消息好像差别并不是很大。但实际的运行环境肯定并不是这么完美的,作为一个自封的靠谱的业务开发,我需要做容错设计。


持久化前发消息

也就是代码中的位置1。经过线上观察,orderManager.createOrder这行代码的执行时间平均耗时在5ms,但极端情况下会有超过2s的案例,同时也能观察到极少数的执行失败的情况。这些现象表明我们不能在业务持久化前发消息,否则我们很可能为不存在的订单发送了创单成功消息。


持久化中发消息

也就是代码中的位置2。这个时候orderManager.createOrder已经执行成功了,按理说是个不错的位置,但这个时候事务实际上还没有提交,这个时候发出消息理论上和前面的位置1是比较类似的情况,只是说失败率要低很多。同时,如果发送消息的RT变大,甚至hang住的话,也会导致事务无法提交。


持久化后发消息

也就是代码中的位置3。这个时候可以确保订单已经是创建成功了,事务都提交了。但是代码上线过后,一个星期总是会遇到几笔丢消息的场景,即订单创建成功了,却没有消息发出来。虽然不多,但是很烦。排查下来,大多数都是发送消息时遇到了网络错误导致消息发送失败了,也有少数是发布的时候执行到这里的时候机器被重启了。


使用事务消息

前面的3个方案,对这个场景来说都不是很完美,想到了事务消息。于是代码修改如下:

try {
transactionTemplate.start();
messageTransactionProducer.send(buildOrderCreatedMsg(order), new TransactionExecutor(transactionTemplate){
@override
public TransactionStatus execute(Message msg) {
orderManager.createOrder(order);
return TransactionStatus.CommitTransaction;
}
});
} catch (Exception e) {
transactionTemplate.rollback();
}

// 订单创建成功消息的半消息回查实现
public class OrderTransactionCheckerImpl implements TransactionChecker {

@override
public TransactionStatus check(Message msg) {
if (orderManager.isOrderExist(getOrderId(msg)) {
return TransactionStatus.CommitTransaction;
} else {
return TransactionStatus.RollbackTransaction;
}
}
}

这下世界总应该清静了吧?此后的大半个月,再也没有接到关于丢消息的咨询(其实是指责),心想事情肯定已经完全解决了。直到有一天晚上,订单的数据库集群中有一个分库遇到磁盘IO故障,然后第二天早上很多业务方找过来,排查下来一下子丢了几百条消息。哎呀,我的天。

冷静下来分析,发现问题在于消息回查的实现有问题,DB发生故障的时候,创建订单的代码流程hang住了,事务并未提交,而半消息回查的通知很快过来了,这个时候通过订单号查询订单显然是查询不到的,所以再次修改了半消息回查的代码。

// 订单创建成功消息的半消息回查实现
public class OrderTransactionCheckerImpl implements TransactionChecker {

@override
public TransactionStatus check(Message msg) {
if (orderManager.isOrderExist(getOrderId(msg)) {
return TransactionStatus.CommitTransaction;
} else {
// 如果消息发送时间距当前时间20s以上,查询不到订单则认为是真的查询不到,否则有可能是创单的事务还未提交
// 20s的时间是来源于DB事务默认的超时时间设置15s加上5s的Buffer,不同团队会不同,这个值不能照搬
if (new Date().getTime() - msg.getSendTime() > 20000) {
return TransactionStatus.RollbackTransaction;
} else {
return TransactionStatus.Unknown;
}
}
}
}

重点是增加了查询不到订单时20s的保护时间,在保护时间内返回Unkown的状态,因为这个时候我们是真的不能确认是否是创建订单的操作被Block住了。从此以后再也没有接到丢消息的投诉了。


使用消息表

还有一种简单的方式是不在业务流程中发消息,而是直接依赖数据库的高可用性在写入业务数据的同时在自建的消息表中写入一条记录,后续通过读取这条记录的状态和内容来做异步的动作。感谢评论中璞尧逾明等同学的建议和反馈。代码如下:

try {
transactionTemplate.start();
orderManager.createOrder(order);
miniBus.addMsg(buildOrderCreatedMsg(order)); // 在订单库中创建一张消息表,这里实际的动作是在这个消息表中写入一条记录。因为消息表和订单表存在于同一个数据库中,这里巧妙的利用了数据的事务特性。
transactionTemplate.commit();
} catch (Exception e) {
transactionTemplate.rollback();
}

这种方式在我自己的实践中是用过的,git仓库上大家可以搜索hjbus或者minibus代码库查看这个小组件的方案。通常它需要一个扫描任务一直在读取自己建立的消息表,并维护它们的后续流转状态,当流转完成后会对记录做物理删除。这种方案会增加一部分数据库的写入和读取压力(一般情况下并不大,毕竟我们的消息体通常是不大的),以及代码的实现复杂度(需要自建消息表的扫描任务、管理消息记录的状态等,但对于一个工程来说这是一次性的任务),同时这个自建的消息表碎片率肯定会很高(似乎不是什么大事儿),不过除此外优势也是相当明显的,消息与业务操作的一致性保障的很好,毕竟数据库是我们最值得依赖的存储。还有一个优势就是它让业务操作的代码看起来真的很整洁。

有最佳实践真好

上面事务消息的方案,看起来就是最佳实践。那么大家一般会有几个问题:


照着最佳实践写就行了吗?

根据我自己踩过已及见别人踩过的坑,事情远没有这么简单。

比如orderManager.isOrderExist的订单是否存在的检查,你得确保你使用了主库,而不是因为读写分离读到了备库上面去。

我这里是订单创建,可以很简单的判断订单是不是存在了。那如果是订单状态的变更呢?即update型的业务操作,就需要判断这个业务操作是不是发生了,那判断起来恐怕就要复杂不少了。(幸好delete型的业务操作都是软删除,好判断)

发送消息的Producer,大多数情况下默认的超时时间是3s,如果大面积发生hang住的问题我们能接受吗?这个时候是不是应当让创建订单失败?

弱依赖消息可不可以简单点?

弱依赖这个说法背后的情况挺复杂的。大多数情况下大家可能说的是偶尔丢几条消息其实没事儿,然后就采用了前面的“持久化后发消息”的方案,就比如我遇到的场景是给用户发送红包成功后的消息,用它来给用户发送收到红包的提醒,偶尔丢几个通知的确是出不了事的。不过我曾经遇到过2起消息中间件的故障,期间大概有5分钟发送消息会有大量(超过50%比例)的失败,大家再想想,是不是这种情况下也能接受?短时间大批量的丢消息,真的能接受吗?

结合我自己遇到过的情况,要点是故障相关的情况,弱依赖到底是怎么个弱法是需要仔细斟酌的,尤其是失败的场景会是怎么样的失败形为需要有心理预期,同时失败后到底是怎么补偿要做好预案,并没有大家一开始认为的那么简单。(根据我自己的实践经验,为了做弱依赖的弥补,大多数时候做的事情反而要更多)

发个消息真的需要这么复杂吗?

是真的,不骗人。

事务消息这么复杂,我好想用binlog监听。(其实还真的是可以的,只是你要接受更高的延迟,以及更高的代码维护成本。同时根据DB记录拼装消息体可能要复杂一点儿。像营销、商品等等的缓存失效都是这么干的)

没有银弹,只有取舍

前面零星提到的一些问题,这里再总结一下,根据我的自己的经验,实际上是没有最佳实践的,而是要根据自己的业务情况做取舍。同时,真的发生问题的时候,往往是发生最严重的问题,而不是我们想像的那样的小问题,事前做好容错设计才是确保稳定性的银弹。

1.发送消息失败了,应该BLOCK业务流程吗?

2.发送消息的超时时间是多少?大面积超时的情况下,业务流程应当是什么样的反应?

3.能够接受丢消息吗?短时间大面积的丢消息呢?补偿方案是什么?

4.消息的消费方(很多时候是你对面的哥们)做好消息的幂等了吗?我们能不能短时间大量进行消息重投?

随需而动:自动弹性,稳定交付


本方案使用应用型负载均衡(ALB)和弹性伸缩(ESS)智能分配网络流量、动态调整服务器资源,提高应用的高可用性和吞吐量,弹性控制资源利用率、缩减资源成本。快点击阅读原文查看详情吧~

可以在消息消费端做好幂等处理,这样即使消息重复消费也不会对业务造成影响。

如果业务允许偶尔的丢消息,持久化前发消息可以降低代码复杂度和维护成本。

设置超时时间时,需要考虑业务的容忍度,如果业务允许偶尔的超时,可以设置一个较长的超时时间,否则需要设置一个较短的超时时间以保证业务的及时性。

可以根据消息的优先级和重要性,对消息进行分类处理,重要的消息优先处理,次要消息可以稍后处理或丢弃。