RocketMQ设计之故障规避机制

 更新时间:2022年03月21日 09:35:28   作者:周杰伦本人  
这篇文章主要介绍了RocketMQ设计之故障规避机制,故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题,下面详细介绍需要的小伙伴可以参考一下

NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外

MQFaultStrategy类的:

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //是否开启故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //判断Queue是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //默认轮询
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}

在选择查找路由时,选择消息队列的关键步骤:

  • 先按轮询算法选择一个消息队列
  • 从故障列表判断该消息队列是否可用

LatencyFaultToleranceImpl中判断是否可用:

@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
  • 判断是否在故障列表中,不在故障列表中代表可用。
  • 在故障列表中判断当前时间是否大于等于故障规避的开始时间startTimestamp

在消息发送结束后和发送出现异常时调用updateFaultItem()方法来更新故障列表,computeNotAvailableDuration()根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。

LatencyFaultToleranceImpl类的updateFaultItem方法:

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        //加入故障列表
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

FaultItem存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用

到此这篇关于RocketMQ设计之故障规避机制的文章就介绍到这了,更多相关RocketMQ故障规避机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈JAVA中输入输出流实例详解

    浅谈JAVA中输入输出流实例详解

    Java中的流分为两种,一种是字节流,另一种是字符流。这篇文章主要介绍了JAVA中输入输出流的相关资料,需要的朋友可以参考下
    2016-07-07
  • Java实现的程序员老黄历实例

    Java实现的程序员老黄历实例

    这篇文章主要介绍了Java实现的程序员老黄历实例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05
  • 新手了解java IO基础知识

    新手了解java IO基础知识

    这篇文章主要介绍了java 基础知识之IO总结的相关资料,Java中的I/O分为两种类型,一种是顺序读取,一种是随机读取,需要的朋友可以参考下,希望对你有帮助
    2021-07-07
  • 详解java定时任务

    详解java定时任务

    这篇文章主要为大家详细介绍了java定时任务,使用JDK中的Timer定时任务来实现,感兴趣的小伙伴们可以参考一下
    2016-03-03
  • java获取本地文件的多种方式实现与比较

    java获取本地文件的多种方式实现与比较

    这篇文章主要为大家详细介绍了java获取本地文件的多种方式实现与结果比较,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-11-11
  • PowerJob的MapProcessor工作流程源码解读

    PowerJob的MapProcessor工作流程源码解读

    这篇文章主要为大家介绍了PowerJob的MapProcessor工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • springboot+hutool批量生成二维码压缩导出功能

    springboot+hutool批量生成二维码压缩导出功能

    这篇文章主要介绍了springboot+hutool批量生成二维码压缩导出功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-10-10
  • Java中资源加载的方法及Spring的ResourceLoader应用小结

    Java中资源加载的方法及Spring的ResourceLoader应用小结

    在Java开发中,资源加载是一个基础而重要的操作,这篇文章主要介绍了深入理解Java中资源加载的方法及Spring的ResourceLoader应用,本文通过实例代码演示了通过ClassLoader和Class获取资源的内容,以及使用Spring的ResourceLoader加载多个资源的过程,需要的朋友可以参考下
    2024-01-01
  • java.math包下计算浮点数和整数的类的实例

    java.math包下计算浮点数和整数的类的实例

    这篇文章主要介绍了java.math包下计算浮点数和整数的类的实例代码,本文通过使用BigDecimal进行浮点数比较给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • 30分钟入门Java8之lambda表达式学习

    30分钟入门Java8之lambda表达式学习

    本篇文章主要介绍了30分钟入门Java8之lambda表达式学习,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04

最新评论