springboot+redis自定义注解实现发布订阅的实现代码

 更新时间:2023年08月14日 08:54:52   作者:在下uptown  
在Redis中客户端可以通过订阅特定的频道来接收发送至该频道的消息,本文主要介绍了springboot+redis自定义注解实现发布订阅,具有一定的参考价值,感兴趣的可以了解一下

前言

最近开发了一个内部消息组件,逻辑大体是通过定义注解 @MessageHub,在启动时扫描全部bean中有使用了该注解的方法后台创建一个常驻线程代理消费数据,当线程消费到数据就回写到对应加了注解的方法里。

@Slf4j
@Service
public class RedisConsumerDemo {
    @MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
    public void consumer(Object message) {
        log.info("pubsub info {} ", message);
    }   
}

实现redis的队列、stream方式实现都很简单,唯独发布订阅方式,网上的demo全都是一个固定套路,通过redis容器注入监听器,而且回写非常死板。那么如何将这块的逻辑统一呢。之前总结过消息组件的代码设计,这里贴一下链接。

内部消息通道组件

常规写法

常规实现reids的发布订阅模式写法一共三步

创建消息监听器

@Bean 
public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
    return new MessageListenerAdapter(messageListener, "onMessage");
}

创建订阅器

@Component
public class TestSubscriber implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("get data :{}", msg);
    }
}

向redis容器中添加消息监听器

@Configuration
public class RedisConfig {
    @Bean
    public RedisMessageListenerContainer container(
        RedisConnectionFactory redisConnectionFactory,
        MessageListenerAdapter smsExpirationListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
        return container;
    }
}

这样定义非常简单明了,但是有个问题是太代码僵硬了,创建监听者很不灵活,只能指定内部的onMessage方法,那么怎么才能融入到我们的内部消息流转中间件里呢。

自定义注解实现

我们内部组件抽象了两个方法,生产和消费,但这两个方法逻辑截然不同,生产方法是暴露给serverice层接口调用,调用方在调用生产方法后能直接知道生产了几条数据和成功与否。而消费方法是配合Spring生命周期函数服务启动时建立常驻消费线程的。

/**
 * 生产消息
 */
Integer producer(MessageForm messageForm);
/**
 * 消费消息
 */
void consumer(ConsumerAdapterForm adapterForm);

生产消息当然很容易实现,只需要调用已经封装好的convertAndSend方法。

stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());

消费方法就有说法了,动态生成监听者的场景下使用redis容器用代码挨个注册已经满足不了了,但仔细过一遍源代码就会发现,监听类的构造方法的入参只有两个,第一个需要回调的代理类,第二个消费到数据后回调的方法。

/**
 * Create a new {@link MessageListenerAdapter} for the given delegate.
 *
 * @param delegate the delegate object
 * @param defaultListenerMethod method to call when a message comes
 * @see #getListenerMethodName
 */
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
   this(delegate);
   setDefaultListenerMethod(defaultListenerMethod);
}

那么好了好了,方案有了,本质上就是把RedisMessageListenerContainer注入进来之后,扫描项目里所有加了 @MessageHub 的bean,包装成监听类加载到容器里就完事了。怎么扫描的代码就不再赘述了,实现Spring的生命周期函数BeanPostProcessor#postProcessAfterInitialization,在这里用AnnotationUtils判断是否标注了注解。

MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
    continue;
}

标注了后判断如果是发布订阅,进入发布订阅的实现类。

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
    @Resource
    RedisMessageListenerContainer redisPubSubContainer;
    @Override
    public void produce(ProducerAdapterForm producerAdapterForm) {
        stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
    }
    @Override
    public void consume(ConsumerAdapterForm messageForm) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
        adapter.afterPropertiesSet();
        redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
    }
    @Bean
    public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

首先先将RedisMessageListenerContainer注入到Spring容器里,produce方法只需要调用下现程的api。consume方法由于上一步我们获取了bean和对应的method,直接用MessageListenerAdapter的构造器创建出监听器来,这里有个坑,需要手动调用adapter.afterPropertiesSet()设置一些必要的属性,这个在常规写法里框架帮忙做了。如果不调用的话会出一些空指针之类的bug。

随后把监听器add到容器就实现了方法代理,背后的线程监听到数据会回调到标注了 @MessageHub 的方法里

到此这篇关于springboot+redis自定义注解实现发布订阅的实现代码的文章就介绍到这了,更多相关springboot redis发布订阅内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot Actuator应用监控与管理的详细步骤

    Spring Boot Actuator应用监控与管理的详细步骤

    SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端点,并通过SpringSecurity配置安全权限,便于生产环境应用监控与管理,本文给大家介绍Spring Boot Actuator应用监控与管理的相关知识,感兴趣的朋友一起看看吧
    2025-07-07
  • Java 数组交集的实现代码

    Java 数组交集的实现代码

    这篇文章主要介绍了Java 数组交集的实现代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • 详解SpringCloud使用Consul做注册中心

    详解SpringCloud使用Consul做注册中心

    这篇文章主要介绍了SpringCloud使用Consul做注册中心,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • Java 读取文本指定的某一行内容的方法

    Java 读取文本指定的某一行内容的方法

    今天小编就为大家分享一篇Java 读取文本指定的某一行内容的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • MyBatis-Plus 中 typeHandler 的使用实例详解

    MyBatis-Plus 中 typeHandler 的使用实例详解

    本文介绍了在MyBatis-Plus中如何使用typeHandler处理json格式字段和自定义typeHandler,通过使用JacksonTypeHandler,可以简单实现将实体类字段转换为json格式存储,感兴趣的朋友跟随小编一起看看吧
    2024-10-10
  • ActiveMQ简单入门(新手必看篇)

    ActiveMQ简单入门(新手必看篇)

    下面小编就为大家带来一篇ActiveMQ简单入门(新手必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • Spring源码解密之默认标签的解析

    Spring源码解密之默认标签的解析

    这篇文章主要给大家介绍了关于Spring源码解密之默认标签的解析的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2018-01-01
  • Java实战之校园外卖点餐系统的实现

    Java实战之校园外卖点餐系统的实现

    这篇文章主要介绍了如何利用Java实现简易的校园外卖点餐系统,文中采用的技术有:JSP、Spring、SpringMVC、MyBatis 等,感兴趣的可以了解一下
    2022-03-03
  • SpringBoot用@Async注解实现异步任务

    SpringBoot用@Async注解实现异步任务

    这篇文章主要介绍了SpringBoot用@Async注解实现异步任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • eclipse上配置Maven的图文教程(推荐)

    eclipse上配置Maven的图文教程(推荐)

    下面小编就为大家分享一篇eclipse上配置Maven的图文教程(推荐),具有很好的参考价值。希望对大家有所帮助。一起跟随小编过来看看吧
    2017-11-11

最新评论