您的当前位置:首页正文

RabbitMQ的消息丢失、消息重复、消息积压问题

2024-12-01 来源:个人技术集锦

上一篇文章中我介绍了RabbitMQ实现分布式最终一致性的开发方案,这篇就来解决一下这个方案中存在的一些问题。

 这三个问题中最严重的就是消息丢失的问题。那我我们就来反向介绍一下这几个问题的最简单处理办法。

一、消息积压

消息积压,顾名思义就是给MQ发的消息太多了太快了,消费者消费不过来了。

 解决办法1: 我是富豪我就是有钱,ok,那我们就疯狂加服务器加消费者,直到能够消费的过来。

 解决办法2: 我是穷波一,好吧,穷波一得有自己苟活的法呀,我们再上线一个专门的队列消费服务,但是我们这个服务不去做对应的业务逻辑处理,而是直接把这些消息全部原封不动的存到我们的数据库,这样至少不会丢掉消息,我们记录到数据库,后续我们再慢慢的去处理这些消息就好啦。

举个栗子:这就像是淘宝买东西一样,假如我们的自己写的一个淘宝服务哈,我们写的很low,消费者处理消息很慢,可能我们的服务只能支持一分钟30个人购买,买的人再多就只能消息积压在MQ中,然后再有人继续购买就越等越久,最后导致积压,处理不过来,然后只能被迫给用户返回一个超时错误,用户一看,真垃圾,不买了,我们到手的money飞了。这种事我们怎么允许呢。money、money、money啊,到手的飞了,不可能!这时候我们就上线一个专门的消费者处理这些消息,因为我们不做业务逻辑处理,直接存表,所以我们的速度很快。这样我们一分钟可能就能够支持300个人购买了。太好了,money没飞掉,舒服。过后我们的服务再慢慢处理这些消息,哪怕是我第二天了才处理完,也没有事,money已经到账了,这是不是想想都开心(*^▽^*)。

二、消息重复

顾名思义 消息发送了两次或者多次

解决办法1: 其实通常我们的消费者服务接口都会实现接口的幂等性。(幂等性不懂的看我以前的文章吧~) 

解决办法2: 加防重表,用mysql啊redis啊都可以,唉这个就不想讲了,听到这个名字大家应该都能懂什么意思吧。

三、消息丢失

我滴妈,终于讲这个最重要的了,今天一大早就开始写,有点小累。

通俗理解就是 我们给MQ发了消息,但是最后我们没有消费到。

这个问题出现才是我们最懵逼,最头疼的。

但是想看懂这下面的解决办法首先要了解RabbitMQ的消息可靠投递,不懂的可以转到我以前的文章学习一下()

解决办法 1:加如MQ的消息日志表。(下面直接实战代码)

还是已最通用的电商下单逻辑为例,下面的实战代码就是我上一遍文章《基于RabbitMQ实现下单减库存的最终一致性分布式事务》中的代码基础上进一步完善的。()

1、先创建好我们用的MQ日志记录表。

public class MqMessage {

    @TableId(type = IdType.INPUT)
    private String messageId;

    private String content;

    private String toExchange;

    private String routingKey;

    private String classType;

    private int messageStatus;

    private Date createTime;

    private Date updateTime;
}

 2、提交订单,发送给MQ消息时,加入写入日志功能。

        MqMessage实体类,记录MQ的日志。

        下单业务逻辑部分 

String jsonString = JSON.toJSONString(order.getOrder());
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageId(uuid);
mqMessage.setMessageStatus(0);  // 新建
mqMessage.setContent(jsonString);
mqMessage.setRoutingKey("order.create.order");
mqMessage.setToExchange("order-event-exchange");
mqMessage.setClassType(OrderEntity.class.getTypeName());
mqMessage.setCreateTime(new Date());
mqMessage.setUpdateTime(new Date());
mqMessageService.save(mqMessage);

rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder(), correlationData);

3、MQ的监听器,里面包含两部分的回调函数,可以判断消息发送在什么阶段。具体代码中也有详细注释。


/**
 * MQ 的回调函数
 */
@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    MqMessageService mqMessageService;


    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }


    // 把消息使用json序列化的方式
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    /**
     * 定制 RabbitTemplate
     * 1、服务器收到消息
     * 1、spring.rabbitmq.publisher-confirms=true
     * 2、设置确认回调 setConfirmCallback
     * 2、 消息正确抵达队列 回调
     * 1、## 开启发送端消息抵达队列的确认
     * spring.rabbitmq.publisher-returns=true
     * 2、## 只要抵达队列 优先异步回调
     * spring.rabbitmq.template.mandatory=true
     * 3、消费端
     * 1、##手动ACK确认
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     */
    public void initRabbitTemplate() {
        /**
         * ACK 确认机制
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("==============成功发送消息到服务器回调==========");
            System.out.println("correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
            if (ack) {
                // 消息已成功发送到Broker并得到确认
                System.out.println("订单 消息发送成功");
                String id = correlationData.getId();
                MqMessage mqMessage = new MqMessage();
                mqMessage.setMessageId(id);
                mqMessage.setMessageStatus(1);  // 发送成功
                mqMessage.setUpdateTime(new Date());
                mqMessageService.updateById(mqMessage);

                // 在这里执行相应的逻辑
            } else {
                // 消息发送失败或未得到确认
                System.out.println("消息发送失败:" + cause);
                // 在这里执行相应的逻辑
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             //             * 只要消息没投递到指定的队列,就触发失败回调
             //             * @param message the returned message.
             //             * @param i the reply code.
             //             * @param s the reply text.
             //             * @param s1 the exchange.
             //             * @param s2 the routing key.
             //             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                System.out.println("=============交换机发送消息到消息队列失败回调==========");
                System.out.println("message===>" + message + "replyCode===>" + replyCode + "replyText==>" + replyText + "exchange==>" + exchange + "routingKey==>" + routingKey);
                System.out.println("消息为正确抵达队列。。。");
                String string = new String(message.getBody());
                OrderEntity orderEntity = JSON.parseObject(string, OrderEntity.class);
                MqMessage mqMessage = new MqMessage();
                mqMessage.setMessageId(orderEntity.getMqMessageId());
                mqMessage.setMessageStatus(2);  // 错误抵达
                mqMessage.setUpdateTime(new Date());
                mqMessageService.updateById(mqMessage);
            }
        });
    }

}

4、消费者阶段,判断是否成功处理消息。

    @RabbitHandler
    public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("过期订单, 准备关闭。。。");
        try {
            orderService.closeOrder(orderEntity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

            String string = new String(message.getBody());
            System.out.println("记录MQ 的日志 , string=" + string);
            MqMessage mqMessage = new MqMessage();
            mqMessage.setMessageId(orderEntity.getMqMessageId());
            mqMessage.setMessageStatus(3);  // 已抵达
            mqMessage.setUpdateTime(new Date());
            mqMessageService.updateById(mqMessage);
        }catch (Exception e){
            e.printStackTrace();
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }

ok,看一下我们的测试结果。

这样就不怕消息丢失了。

MQ的知识应该暂时就可以撒花了。坚持就是胜利。 

显示全文