redis做服务间通信工具的项目示例

 更新时间:2023年08月14日 09:00:04   作者:在下uptown  
Redis是一种高效的服务间通信工具,它以键值对的形式存储数据,并支持多种数据类型和丰富的操作,本文主要介绍了redis做服务间通信工具的项目示例,感兴趣的可以了解一下

前言

先说一下为什么要有这个东西,用消息中间件的好处就不用说了,日常开发中还是有很多场景需要用到消息传递的,消息的topic如何管理,如何约束topic,重要的topic消费记录、历史消息等就是这个sdk需要做的。

本质上只是一层对消息中间件的封装。这次只是抛砖引玉只引入redis的三种消息类型,pubsub、queue以及stream。

扩展其他中间件按着代码思路一样。望各路大佬赐教

架构设计

一个消息服务sdk首先需要具备两个能力,即生产和消费,这两个功能离不开校验topic合法性,我们姑且简单点陪在mysql数据库中,但不可能每次校验topic是否合法都要去查询数据库,这里借鉴kafka存放topic信息的思想,找一个redis的key存放所有的topic列表。

定义一个核心service接口。

public interface MessageHubService {
    /**
     * 生产消息
     */
    void producer(MessageForm messageForm);
    /**
     * 消费消息
     */
    void consumer(ConsumerAdapterForm adapterForm);
    /**
     * 检查topic、type合法性
     */
    void checkTopic(String topic, String type);
}

方法入参统一使用MessageForm类,里面定义一些基础的信息,比如哪个消息topic,哪个消息类型等等。

@Data
public class MessageForm {
    // 消息组件类型
    private String type;
    // 消息主题
    private String topic;
    private String message = "";
    // 消费者组
    private String group = "UPTOWN";
}

自从之前文章中说的文件夹改造之后特别喜欢三层结构,即service、baseServiceImpl、customizeServiceImpl。

大体就是service定义接口参数、返回类型标准化接口,baseServiceImpl实现service基础接口实现,做一些统一的拦截处理,比如校验topic合法等操作,customizeServiceImpl属于具体实现类extends baseServiceImpl实现具体逻辑。

topic白名单通过Timer维护,定义一个Timer通过lua脚本隔一段时间刷新到redis中。

基础类baseServiceImpl实现

@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {
    @Resource
    protected StringRedisTemplate stringRedisTemplate;
    public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();
    private ApplicationContext applicationContext;
    @PostConstruct
    public void init() {
        messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
    }
    public void checkTopic(String topic, String type) {
        if (!messageHubServiceMap.containsKey(type)) {
            throw new MatrixException("消息类型不支持");
        }
        List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
        if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
            throw new MatrixException("当前topic未配置");
        }
    }
    @Override
    public void producer(MessageForm messageForm) {
        this.checkTopic(messageForm.getTopic(), messageForm.getType());
        this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
    }
    /**
     * 消费者创建通过注解,已校验topic合法性
     */
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

具体自定义实现类

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
    @Override
    public void producer(MessageForm messageForm) {
        // 具体生产逻辑
    }
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        // 具体消费逻辑
    }
}

代码非常清晰了,整体满足service、baseServiceImpl、customizeServiceImpl三层结构。

生产者逻辑

生产者API做的比较简单,只是提供一个API调用,在调用前做一些校验工作,仅仅的是一条命令,不做发送失败的重试等操作。

消费者逻辑

消费者的话还是定义一个注解,还是通过借助SpringBoot生命周期扫描注解的方式在后台建立常驻线程的方式。

@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {
    @Resource(name = "messageHubServiceImpl")
    MessageHubService messageHubService;
    @Bean(name = "redisPubSubConsumerMap")
    public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
        return new ConcurrentHashMap<>();
    }
    @Override
    public void destroy() throws Exception {
    }
    @Override
    public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
            if (annotation == null) {
                continue;
            }
            String resolveTopic = annotation.topic();
            try {
                messageHubService.checkTopic(resolveTopic, annotation.type());
            } catch (Exception e) {
                throw new Error(e.getMessage());
            }
            ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
            adapterForm.setBean(bean);
            adapterForm.setInvokeMethod(method);
            adapterForm.setTopic(resolveTopic);
            adapterForm.setType(annotation.type());
            adapterForm.setGroup(annotation.group());
            messageHubService.consumer(adapterForm);
        }
        return bean;
    }
}

这里依靠spring生命周期,拿到所有的bean,根据注解标注的方法去走不同的逻辑生成常驻线程,监听到消息之后回调到标注了注解的方法里。
具体的消费逻辑就不赘述了,感兴趣的可以看下源码:gitee.com/atuptown/up…
Topic守护线程

@Slf4j
@Service
public class TopicReloadTask extends TimerTask {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @Resource
    EntityManager entityManager;
    public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
    public final String LUA_SCRIPT =
                "redis.call('del', 'MESSAGEHUB_TOPIC')" +
                "local topics = KEYS " +
                "for i, v in pairs(topics) do " +
                "  redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
                "end";
    @Override
    public void run() {
        try {
            List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
            if (!ObjectUtils.isEmpty(topics)) {
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
                Long result = stringRedisTemplate.execute(redisScript, topics);
                log.info("reload topic finish");
            }
        } catch (Throwable t) {
            log.error("messagehub topic reload error", t);
        }
    }
    private <T> List<T> getQueryResult(String sql, Class<T> clazz) {
        Query dataQuery = entityManager.createNativeQuery(sql, clazz);
        List<T> result = new ArrayList<>();
        List<Object> list = dataQuery.getResultList();
        for (Object o : list) {
            result.add((T) o);
        }
        return result;
    }
}

定义一个timer任务,隔一段时间将mysql中的topic白名单通过lua脚本的方式刷新到指定的reids topic key中。还有一些可以优化的地方,比如同步topic的操作只需要一个服务即可,所以可以使用@ConditionalOnProperty注解判断是否需要进行同步topic。

git地址:https://gitee.com/atuptown/uptown-messagehub

到此这篇关于redis做服务间通信工具的项目示例的文章就介绍到这了,更多相关redis 服务间通信 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java实现对服务器的自动巡检邮件通知

    java实现对服务器的自动巡检邮件通知

    这篇文章主要为大家详细介绍了java实现对服务器的自动巡检邮件通知,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-05-05
  • 浅谈Java变量的初始化顺序详解

    浅谈Java变量的初始化顺序详解

    本篇文章是对Java变量的初始化顺序进行了详细的分析介绍,需要的朋友参考下
    2013-06-06
  • Servlet服务端实现原理详解

    Servlet服务端实现原理详解

    Servlet是Sun公司开发动态web的一门技术,Sun公司在这些API中提供了一个接口叫做:Servlet,如果想开发一个Servlet程序,只需要完成两个小步骤:编写一个类,实现Servlet接口、把开发好的Java类部署到web服务器中。但是你了解Servlet实现的原理吗
    2022-07-07
  • Java 读取网络图片存储到本地并生成缩略图

    Java 读取网络图片存储到本地并生成缩略图

    用Java做开发经常需要处理图片。本文就来看一下如何保存图片到本地并生成缩略图
    2021-05-05
  • 详解springmvc 中controller与jsp传值

    详解springmvc 中controller与jsp传值

    本篇文章主要介绍了springmvc 中controller与jsp传值,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解决方案

    SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解决方案

    这篇文章主要介绍了SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解决方案,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-08-08
  • Spring security登录过程逻辑详解

    Spring security登录过程逻辑详解

    这篇文章主要介绍了SSpringsecurity登录过程逻辑详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • Java实现字符串转换成可执行代码的方法

    Java实现字符串转换成可执行代码的方法

    今天小编就为大家分享一篇Java实现字符串转换成可执行代码的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • Java实现猜数程序

    Java实现猜数程序

    这篇文章主要为大家详细介绍了Java实现猜数程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-12-12
  • Spring @Conditional通过条件控制bean注册过程

    Spring @Conditional通过条件控制bean注册过程

    这篇文章主要为大家介绍了Spring @Conditional通过条件控制bean注册过程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-02-02

最新评论