这操作,挽救了一次大事故!

你好,我是苏三

这次事故是有关消息的场景,实际也涉及到分布式事务的内容,我们先简单来回顾下分布式事务。

之前我写过分布式事务相关的文章,基本上把常见的几个分布式事务都捋了捋。

其中我分析了挺多,例如 2pc、TCC、本地消息表、事务消息等等,更多的可以看之前的那篇文章。

这次事故主要是有关本地消息表的实现,借着这篇我也给个实战代码,拿来即用,这里我先再简单把原理介绍一遍。

本地消息表原理

本地消息就是利用了本地事务,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息的插入,即将业务的执行和将消息放入消息表中的操作放在同一个事务中提交,

这样本地事务执行成功的话,消息肯定也插入成功,然后再调用其他服务,如果调用成功就修改这条本地消息的状态。

如果失败也不要紧,会有一个后台线程扫描,发现这些状态的消息,会一直调用相应的服务,一般会设置重试的次数,如果一直不行则特殊记录,待人工介入处理。

可以看到还是很简单的,也是一种最大努力通知思想。

实战

上面的原理大家应该都清晰了,这篇我就基于本地消息表补偿消息来实现事务和消息发送的事务一致性

在我们日常业务中,MQ的应用必不可少,相信你肯定会遇到这个场景:一个 service 方法里执行一些业务,修改了一些数据落库,然后再发送一条MQ消息,触发下一个流程。

那么问题来了,如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?

这个简单,将同步发送消息的逻辑写在事务内部,就能保证发送失败,事务不会提交。

那么问题又来了,如果消息发送成功了,最后事务提交失败了呢?那发出去的消息还能追回吗?

因此我们要解决的第一个问题其实是:当前 service 事务提交后,才能发送消息,不然就可能导致消息发出去了,实际事务是没执行成功的。

而上述的操作使得我们兜兜转转又回到第一个问题:如何保证当前 service 修改的数据事务提交了,消息一定就发出去了呢?,万一事务提交了应用就挂了呢?消息不就没了,后续的流程也就中断了。

这归根结底是分布式事务问题,是数据库操作跟MQ消息的爱恨情仇,关于这个 RocketMQ 提供了解决方案即事务消息,但是它的侵入性比较大,需要修改接口适配事务消息的实现。

而本地消息表则非常简单,接下来我们开始操作!

首先我们需要建立一张本地消息表(当前这个设计主要是为了MQ消息的事务场景):

CREATE TABLE `message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status_delete` tinyint NOT NULL DEFAULT '0' COMMENT '删除标记 0正常 1删除',
  `topic` varchar(64CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'topic',
  `tag` varchar(128CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'tag',
  `msg_id` varchar(64CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '消息id',
  `msg_key` varchar(64COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '消息key',
  `data` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息json串',
  `try_num` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT '发送状态 0-未发送 1-已发送',
  `next_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下次驱动开始时间'
  PRIMARY KEY (`id`),
  KEY `idx_key` (`msg_key`),
  KEY `idx_nexttime_status` (`next_time`,`status`),
  KEY `idx_msgid` (`msg_id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='本地消息记录表';

然后再写个 MessageService 来包装下消息的发送流程,把本地消息记录保存封装在里面。

@Service
public class MessageService implements IMessageService{
    @Resource
    private Producer producer;
    @Resource
    private MessageMapper messageMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void send(String topicString tagString keyObject obj
{
        sendDelay(topic, tag, key, obj, 0L);
    }
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendDelay(String topicString tagString keyObject objLong period
{
        //计算时间,防止定时任务扫描将还在正常流程中的消息进行重试
        int time = (period == 0L ? 10 : period.intValue() / 1000);
        Date nextTime = DateUtil.getAfterNewDateSecond(new Date(), time);
        String data = JSON.toJSONString(obj);
        Message message = new Message()
                .setStatusDelete(0)
                .setTopic(topic)
                .setTag(tag)
                .setMsgId("")
                .setMsgKey(key)
                .setData(data)
                .setTryNum(0)
                .setStatus(0)
                .setNextTime(nextTime);
        // 保存本地消息记录
        messageMapper.save(message);

        // 当前事务提交后,再执行发送消息和更改本地消息记录状态
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter() {
                    @Override
                    public void afterCommit() {
                        String messageId;
                        try {
                            if (period == 0L) {
                                messageId = producer.send(topic, tag, key, data);
                            } else {
                                messageId = producer.sendDelay(topic, tag, key, data, period);
                            }
                            Message update = new Message()
                                    .setId(message.getId())
                                    .setMsgId(messageId)
                                    .setStatus(1);
                            messageMapper.updateById(update);
                        } catch (Exception e) {
                            log.error("..");
                        }
                    }
                }
        );
    }
}

定时任务的逻辑就很简单了,就是扫描 nextTime 到期且未发送的消息,重新发送即可,这里不多赘述。

最终的使用就非常简单了:

@Transactional(rollbackFor = Exception.class)
public void doSth(xx
{
    saveA();
    saveB();
    messageService.send(xxx);
}

我们来分析一下:

  1. 假设数据库事务提交失败,那么无事发生,消息也没发出去,此时业务正常。
  2. 假设数据库操作成功,但是数据库事务提交后,服务宕机了,那么消息没发出去,此时 saveA 和 saveB 都保存成功,那么 message 肯定也插入了(它们在同一个事务中),message 的 status 是 0 ,那么我们有个定时任务,根据 nextTime 和 status 来扫描得到未成功发送的消息,进行重试即可,后续消息可正常发送
  3. 假设数据库操作成功,但是数据库事务提交了,MQ有问题,使得消息发不出去,同理第二条,后续定时任务扫描重试即可。

就在两个月前,公司用的阿里云的MQ故障,导致消息发送频繁超时,就是因为我们的消息发送都做了以上的改造,因此没有影响业务(数据都正常落库,事务正常提交),部分消息发送超时,由后续补偿任务自动补偿重试。

可以想象,如果没有这个机制可能会发送两种情况:

  1. 如果消息在事务内发送,由于消息发送出错,那么事务提交失败,业务会直接受到影响,线上频繁报错(还解决不了,因为这是阿里云MQ底层升级导致的问题),妥妥P0故障。
  2. 如果消息在事务提交后发送,又没落库记录,那么消息发送超时,后续流程中断,后续需要手动补数据,能累死个人。

简要分析

一般 service 事务相关方法都用 @Transactional 修饰, messageService.send 也被 @Transactional,默认事务传播级别是 PROPAGATION_REQUIRED,继承外部事务,因此它们处于同一个事务。

然后 TransactionSynchronizationManager 可以管理当前线程的事务,内部的 TransactionSynchronizationAdapter 是一个抽象类

可以看到,它能让我们在事务提交前、后、暂停等各阶段实现一些自己的逻辑。

最后

具体操作还是很简单的,仅需一张表,一个服务的所有消息发送都能复用。

回头看看现在的业务代码,看看是不是有业务执行了但是消息没发送成功的风险?小心遇到我之前的问题,有的话赶紧改造吧!

苏三第一个付费专栏,正式上线了。


这个专栏分享了我写作这3年时间中,在公众号、B站、知乎、CSDN、掘金、今日头条等多个平台,涨粉的技巧,以及一些运营、推广和变现的方式。


还会分享一些技术文章的写作技巧。


如果你也想尝试写技术文章,或者做自媒体但是不知道怎么吸引粉丝,这个专栏或许可以帮助你。


原价399元,目前在内测阶段仅14.9元,永久买断,专栏会更新80篇文章。订阅读者每增加200人,会涨价。


扫码下方二维码即可购买专栏。

点个在看 你最好看

相关推荐

  • 基于大模型(LLM)的Agent 应用开发
  • 进字节了,50k*18薪面经曝光!
  • 前端程序员是怎么做物联网开发的
  • [译]装饰器的10年历史
  • @Resource注解实现注入
  • 一名退休的亚马逊副总裁自述:高管与普通员工的脱节
  • 传鸿蒙系统明年将不兼容安卓;OpenAI 超 700 名员工联名逼宫董事会,不解散就跳槽微软|极客头条
  • 如果按代码量算工资,也许应该这样写!
  • 前端日历实现:公历、农历、黄历、星座、节气、天干、地支、八字、星宿、五行...
  • 改变axios的用法后,我的工作效率提升了3倍
  • 开源白板+AI:画出UI需求,自动获得代码
  • Spring Boot 3.x 最简集成 Spring Doc-OpenApi
  • Vite 5.0 正式发布,整了哪些活?
  • 请珍惜现在的工作机会
  • 被誉为 JavaScript 中 最难最常见 的手写题之一!!!
  • vue props的不规范使用破坏了props的单向数据流动
  • 15篇MyBatis-Plus系列集合篇「值得收藏学习」
  • 总结了十个Vue3超级实用但是很冷门的API
  • 11月24日,OC城市行·深圳站「操作系统与AI技术应用实践沙龙」邀你参与!
  • 99%的程序员容易忽视的“系统”健康问题