Spring Boot 整合RocketMq实现消息过滤功能

 更新时间:2022年06月07日 14:29:43   作者:剑圣无痕  
这篇文章主要介绍了Spring Boot 整合RocketMq实现消息过滤,本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,需要的朋友可以参考下

简介

消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息。 RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤。其中标签过滤又分为Tag过滤和SQL92过滤。

根据TAG过滤消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag。

生产者

 public void sendTagMessage()
   {
       String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
       for(int i=0;i<10;i++)
       {
           String tag = tags[i % tags.length];
           logger.info("sendTagMessage tag is :{}",tag);
           String msg = "hello, 这是第" + (i + 1) + "条消息";
           org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build(); 
           rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);
       }
   }

说明:示例中循环发送了10条消息,每条消息设置了一个tag发送过滤消息的格式为:topic:tag的形式,注意发送端只能设定一个tag。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)
public class TagConsumer implements RocketMQListener<Object>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Object o)
    {
        String msg=JSON.toJSONString(o);
        logger.info("send TagA || TagC || TagD  succss content is:{}", msg);
    }
}

说明:

  • selectorType:指定消息通过的tag的方式,默认为SelectorType.TAG
  • messageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。
  • selectorExpression :指定那些Tag消息能够被消费,多个采用||分割。

测试结果

从结果我可以看出第2条为TAGC、第7条为TAGC、第8条为TAGD,第3条为TAGD,第5条为TAGA,第0条为TAGA,而消费端监听的TAG为TAGA、TAGC、TAGD所以对于不符合条件的消息进行了过滤。

根据SQL表达式过滤消息

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

  • 数字比较,如>,>=,<,<=,BETWEEN,=;
  • 字符比较,如:=,<>,IN;IS NULL or IS NOT NULL;
  • 逻辑运算符:AND, OR, NOT;
  • 常量类型:
  • 数值,如:123, 3.1415;
  • 字符, 如:‘abc’, 必须使用单引号;
  • NULL,特殊常量
  • Boolean, TRUE or FALSE;

生产者

   public void sendSQLMessage()
   {
       String msg = "hello, 这是第1条消息";
       org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ;
       Map<String, Object> headers = new HashMap<>() ;
       headers.put("i", 5) ;
       rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers);
   }

说明:传递了参数为5进行条件判断。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)
public class SQLConsumer implements RocketMQListener<MessageExt>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(MessageExt message)
    {
        String msg=new String(message.getBody());
        String paramStr=JSON.toJSONString(message.getProperties());
        //消息内容
        logger.info("send succss content is:{}", msg);
        //消息参数
        logger.info("send mssage parma is:{}", paramStr);
    }
}

说明:

  • selectorType:指定消息通过的tag的方式,默认为SelectorType.CLUSTERING
  • messageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。
  • selectorExpression : 采用rocketMQ支持的表达式。例如i=5

启动程序报错The broker does not support consumer to filter message by SQL92

原因:默认情况下broke没有开启对SQL语法的支持,需要修改配置

1.打开rocketmq服务下的broke.conf文件,添加如下配置即可。

2.重启broke服务即可.

测试结果

说明:只有满足SQL条件能进行消费。

总结

本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,如果疑问,请随时反馈,

到此这篇关于Spring Boot 整合RocketMq实现消息过滤的文章就介绍到这了,更多相关Spring Boot消息过滤内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 新手初学Java List 接口

    新手初学Java List 接口

    这篇文章主要介绍了Java集合操作之List接口及其实现方法,详细分析了Java集合操作中List接口原理、功能、用法及操作注意事项,需要的朋友可以参考下
    2021-07-07
  • 强烈推荐这些提升代码效率的IDEA使用技巧

    强烈推荐这些提升代码效率的IDEA使用技巧

    在平常的开发中,发现一些同事对Idea 使用的不是很熟练,仅仅用来编辑,编译,不能很好的发挥Idea 的神奇.整理了下我平常用的一些技巧,希望你能从中学习到一些.需要的朋友可以参考下
    2021-05-05
  • 基于Security实现OIDC单点登录的详细流程

    基于Security实现OIDC单点登录的详细流程

    本文主要是给大家介绍 OIDC 的核心概念以及如何通过对 Spring Security 的授权码模式进行扩展来实现 OIDC 的单点登录。对Security实现OIDC单点登录的详细过程感兴趣的朋友,一起看看吧
    2021-09-09
  • java中各种对象的比较方法

    java中各种对象的比较方法

    Java对象的比较是初学者不易掌握的,下面这篇文章主要给大家介绍了关于java中各种对象的比较方法,文中通过实例代码以及图文介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • 高吞吐、线程安全的LRU缓存详解

    高吞吐、线程安全的LRU缓存详解

    这篇文章主要介绍了高吞吐、线程安全的LRU缓存详解,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02
  • Java通过JsApi方式实现微信支付

    Java通过JsApi方式实现微信支付

    本文讲解了Java如何实现JsApi方式的微信支付,代码内容详细,文章思路清晰,需要的朋友可以参考下
    2015-07-07
  • java实现学籍管理系统

    java实现学籍管理系统

    这篇文章主要为大家详细介绍了java实现学籍管理系统,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-12-12
  • springboot配置多数据源(静态和动态数据源)

    springboot配置多数据源(静态和动态数据源)

    在开发过程中,很多时候都会有垮数据库操作数据的情况,需要同时配置多套数据源,本文主要介绍了springboot配置多数据源(静态和动态数据源),感兴趣的可以了解一下
    2023-09-09
  • java导出Excel(非模板)可导出多个sheet方式

    java导出Excel(非模板)可导出多个sheet方式

    Java开发中,导出Excel是常见需求,有时需要支持多个Sheet导出,此技巧介绍非模板方式实现单标题单Sheet以及多Sheet导出,标题一致或不一致均可,可换成Map使用,适合个人开发者和需要Excel导出功能的场景
    2024-09-09
  • Maven构建Hadoop项目的实践步骤

    Maven构建Hadoop项目的实践步骤

    本文主要介绍了Maven构建Hadoop项目的实践步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06

最新评论