上一篇文章中我介绍了RabbitMQ实现分布式最终一致性的开发方案,这篇就来解决一下这个方案中存在的一些问题。
这三个问题中最严重的就是消息丢失的问题。那我我们就来反向介绍一下这几个问题的最简单处理办法。
消息积压,顾名思义就是给MQ发的消息太多了太快了,消费者消费不过来了。
解决办法1: 我是富豪我就是有钱,ok,那我们就疯狂加服务器加消费者,直到能够消费的过来。
解决办法2: 我是穷波一,好吧,穷波一得有自己苟活的法呀,我们再上线一个专门的队列消费服务,但是我们这个服务不去做对应的业务逻辑处理,而是直接把这些消息全部原封不动的存到我们的数据库,这样至少不会丢掉消息,我们记录到数据库,后续我们再慢慢的去处理这些消息就好啦。
举个栗子:这就像是淘宝买东西一样,假如我们的自己写的一个淘宝服务哈,我们写的很low,消费者处理消息很慢,可能我们的服务只能支持一分钟30个人购买,买的人再多就只能消息积压在MQ中,然后再有人继续购买就越等越久,最后导致积压,处理不过来,然后只能被迫给用户返回一个超时错误,用户一看,真垃圾,不买了,我们到手的money飞了。这种事我们怎么允许呢。money、money、money啊,到手的飞了,不可能!这时候我们就上线一个专门的消费者处理这些消息,因为我们不做业务逻辑处理,直接存表,所以我们的速度很快。这样我们一分钟可能就能够支持300个人购买了。太好了,money没飞掉,舒服。过后我们的服务再慢慢处理这些消息,哪怕是我第二天了才处理完,也没有事,money已经到账了,这是不是想想都开心(*^▽^*)。
顾名思义 消息发送了两次或者多次
解决办法1: 其实通常我们的消费者服务接口都会实现接口的幂等性。(幂等性不懂的看我以前的文章吧~)
解决办法2: 加防重表,用mysql啊redis啊都可以,唉这个就不想讲了,听到这个名字大家应该都能懂什么意思吧。
我滴妈,终于讲这个最重要的了,今天一大早就开始写,有点小累。
通俗理解就是 我们给MQ发了消息,但是最后我们没有消费到。
这个问题出现才是我们最懵逼,最头疼的。
但是想看懂这下面的解决办法首先要了解RabbitMQ的消息可靠投递,不懂的可以转到我以前的文章学习一下()
解决办法 1:加如MQ的消息日志表。(下面直接实战代码)
还是已最通用的电商下单逻辑为例,下面的实战代码就是我上一遍文章《基于RabbitMQ实现下单减库存的最终一致性分布式事务》中的代码基础上进一步完善的。()
public class MqMessage {
@TableId(type = IdType.INPUT)
private String messageId;
private String content;
private String toExchange;
private String routingKey;
private String classType;
private int messageStatus;
private Date createTime;
private Date updateTime;
}
MqMessage实体类,记录MQ的日志。
下单业务逻辑部分
String jsonString = JSON.toJSONString(order.getOrder());
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageId(uuid);
mqMessage.setMessageStatus(0); // 新建
mqMessage.setContent(jsonString);
mqMessage.setRoutingKey("order.create.order");
mqMessage.setToExchange("order-event-exchange");
mqMessage.setClassType(OrderEntity.class.getTypeName());
mqMessage.setCreateTime(new Date());
mqMessage.setUpdateTime(new Date());
mqMessageService.save(mqMessage);
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder(), correlationData);
/**
* MQ 的回调函数
*/
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
MqMessageService mqMessageService;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
// 把消息使用json序列化的方式
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制 RabbitTemplate
* 1、服务器收到消息
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调 setConfirmCallback
* 2、 消息正确抵达队列 回调
* 1、## 开启发送端消息抵达队列的确认
* spring.rabbitmq.publisher-returns=true
* 2、## 只要抵达队列 优先异步回调
* spring.rabbitmq.template.mandatory=true
* 3、消费端
* 1、##手动ACK确认
* spring.rabbitmq.listener.simple.acknowledge-mode=manual
*/
public void initRabbitTemplate() {
/**
* ACK 确认机制
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("==============成功发送消息到服务器回调==========");
System.out.println("correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
if (ack) {
// 消息已成功发送到Broker并得到确认
System.out.println("订单 消息发送成功");
String id = correlationData.getId();
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageId(id);
mqMessage.setMessageStatus(1); // 发送成功
mqMessage.setUpdateTime(new Date());
mqMessageService.updateById(mqMessage);
// 在这里执行相应的逻辑
} else {
// 消息发送失败或未得到确认
System.out.println("消息发送失败:" + cause);
// 在这里执行相应的逻辑
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
// * 只要消息没投递到指定的队列,就触发失败回调
// * @param message the returned message.
// * @param i the reply code.
// * @param s the reply text.
// * @param s1 the exchange.
// * @param s2 the routing key.
// */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("=============交换机发送消息到消息队列失败回调==========");
System.out.println("message===>" + message + "replyCode===>" + replyCode + "replyText==>" + replyText + "exchange==>" + exchange + "routingKey==>" + routingKey);
System.out.println("消息为正确抵达队列。。。");
String string = new String(message.getBody());
OrderEntity orderEntity = JSON.parseObject(string, OrderEntity.class);
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageId(orderEntity.getMqMessageId());
mqMessage.setMessageStatus(2); // 错误抵达
mqMessage.setUpdateTime(new Date());
mqMessageService.updateById(mqMessage);
}
});
}
}
@RabbitHandler
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("过期订单, 准备关闭。。。");
try {
orderService.closeOrder(orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
String string = new String(message.getBody());
System.out.println("记录MQ 的日志 , string=" + string);
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageId(orderEntity.getMqMessageId());
mqMessage.setMessageStatus(3); // 已抵达
mqMessage.setUpdateTime(new Date());
mqMessageService.updateById(mqMessage);
}catch (Exception e){
e.printStackTrace();
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
ok,看一下我们的测试结果。
这样就不怕消息丢失了。
MQ的知识应该暂时就可以撒花了。坚持就是胜利。