图文并茂讲解RocketMQ消息类别

 更新时间:2022年12月27日 15:35:42   作者:一个双子座的Java攻城狮  
这篇文章主要介绍了图文并茂讲解RocketMQ消息类别,RocketMQ对于消息提供了很多用法,包括:同步消息、异步消息、单向发送、顺序消息、延时消息、批量消息、过滤消息、事务消息等

1、同步消息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
            //同步消息发送
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

2、异步消息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //异步消息发送
            Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

3、单向消息

不需要有回执的消息,例如日志类消息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //单向消息
            Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.sendOneway(msg);
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者代码同上

测试:

总结 同步消息

SendResult result = producer.send(msg);

异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

		producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });

单向消息

producer.sendOneway(msg);

到此这篇关于图文并茂讲解RocketMQ消息类别的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot统一异常处理详解

    Spring Boot统一异常处理详解

    我们在做Web应用的时候,请求处理过程中发生错误是非常常见的情况。这个时候就需要统一异常处理了,这篇文章主要给大家介绍了Spring Boot如何进行统一异常处理的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-02-02
  • Java函数式编程(八):字符串及方法引用

    Java函数式编程(八):字符串及方法引用

    这篇文章主要介绍了Java函数式编程(八):字符串及方法引用,本文是系列文章的第8篇,其它文章请参阅本文底部的相关文章,需要的朋友可以参考下
    2014-09-09
  • Java中的TreeSet源码解读

    Java中的TreeSet源码解读

    这篇文章主要介绍了Java中的TreeSet源码解读,TreeSet 是一个 有序集合,它扩展了 AbstractSet 类并实现了 NavigableSet 接口,对象根据其自然顺序以升序排序和存储,该 TreeSet 中使用 平衡树,更具体的一个 红黑树,需要的朋友可以参考下
    2023-09-09
  • Java设计模式中的策略(Strategy)模式解读

    Java设计模式中的策略(Strategy)模式解读

    这篇文章主要介绍了Java设计模式中的策略(Strategy)模式解读,对象的某个行为,在不同场景有不同实现方式,可以将这些行为的具体实现定义为一组策略,每个实现类实现一种策略,在不同场景使用不同的实现,并且可以自由切换策略,需要的朋友可以参考下
    2023-10-10
  • java文件对话框过滤特定文件类型示例

    java文件对话框过滤特定文件类型示例

    文件作为存储数据的单元,会根据数据类型产生很多分类,也就是所谓的文件类型。在对数据文件进行操作时,常常需要根据不同的文件类型来作不同的处理。本实例实现的是读取文件夹指定类型的文件并显示到表格控件中
    2014-02-02
  • java compareTo和compare方法比较详解

    java compareTo和compare方法比较详解

    这篇文章主要介绍了java compareTo和compare方法比较详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • idea maven 经常主目录自动变回默认的解决方法

    idea maven 经常主目录自动变回默认的解决方法

    很多朋友反映idea maven 经常主目录自动变回默认,遇到这样的问题真的很头疼,该如何解决呢?下面小编给大家介绍下idea maven目录变回默认的解决方法,需要的朋友可以参考下
    2022-08-08
  • 解读Jvm的内存结构与GC及jvm参数调优

    解读Jvm的内存结构与GC及jvm参数调优

    这篇文章主要介绍了解读Jvm的内存结构与GC及jvm参数调优方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-05-05
  • java调用淘宝api联网查询ip归属地

    java调用淘宝api联网查询ip归属地

    java联网查询IP归属地,原理是根据淘宝提供的service查询IP的归属地并且解析http请求返回的json串
    2014-03-03
  • 使用SpringCloud Gateway解决跨域问题

    使用SpringCloud Gateway解决跨域问题

    本文给大家介绍了使用SpringCloud Gateway解决跨域问题,Spring Cloud Gateway是一个基于Spring Framework的微服务网关,使用Spring Cloud Gateway的跨域配置能够有效管理不同服务之间的通信,提高系统的可维护性和安全性,需要的朋友可以参考下
    2024-02-02

最新评论