Java多线程之Disruptor入门

 更新时间:2021年04月29日 09:31:23   作者:EileenChang  
这篇文章主要介绍了Java多线程之Disruptor入门,文中有非常详细的代码示例,对正在学习java多线程的小伙伴们有非常好的帮助,需要的朋友可以参考下

一、Disruptor简介

Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

二、浅聊Disruptor的核心

在这里插入图片描述
  

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

三、Disruptor使用

3.1 pom.xml

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.3</version>
        </dependency>

3.2 事件Event

Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。

public class LongEvent {
	private long value;

	public void set(long value) {
		this.value = value;
	}

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

3.3 EventFactory

实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
	public LongEvent newInstance() {
		return new LongEvent();
	}
}

3.4 EventHandler

实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
		System.out.println("Event: " + event + ", sequence: " + sequence);
	}
}

3.5 使用Disruptor原始API发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 * 定义一个生产者,往Disruptor中投递消息
 */
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 定位到下一个可存放的位置
        long sequence = ringBuffer.next();
        try {
            // 拿到该位置的event
            LongEvent event = ringBuffer.get(sequence);
            // 设置event的值
            event.set(byteBuffer.getLong(0));
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        // 定义event工厂
        LongEventFactory factory = new LongEventFactory();
        // ringBuffer长度
        int bufferSize = 1024;
        // 构造一个Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
        // 绑定handler
        disruptor.handleEventsWith(new LongEventHandler());

        // 启动Disruptor
        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (long i = 0; true; i++) {
            byteBuffer.clear();
            byteBuffer.putLong(i);
            // 投递消息
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

3.6 使用Translators发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducerUsingTranslator {
    private RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
            longEvent.set(byteBuffer.getLong(0));
        }
    };

    public void onData(ByteBuffer byteBuffer) {
        ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

/**
 * @author ZhangSheng
 * @date 2021-4-26 14:23
 */
public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new LongEventHandler());

        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);

        for (long i = 0L; true; i++) {
            byteBuffer.putLong(0, i);
            // 发布
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

到此这篇关于Java多线程之Disruptor入门的文章就介绍到这了,更多相关Java Disruptor入门内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java多线程(单例模式,阻塞队列,定时器,线程池)详解

    Java多线程(单例模式,阻塞队列,定时器,线程池)详解

    本文是多线程初级入门,主要介绍了多线程单例模式、阻塞队列、定时器、线程池、多线程面试考点,感兴趣的小伙伴可以跟随小编一起了解一下
    2022-09-09
  • Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常

    Caused by: java.lang.ClassNotFoundException: org.objectweb.a

    这篇文章主要介绍了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • Java虚拟机JVM性能优化(三):垃圾收集详解

    Java虚拟机JVM性能优化(三):垃圾收集详解

    这篇文章主要介绍了Java虚拟机JVM性能优化(三):垃圾收集详解,本文讲解了众多的JVM垃圾收集器知识点,需要的朋友可以参考下
    2014-09-09
  • Intellij IDEA导入JAVA项目并启动(图文教程)

    Intellij IDEA导入JAVA项目并启动(图文教程)

    这篇文章主要介绍了Intellij IDEA导入JAVA项目并启动,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • Java 8实现任意参数的单链表

    Java 8实现任意参数的单链表

    这篇文章主要为大家详细介绍了Java 8实现任意参数的单链表,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-10-10
  • SpringBoot+Logback实现一个简单的链路追踪功能

    SpringBoot+Logback实现一个简单的链路追踪功能

    Spring Boot默认使用LogBack日志系统,并且已经引入了相关的jar包,所以我们无需任何配置便可以使用LogBack打印日志。这篇文章主要介绍了SpringBoot+Logback实现一个简单的链路追踪功能,需要的朋友可以参考下
    2019-10-10
  • SpringMVC---配置与使用的示例

    SpringMVC---配置与使用的示例

    这篇文章主要介绍了SpringMVC---配置与使用的示例,帮助大家更好的理解和学习spring框架,感兴趣的朋友可以了解下
    2020-10-10
  • Java 电话号码的组合示例详解

    Java 电话号码的组合示例详解

    这篇文章主要介绍了Java 电话号码的组合,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-03-03
  • MyBatis拦截器如何自动设置创建时间和修改时间

    MyBatis拦截器如何自动设置创建时间和修改时间

    文章介绍了如何通过实现MyBatis的Interceptor接口,在实体类中自动设置创建时间和修改时间,从而提高开发效率
    2025-02-02
  • Mybatis plus中使用in查询出错如何解决

    Mybatis plus中使用in查询出错如何解决

    这篇文章主要介绍了Mybatis plus中使用in查询出错的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08

最新评论