大家好,我是苍何。

经过前面十几篇文章对 RocketMQ 进行了深度剖析,说实话,每一篇文章都查阅了大量的资料,然后是手绘图,一篇花费时长基本是在 4 小时左右。

我相信好的东西一定是可以经得起时间的考验的,而我也希望我的专栏能经得起时间的考验。

那么这一篇呢,苍何想给大家分享的是在面试中的高频面试题,如何保证消息不丢失?但凡你简历中写上消息队列,面试官贼喜欢问这个问题。

所以,我们站在 RocketMQ 的角度来深度剖析下这道难啃的面试题。

消息丢失有啥影响

消息丢失对于有些业务场景来说不痛不痒,比如用户点赞的消息丢了,对整体业务影响较小,无非就是被用户说说。

但对于有些业务,那影响可就大了,丢失的不止是消息,丢的是真金白银啊?。

比方说七夕节快到了,你在某平台上买了口红,打算送给你那貌美如花的女朋友,你兴致勃勃的下了一个单。

你等啊等,始终没等来商品出库,你于是好奇问客服,客服告诉你说,他们的系统出问题了,你的订单没有对应的扣减库存,也就没法出库。

沃日,这事听起来是不是挺搞笑,明明花钱买了东西,却收不到货,这个七夕,你只能跪在搓衣板上思考人生了?。

那对于平台来说,下单的消息丢失,会导致下游系统如库存系统、物流系统通通都收不到消息,那就造成商品显示库存和实际库存数据不一致

消息丢的直接可以把平台倒闭了…

你说影响大不大,我们来手绘个草图来简单看看这流程吧:

所以,了解如何防止消息丢失,显得就很重要了。

消息如何流转

经过前面十几章的学习,相信你对 RocketMQ 消息的流转再熟悉不过了,但老苍何还是 BB 下,防止有些小伙伴遗忘。

所以你看,消息其实经过了 3 座大山,要防止消息不丢失,那得从这 3 座大山入手。

  • 生产者:Producer 发送消息
  • 消息队列:RocketMQ Broker存储消息
  • 消费者:Consumer 消费消息

要保证消息不丢失,Producer 需要保证消息一定完整的发送到 Broker 。

Broker 一定要保证持久化存储的消息不丢失。

Consumer 要保证自己拉取的消息一定被消费。

这三者缺一不可。那接下来我们就针对这 3 座大山如何保证消息不丢失进行深入剖析。

Producer 如何保证消息不丢失

在之前的教程中,我有提到过 RocketMQ 的 ACK 确认机制。其实你要说这玩意有啥作用,其实就是来保证 Producer 确确实实发送到了消息到 Broker。

就如同 TCP 的 3 次握手和 4 次挥手,当然这里人家只是发个 ACK 心跳包来确认。

Producer 发送消息到 Broker,如果 Broker 确认收到了这一条消息,就会发送 ACK 给到 Producer,Producer 收到这个 ACK 确认消息后,才算是发送成功。

这是 Producer 同步发送保证消息不丢失的情况,当然如果是异步发送,实际上 Producer 不会等收到 ACK 才确认自己发送成功,而是会一直异步发送。

如果 Producer 没有收到 ACK,就会一直按照配置进行重试,直到达到最大重试次数,在默认 3 次重试后,Producer 就会抛出 MQClientException 异常,下面是源码:

if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    if (times >= this.defaultMQProducer.getRetryTimesWhenSendFailed()) {
        throw new MQClientException("send failed after retrying " + times + " times", sendResult.getSendStatus().getStatus());
    }
} else {
    // success logic
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

这时,对于 Producer 正确处理异常就显得很重要了,也就是通过 Broker 重试都还发送失败,这个消息就一定要存在本地进行兜底,以保证消息不丢失。

还是拿下单减库存的例子,通常对于订单系统,除了自身的业务逻辑,肯定有一个逻辑是发送消息到 MQ。

为了保证不会因 MQ 发送异常而导致业务流程终止,成熟的程序员通常会包一层 try-catch,捕获异常,然后把发送失败的订单存下来。

public class OrderService {

    public boolean addOrder(Order order) {
        // 保存订单
        saveOrder(order);

        try {
            // 发送添加积分消息
            SendResult sendResult = sendMessage(order);
            // 判断消息发送结果
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // 记录失败消息
                recordFailedOrder(order, "Message send status is not OK.");
            }
        } catch (Exception e) {
            // 捕获异常并记录日志
            log.error("send msg error", e);
            // 记录发送失败的订单
            recordFailedOrder(order, e.getMessage());
        }

        return true;
    }

    // 保存订单的方法
    private void saveOrder(Order order) {
        // 订单保存逻辑
        // 例如:保存到数据库
    }

    // 发送消息的方法
    private SendResult sendMessage(Order order) throws Exception {
        // 模拟消息发送逻辑
        // 实际中调用 RocketMQ 的 Producer 发送消息
        SendResult sendResult = new SendResult();
        sendResult.setSendStatus(SendStatus.SEND_OK);
        return sendResult;
    }

    // 记录发送失败的订单的方法
    private void recordFailedOrder(Order order, String reason) {
        // 记录订单失败逻辑
        // 例如:将失败订单保存到数据库或文件中
        log.info("Failed to send message for order: " + order.getId() + ", reason: " + reason);
    }
    
    // 示例中的日志记录
    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // 假设 Order 类
    public static class Order {
        private String id;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }
    }

    // 模拟 SendResult 和 SendStatus 类
    public static class SendResult {
        private SendStatus sendStatus;

        public SendStatus getSendStatus() {
            return sendStatus;
        }

        public void setSendStatus(SendStatus sendStatus) {
            this.sendStatus = sendStatus;
        }
    }

    public enum SendStatus {
        SEND_OK, SEND_FAILED
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.

这样即使订单消息发送失败,也不会因小失大影响整个业务流程,而且做了日志记录和落库保存发送失败订单。

当然有些大厂,还会发送告警消息给到对应人员和补偿系统,补偿系统会按照设定去重新发送失败的订单到 MQ。

RocketMQ 如何保证存储不丢失

好了,现在假设 Producer 的消息一定发上来了,那到了 RocketMQ 服务端,我们说过 Broker 会把消息持久化到 commitlog,

其实持久化的过程并不会直接就存储到磁盘,RocketMQ 实际上用的是异步刷盘,也就是消息先写入内存,然后再由专门的线程异步刷到磁盘。

也就是默认情况下消息还在存在 Broker 的内存中,就给 Producer 返回 ACK 消息了,这个时候,如果 Broker 重启,存在内存中的消息自然就丢失了。

所以这种情况下,虽然异步刷盘性能更高(不用频繁写入磁盘),但实际是无法严格保证消息不丢失的。

所以对于有些对消息严格不能丢失的业务来说,就要更改默认的异步刷盘为同步刷盘,也就是在消息写入到磁盘之前,会阻塞发送请求,直到消息成功刷盘。

不管 Broker 在此过程中是否抽筋断了,数据是成功保存在磁盘了

当然问题很多的小明他又问了,你这也也不行啊,万一磁盘也崩溃了呢?消息一样丢失啊。

没错,但现在都 21 世纪的 2024 年了,磁盘都自带了备份,即使崩溃也是数据也是还在的,那你说假设起火服务器直接都烧了呢。

好吧,不排除这种情况,所以生产上通常会利用到 RocketMQ 的集群部署,Broker 会被分散在不同地方的服务器,且有主从 master-slaver 概念。

master 机器挂了,在 slaver 还存在着副本,无缝切换,这种方案完美倒是很完美,就是有点费钱?。

Consumer 如何保证消费不丢失

好啦,现在前面 2 座大山都保证了消息绝对不会丢失,最后压力给到了消费者这边。

我们在讲 RocketMQ 的消息模型时,说到过 Consumer 其实是会把自己的消费位点上报给 Broker,并存在 ConsumerQueue 中。

这部分不大熟悉的小伙伴可以跳转到消息模型章节。

所以要想保证消费过程中消息不丢失,在提交消费位点之前,一定要保证已经处理完业务流程,再进行消费位点的提交。

RocketMQ 防止消息丢失的核心机制

我们上面主要从生产者发送、Broker 存储、消费者消费的角度分析了如何保证消息不丢失,那从 RocketMQ 自身角度来说的话,还是提供了很多机制来保证的。

个别会和上面有冲突,但不影响,回答的形式不同。

同步刷盘
  • 同步刷盘:在消息写入到磁盘之前,会阻塞发送请求,直到消息成功刷盘。这样可以确保消息持久化到磁盘后才返回成功响应,减少因服务器宕机导致的数据丢失风险。
  • 异步刷盘:消息先写入内存,然后再由专门的线程异步刷盘。虽然性能更高,但在系统崩溃时可能会丢失尚未刷盘的消息。

默认的 RocketMQ 使用的是异步刷盘,当然是为了提升性能,但如果要保证消息的绝对不丢失,可以将默认设置修改为同步刷盘

只需要修改 broker.conf:

flushDiskType=SYNC_FLUSH
  • 1.

这个配置项将 Broker 的刷盘模式设置为同步刷盘 (SYNC_FLUSH)。默认为异步刷盘 (ASYNC_FLUSH),通过将其修改为 SYNC_FLUSH,你可以确保消息在写入磁盘前,发送者会阻塞等待,直到消息成功写入磁盘,从而提高数据的安全性。

主从同步复制

主从架构很好理解,就是一个主节点(Master)和一个或多个从节点(Slave)。

主节点负责处理所有的读写请求,而从节点则被动地从主节点同步数据,主要用作备份。

当主节点发生故障时,系统可以切换到从节点以确保服务的高可用性。

RocketMQ 默认的是异步复制,主节点在写入数据后立即向生产者返回确认,而不会等待从节点的同步完成。但如果主节点在数据尚未同步到从节点之前宕机,可能会导致部分数据丢失。

所以为了保证数据完全不丢失,我们可以修改默认为同步复制。主节点会在消息写入后,同步将数据复制到从节点,并等待从节点确认收到数据后才向生产者返回确认。

同理修改 broker.conf,添加如下配置:

brokerRole=SYNC_MASTER
  • 1.

同步复制会损失性能,但却是保证了绝对的消息不丢失,各取所需,要针对具体业务场景来设置才行。

消息重试

其实消息重试和消息确认机制,也是为了保证消息不丢失,你想一想啊,生产者发送失败了 1 次,就不再发送了,那会有多拉胯?

所以消息重试实际上也是为了保证消息不丢失的一大机制,划重点,面试可以回答这个,加分,加分?。

具体还需要看重试的机制,可以看苍何之前关于重试的文章。

总结

其实要回答好这个面试题,是需要系统的从多个角度来分析的,你可以从 RocketMQ 本身的维度,也可以从消费者、生产者维度,还可以从业务维度。

当然如果你是面试官,我希望你可以更多关注面试者的理解程度,毕竟大家都是八股选手谁也别为难谁?

好啦,有关 RocketMQ 如何防止消息丢失教程就到这里啦,有疑惑和问题欢迎大家留言讨论。

我是苍何,这是图解 RocketMQ 教程的第 13 篇,我们下篇见~