SpringBoot基于Disruptor实现高效的消息队列 

 更新时间:2024年02月22日 09:04:48   作者:wx59bcc77095d22  
Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者问题上获得尽量高的吞吐量和尽量低的延迟,本文主要介绍了SpringBoot基于Disruptor实现高效的消息队列 ,具有一定的参考价值,感兴趣的可以了解一下

一、前言

Disruptor是一个开源的Java框架,它被设计用于在生产者-消费者问题上获得尽量高的吞吐量和尽量低的延迟,从功能上来看Disruptor是实现了队列的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。

二、SpringBoot整合Disruptor

1.添加依赖

<!--Disruptor-->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.创建消息体实体

package com.example.aopdemo.disruptor;

import lombok.Data;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息体
 */
@Data
public class MessageModel {

    private String message;

}

3.创建事件工厂类

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventFactory;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件工厂类
 */
public class MessageEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

4.创建消费者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qx
 * @date 2024/2/21
 * @des 消息消费者
 */
@Slf4j
public class MessageEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel messageModel, long sequence, boolean endOfBatch) {
        log.info("消费者获取消息:{}", messageModel);
    }
}

5.构造BeanManager

package com.example.aopdemo.disruptor;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author qx
 * @date 2024/2/21
 * @des
 */
@Component
public class BeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanManager.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

6.创建消息管理器

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author qx
 * @date 2024/2/21
 * @des 事件管理器
 */
@Configuration
public class MessageManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        // 定义线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 指定事件工厂
        MessageEventFactory factory = new MessageEventFactory();

        // 指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new MessageEventHandler());

        //启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        return disruptor.getRingBuffer();
    }

}

7.创建生产者

package com.example.aopdemo.disruptor;

import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author qx
 * @date 2024/2/21
 * @des 生产者
 */
@Service
@Slf4j
public class DisruptorService {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayMessage(String message) {
        // 获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            // 填充数据
            MessageModel messageModel = messageModelRingBuffer.get(sequence);
            messageModel.setMessage(message);
            log.info("往消息队列中添加消息:{}", messageModel);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }

    }

}

8.创建测试类

package com.example.aopdemo.controller;

import com.example.aopdemo.disruptor.DisruptorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2024/2/21
 * @des Disruptor测试
 */
@RestController
public class DisruptorController {

    @Autowired
    private DisruptorService disruptorService;

    @GetMapping("/disruptor")
    public String disruptorTest(String message) {
        disruptorService.sayMessage(message);
        return "发送消息成功";
    }
}

9.测试

启动程序,在浏览器访问请求连接进行测试。

我们在控制台上可以获取到消息的发送和接收信息。

2024-02-21 15:22:16.059  INFO 6788 --- [nio-8080-exec-1] c.e.aopdemo.disruptor.DisruptorService   : 往消息队列中添加消息:MessageModel(message=hello)
2024-02-21 15:22:16.060  INFO 6788 --- [pool-1-thread-1] c.e.a.disruptor.MessageEventHandler      : 消费者获取消息:MessageModel(message=hello)

到此这篇关于SpringBoot基于Disruptor实现高效的消息队列 的文章就介绍到这了,更多相关SpringBoot Disruptor消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现数组去除重复数据的方法详解

    Java实现数组去除重复数据的方法详解

    这篇文章主要介绍了Java实现数组去除重复数据的方法,结合实例形式详细分析了java数组去除重复的几种常用方法、实现原理与相关注意事项,需要的朋友可以参考下
    2017-09-09
  • SpringBoot+layui实现文件上传功能

    SpringBoot+layui实现文件上传功能

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。这篇文章主要介绍了SpringBoot+layui实现文件上传,需要的朋友可以参考下
    2018-09-09
  • Java基础之自定义类加载器

    Java基础之自定义类加载器

    应该有很多小伙伴还不了解Java自定义类加载器吧,下文中有对Java自定义类加载器非常详细的描述,还有小伙伴们最喜欢的代码环节,需要的朋友可以参考下
    2021-05-05
  • 查看Java所支持的语言及相应的版本信息

    查看Java所支持的语言及相应的版本信息

    Java语言作为第一种支持国际化的语言,在Internet从一开始就具有其他语言无与伦比的国际化的本质特性,查看Java所支持的语言及相应的版本信息可以采用以下代码进行查询
    2014-01-01
  • 一文详解如何使用Java来发送qq邮箱邮件

    一文详解如何使用Java来发送qq邮箱邮件

    这篇文章主要给大家介绍了关于如何使用Java来发送qq邮箱邮件的相关资料,文中降了准备工作(开启服务并生成授权码)、接口调用(引入依赖和编写接口代码)、发送HTML格式邮件等内容,需要的朋友可以参考下
    2024-12-12
  • Java用递归方法解决汉诺塔问题详解

    Java用递归方法解决汉诺塔问题详解

    汉诺塔问题是一个经典的问题。汉诺塔(Hanoi Tower),又称河内塔,源于印度一个古老传说。本文将用Java递归方法求解这一问题,感兴趣的可以学习一下
    2022-04-04
  • Java的中lombok下的@Builder注解用法详解

    Java的中lombok下的@Builder注解用法详解

    这篇文章主要介绍了Java的中lombok下的@Builder注解用法详解,lombok注解在java进行编译时进行代码的构建,对于java对象的创建工作它可以更优雅,不需要写多余的重复的代码,在出现lombok之后,对象的创建工作更提供Builder方法,需要的朋友可以参考下
    2023-11-11
  • 如何开发基于Netty的HTTP/HTTPS应用程序

    如何开发基于Netty的HTTP/HTTPS应用程序

    HTTP/HTTPS是最常见的协议套件之一,并且随着智能手机的成功,它的应用也日益广泛,因为对于任何公司来说,拥有一个可以被移动设备访问的网站几乎是必须的。下面就来看看如何开发基于Netty的HTTP/HTTPS应用程序
    2021-06-06
  • Java线程池由浅入深掌握到精通

    Java线程池由浅入深掌握到精通

    什么是线程池?很简单,简单看名字就知道是装有线程的池子,我们可以把要执行的多线程交给线程池来处理,和连接池的概念一样,通过维护一定数量的线程池来达到多个线程的复用
    2021-09-09
  • Java 输入流中的read(byte[] b)方法详解

    Java 输入流中的read(byte[] b)方法详解

    这篇文章主要介绍了Java 输入流中的read(byte[] b)方法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01

最新评论