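When a producer fails to send a message and retries are enabled (retryTimesWhenSendFailed > 0 for synchronous sends, or retryTimesWhenSendAsyncFailed > 0 for asynchronous sends; both are integer retry counts, not booleans), the producer selects a message queue again. How does it reselect the queue?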
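There are two ways to select a message queue:
1. sendLatencyFaultEnable=false: the Broker fault-latency mechanism is disabled (the default)
2. sendLatencyFaultEnable=true: the Broker fault-latency mechanism is enabled
Next, we follow the RocketMQ source code to see what the Broker fault-latency mechanism is.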
When sendLatencyFaultEnable=false
The RocketMQ source is as follows:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // lastBrokerName is the Broker that the previous send attempt failed on
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // Skip queues on the Broker that failed last time
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
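This way of avoiding a failed Broker has a flaw: the avoidance only lasts for the retries of the current send call. If a Broker is down and one message has just failed on that Broker's first queue, a later send starts again with lastBrokerName == null and may land on another queue of the same Broker (for example its second queue), so the send fails again and triggers another retry. This is why the Broker fault-latency mechanism is needed: it temporarily excludes a Broker as soon as a send to it fails.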
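To make the flaw concrete, here is a minimal sketch of the same round-robin selection outside of RocketMQ. The class RoundRobinSelectorSketch, its nested Queue type, and the helpers send and twoBrokers are hypothetical names made up for this illustration; only the selection loop mirrors the source above. It assumes two Brokers with two queues each and treats broker-a as permanently down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the selection logic; not a RocketMQ class.
class RoundRobinSelectorSketch {
    // Hypothetical minimal queue descriptor: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + ":" + queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSelectorSketch(List<Queue> queues) {
        this.queues = queues;
    }

    // Round-robin pick; skips queues on lastBrokerName when it is non-null.
    Queue selectOne(String lastBrokerName) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue belongs to the failed broker: fall back to plain round-robin.
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    // Simulates one send call with up to maxAttempts tries.
    // Returns the queues tried, in order; a try fails only on downBroker.
    static List<String> send(RoundRobinSelectorSketch selector, String downBroker, int maxAttempts) {
        List<String> attempts = new ArrayList<>();
        String lastBrokerName = null; // reset for every new message
        for (int i = 0; i < maxAttempts; i++) {
            Queue q = selector.selectOne(lastBrokerName);
            attempts.add(q.toString());
            if (!q.brokerName.equals(downBroker)) {
                break; // send succeeded
            }
            lastBrokerName = q.brokerName;
        }
        return attempts;
    }

    // Two brokers, two queues each (an assumed topology for the demo).
    static RoundRobinSelectorSketch twoBrokers() {
        List<Queue> queues = new ArrayList<>();
        queues.add(new Queue("broker-a", 0));
        queues.add(new Queue("broker-a", 1));
        queues.add(new Queue("broker-b", 0));
        queues.add(new Queue("broker-b", 1));
        return new RoundRobinSelectorSketch(queues);
    }

    public static void main(String[] args) {
        RoundRobinSelectorSketch selector = twoBrokers();
        for (int message = 1; message <= 4; message++) {
            System.out.println("message " + message + " tried " + send(selector, "broker-a", 3));
        }
    }
}
```

In this setup the first and the fourth message each waste one attempt on broker-a before the retry moves to broker-b: nothing remembers the failure once a send call returns.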
When sendLatencyFaultEnable=true
The RocketMQ source is as follows:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Check whether this queue's Broker is currently available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Try to pick a usable Broker from the ones being avoided
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // Remove the fault entry so the Broker takes part in routing again
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
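The source above calls isAvailable, pickOneAtLeast and remove on latencyFaultTolerance without showing what sits behind them. The following is a minimal sketch of the idea only, assuming a table that maps each Broker name to a timestamp before which it should be avoided. FaultTableSketch, markFailed, pickSoonestToRecover and the explicit nowMs parameters are hypothetical and are not RocketMQ's implementation; in particular, how RocketMQ picks a Broker in pickOneAtLeast may differ from the rule used here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a tiny "avoid this broker until time T" table.
class FaultTableSketch {
    // brokerName -> timestamp in ms before which the broker should be avoided
    private final Map<String, Long> avoidUntil = new ConcurrentHashMap<>();

    // Record a failure seen at nowMs; the broker is avoided for avoidMs afterwards.
    void markFailed(String brokerName, long nowMs, long avoidMs) {
        avoidUntil.put(brokerName, nowMs + avoidMs);
    }

    // A broker is available if it was never marked or its avoidance window has elapsed.
    boolean isAvailable(String brokerName, long nowMs) {
        Long until = avoidUntil.get(brokerName);
        return until == null || nowMs >= until;
    }

    // Among marked brokers, return the one whose window ends soonest; null if none.
    String pickSoonestToRecover() {
        String best = null;
        long bestUntil = Long.MAX_VALUE;
        for (Map.Entry<String, Long> entry : avoidUntil.entrySet()) {
            if (entry.getValue() < bestUntil) {
                best = entry.getKey();
                bestUntil = entry.getValue();
            }
        }
        return best;
    }

    // Forget a broker so it takes part in normal selection again.
    void remove(String brokerName) {
        if (brokerName != null) {
            avoidUntil.remove(brokerName);
        }
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        table.markFailed("broker-a", 1_000L, 30_000L);
        System.out.println(table.isAvailable("broker-a", 2_000L));
        System.out.println(table.isAvailable("broker-a", 31_000L));
    }
}
```

Because the table outlives a single send call, a later message can skip a Broker that failed recently, which is exactly what the lastBrokerName check alone cannot do.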