Java EventBus手把手带你实现

 更新时间:2023年01月09日 16:08:50   作者:鲲鹏飞九万里  
EventBus是Guava的事件处理机制,是设计模式中观察者模式(生产/消费者编程模型)的优雅实现。本文就来和大家聊聊EventBus的使用,需要的可以参考一下

一、说明

在Guava中,EventBus简化了观察者模式的实现。理解EventBus的原理来,自动动手实现一个简单的EventBus。

二、Guava的EventBus

EventBus叫做“时间总线”,它提供了实现观察者模式的骨架代码。可以基于此框架,非常容易地在自己的业务场景中实现观察者模式。它不仅支持异步非阻塞模式,同时支持同步阻塞模式。

基于EventBus,不需要定义Observer接口(观察者接口),任意类型的对象都可以注册到EventBus中。通过@Subscribe注解来表明类中哪个函数可以接收观察者发送的消息。

Guava EventBus中的几个主要的类和函数:

EventBus、SyncEventBus

EventBus类中封装了对外暴露的所有可调用接口。其中EventBus实现了同步阻塞的观察者模式,SyncEventBus继承EventBus提供了异步非阻塞的观察者模式。

// 同步阻塞的方式
EventBus eventBus = new EventBus(); 
// 异步非阻塞的方式
final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20; // 异步非阻塞线程池大小
ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE);
EventBus eventBus = new AsyncEventBus(executorService);

register() 函数

EventBus通过register()函数来注册观察者。它可以接收任意类型(Object)的观察者。具体的函数定义如下:

public void register(Object object) {
  //......
}

unregister() 函数

相对于register()unregister()函数是从EventBus中删除某个观察者。

public void unregister(Object object) {
  //......
}

post函数

EventBus提供post()函数 ,用来给观察者发消息。

public void post(Object event) {
  //......
}

post发送消息的时候,并不是把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post函数中定义的父类)。

比如,AObserver能接收的消息类型是XMsg,BObserver能接收的消息类型是YMsg,CObserver能接收的消息类型是ZMsg。其中,XMsg是YMsg的父类。

XMsg xMsg= new XMsg();
YMsg yMsg= new YMsg();
ZMsg zMsg= new ZMsg();
post(xMsg);// AObserver  接收消息
post(yMsg);// AObserver和BObserver接收到消息
post(zMsg);// CObserver接收到消息

Observer(观察者)能接收到消息类型是通过@Subscribe注解定义的。

@Subscribe 注解

EventBus通过@Subscribe注解类标明,某个函数能接收哪种类型的消息。(类型不能是基本类型)

三、EventBus的原理

四、动手实现一个EventBus

@Beat 标注一个公共的API(公共的类、方法或字段) 在未来的发行版本中会发生不兼容的变化。带有此注释的 API 不受其包含库所做的任何兼容性保证。请注意,此注释的存在并不意味着所讨论 API 的质量或性能,只是它不是“API 冻结”的事实。

应用程序依赖 beta API 通常是安全的,但需要在升级期间进行一些额外的工作。然而,不建议在类库(包含在用户的CLASSPATH中,不受开发人员的控制)上这么做。

4.1 定义Subscribe注解

定义Subscribe注解,用于标明哪个函数可以接收消息。

/**
 * 定义一个注解,表明观察者中的哪个函数可以接收消息
 */
@Retention(RetentionPolicy.RUNTIME)  // 注解的声明周期
@Target(ElementType.METHOD)  // 注解作用的地方
@Beta  // 标注API在未来发行的版本是可能有不兼容的变化
public @interface MySubscribe {
}

4.2 ObserverAction

用来表示@MySubscribe注解的方法。

/**
 * 用来表示 @MySubscribe 注解方法
 */
public class MyObserverAction {
    private Object target;
    private Method method;
    public MyObserverAction(Object target, Method method) {
        this.target = checkNotNull(target);
        this.method = method;
        this.method.setAccessible(true);
    }
    /**
     * event是method方法的参数
     * @param event
     */
    public void execute(Object event) {
        try {
            method.invoke(target, event);
        } catch (IllegalAccessException | InvocationTargetException  e) {
            throw new RuntimeException(e);
        }
    }
}

4.3 ObserverRegister

Observer 注册表。

/**
 * Observer 注册表
 */
public class MyObserverRegister {
    // 注册表, 消息类型: 观察者方法
    private ConcurrentMap<Class<?>, CopyOnWriteArraySet<MyObserverAction>> registry = new ConcurrentHashMap<>();
    /**
     * 将观察者注册到 注册表中
     * @param observer 观察者
     */
    public void register(Object observer) {
        Map<Class<?>, Collection<MyObserverAction>> observerActions = findAllObserverActions(observer);
        for (Map.Entry<Class<?>, Collection<MyObserverAction>> entry : observerActions.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<MyObserverAction> evenActions = entry.getValue();
            CopyOnWriteArraySet<MyObserverAction> registryEvenActions =
                    registry.getOrDefault(eventType, new CopyOnWriteArraySet<>());
            registryEvenActions.addAll(evenActions);
            registry.put(eventType, registryEvenActions);
        }
    }
    /**
     * 获取匹配的观察者事件
     * @param event
     * @return
     */
    public List<MyObserverAction> getMatchedMyObserverActions(Object event) {
        List<MyObserverAction> result = new ArrayList<>();
        Class<?> postedEventClass = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<MyObserverAction>> entry : registry.entrySet()) {
            Class<?> eventClass = entry.getKey();
            // 匹配相同类型或父类型
            if (postedEventClass.isAssignableFrom(eventClass)) {
                result.addAll(entry.getValue());
            }
        }
        return result;
    }
    // 消息类型(观察者类型类型及其父类型) 观察者方法
    public Map<Class<?>, Collection<MyObserverAction>> findAllObserverActions(Object observer) {
        Map<Class<?>, Collection<MyObserverAction>> result = new HashMap<>();
        // 观察者类型
        Class<?> observerClass = observer.getClass();
        for (Method method : getAnnotatedMethods(observerClass)) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> eventType = parameterTypes[0];
            result.putIfAbsent(eventType, new ArrayList<>());
            result.get(eventType).add(new MyObserverAction(observer, method));
        }
        return result;
    }
    /**
     * 根据观察者类型,查找方法列表
     * @param clazz
     * @return
     */
    public List<Method> getAnnotatedMethods(Class<?> clazz) {
        List<Method> result = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.isAnnotationPresent(MySubscribe.class)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                checkArgument(parameterTypes.length==1,
                        "方法%s 有一个注解@MySubscribe ,它有%s个参数,实际要求有且只有一个参数",
                        method, parameterTypes.length);
                result.add(method);
            }
        }
        return result;
    }
}

4.4 EventBus

/**
 * 实现 同步阻塞的 EventBus
 */
public class MyEventBus {
    private Executor executor;
    private MyObserverRegister register = new MyObserverRegister();
    public MyEventBus() {
        // MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。
        // 之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用
        this(MoreExecutors.directExecutor());
    }
    // 注意这里的修饰符
    protected MyEventBus(Executor executor) {
        this.executor = executor;
    }
    public void register(Object observer) {
        register.register(observer);
    }
    public void post(Object event) {
        List<MyObserverAction> observerActions = register.getMatchedMyObserverActions(event);
        for (MyObserverAction observerAction : observerActions) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    observerAction.execute(event);
                }
            });
        }
    }
}

4.5 SyncEventBus

/**
 * 异步非阻塞的EventBus
 */
public class MySyncEventBus extends MyEventBus {
    public MySyncEventBus(Executor executor) {
        super(executor);
    }
}

五、使用自定义的EventBus

    public static void main(String[] args) {
        // 自定义的EventBus
        MyEventBus myEventBus = new MyEventBus();
        // 注册一个观察者
        myEventBus.register(new CurrentConditionsDisplayListener());
        // 向观察者发送消息
        myEventBus.post(23.0f);
    }

六、扩展

Spring Event

到此这篇关于Java EventBus手把手带你实现的文章就介绍到这了,更多相关Java EventBus内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • HDFS的Java API的访问方式实例代码

    HDFS的Java API的访问方式实例代码

    这篇文章主要介绍了HDFS的Java API的访问方式实例代码,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02
  • Java中值类型和引用类型的比较与问题解决

    Java中值类型和引用类型的比较与问题解决

    这篇文章主要给大家介绍了关于Java中值类型和引用类型的比较与问题解决方法,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-12-12
  • Java实现仿淘宝滑动验证码研究代码详解

    Java实现仿淘宝滑动验证码研究代码详解

    这篇文章主要介绍了Java实现仿淘宝滑动验证码研究代码详解的相关资料,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2016-06-06
  • spring中的注解@@Transactional失效的场景代码演示

    spring中的注解@@Transactional失效的场景代码演示

    这篇文章主要介绍了spring中的注解@@Transactional失效的场景代码演示,@Transactional注解是Spring框架提供的用于声明事务的注解,作用于类和方法上,需要的朋友可以参考下
    2024-01-01
  • Java实现简易购物系统

    Java实现简易购物系统

    这篇文章主要为大家详细介绍了Java实现简易购物系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • java Timer 定时每天凌晨1点执行任务

    java Timer 定时每天凌晨1点执行任务

    这篇文章主要介绍了java Timer 定时每天凌晨1点执行任务的代码,代码简单易懂,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-09-09
  • Java 模拟真正的并发请求详情

    Java 模拟真正的并发请求详情

    有时需要测试一下某个功能的并发性能,又不要想借助于其他工具,索性就自己的开发语言,来一个并发请求就最方便了。下文我们就来学习Java 如何模拟真正的并发请求
    2021-09-09
  • mybatis 实现字段大小写赋值

    mybatis 实现字段大小写赋值

    这篇文章主要介绍了mybatis 实现字段大小写赋值,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java比较两个List的值是否相等的方法

    Java比较两个List的值是否相等的方法

    这篇文章主要介绍了Java比较两个List的值是否相等的方法,涉及java针对队列比较的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • Spring Security OAuth2实现使用JWT的示例代码

    Spring Security OAuth2实现使用JWT的示例代码

    这篇文章主要介绍了Spring Security OAuth2实现使用JWT的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-09-09

最新评论