SpringBoot disruptor高性能队列使用

 更新时间:2023年02月02日 16:00:48   作者:ldcaws  
这篇文章主要介绍了SpringBoot disruptor高性能队列使用,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题

Disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~

1、Disruptor简介

Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

其特点简单总结如下:

  • 开源的java框架,用于生产者-消费者场景;
  • 高吞吐量和低延迟;
  • 有界队列;

disruptor在github网址为:https://github.com/LMAX-Exchange/disruptor

2、Disruptor概念

  • Ring Buffer:环形的缓冲区,环形数组中的元素采用覆盖方式,避免了jvm的GC;
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;
  • Sequencer:Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;
  • Sequence Barrier:用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用;
  • Wait Strategy:定义 Consumer 如何进行等待下一个事件的策略;
  • Event:在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定;
  • EventProcessor:EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop);
  • EventHandler:定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现;
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型;

3、springboot+disruptor实例

在pom.xml文件中添加依赖

		<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>

消息体Model

@Data
public class MessageModel {
    private String message;
}

构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

构造消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (Exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

构造MQManager

@Configuration
public class MqManager {
    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();
        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;
        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());
        //启动disruptor线程
        disruptor.start();
        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
        return ringBuffer;
    }
}

构造生产者

@Configuration
public class MqManager {
    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();
        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;
        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());
        //启动disruptor线程
        disruptor.start();
        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
        return ringBuffer;
    }
}

测试

	/**
     * 项目内部使用Disruptor做消息队列
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
        helloEventProducer.sayHelloMq("Hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }

运行结果如下

4、小结

引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。

到此这篇关于SpringBoot disruptor高性能队列使用的文章就介绍到这了,更多相关SpringBoot disruptor内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 关于Java中Bean的生命周期详解

    关于Java中Bean的生命周期详解

    这篇文章主要介绍了关于Java中Bean的生命周期详解,所谓的⽣命周期指的是⼀个对象从诞⽣到销毁的整个⽣命过程,我们把这个过程就叫做⼀个对象的⽣命周期,需要的朋友可以参考下
    2023-08-08
  • springMVC盗链接详解

    springMVC盗链接详解

    这篇文章主要为大家详细介绍了SpringMVC盗链接详解,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能给你带来帮助
    2021-07-07
  • Jenkins安装与配置及汉化过程

    Jenkins安装与配置及汉化过程

    这篇文章主要介绍了Jenkins安装与配置及汉化过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-04-04
  • IntelliJ IDEA右键文件夹没有Java Class文件的原因及解决方法

    IntelliJ IDEA右键文件夹没有Java Class文件的原因及解决方法

    这篇文章主要介绍了IntelliJ IDEA右键文件夹没有Java Class文件的原因及解决方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-09-09
  • SpringBoot如何统一清理数据

    SpringBoot如何统一清理数据

    这篇文章主要介绍了SpringBoot如何统一清理数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • 详解Spring配置及事务的使用

    详解Spring配置及事务的使用

    这篇文章主要介绍了详解Spring配置及事务的使用,文中附含详细的示例代码说明,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2021-09-09
  • SpringBoot实现网站的登陆注册逻辑记录

    SpringBoot实现网站的登陆注册逻辑记录

    登陆注册功能是我们日常开发中经常遇到的一个功能,下面这篇文章主要给大家介绍了关于SpringBoot实现网站的登陆注册逻辑的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2021-10-10
  • java内存异常使用导致full gc频繁

    java内存异常使用导致full gc频繁

    Full GC是Java虚拟机中垃圾回收的一种方式,它会暂停应用程序所有的线程并清理整个堆内存。频繁的Full GC会导致应用程序的性能下降,甚至出现长时间的停顿。Java内存异常使用常常是Full GC频繁出现的原因之一,如使用大量的静态变量、内存泄漏等。
    2023-04-04
  • Java倒计时三种实现方式代码实例

    Java倒计时三种实现方式代码实例

    这篇文章主要介绍了Java倒计时三种实现方式代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Spring Boot Maven插件使用详解

    Spring Boot Maven插件使用详解

    这篇文章主要为大家详细介绍了Spring Boot Maven插件使用方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10

最新评论