图文并茂讲解RocketMQ消息类别

 更新时间:2022年12月27日 15:35:42   作者:一个双子座的Java攻城狮  
这篇文章主要介绍了图文并茂讲解RocketMQ消息类别,RocketMQ对于消息提供了很多用法,包括:同步消息、异步消息、单向发送、顺序消息、延时消息、批量消息、过滤消息、事务消息等

1、同步消息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
            //同步消息发送
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

2、异步消息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //异步消息发送
            Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

3、单向消息

不需要有回执的消息,例如日志类消息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //单向消息
            Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.sendOneway(msg);
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者代码同上

测试:

总结 同步消息

SendResult result = producer.send(msg);

异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

		producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });

单向消息

producer.sendOneway(msg);

到此这篇关于图文并茂讲解RocketMQ消息类别的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Boot中@Validated注解不生效问题汇总大全

    Spring Boot中@Validated注解不生效问题汇总大全

    这篇文章主要给大家介绍了关于Spring Boot中@Validated注解不生效问题汇总的相关资料,@Validated注解是Spring框架中的一个注解,用于在方法参数上添加参数校验规则,需要的朋友可以参考下
    2023-07-07
  • 如何对 Excel 表格中提取的数据进行批量更新

    如何对 Excel 表格中提取的数据进行批量更新

    这篇文章主要介绍了如何对Excel表格中提取的数据进行批量更新操作,本文通过示例代码介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-06-06
  • 详解springboot如何更新json串里面的内容

    详解springboot如何更新json串里面的内容

    这篇文章主要为大家介绍了springboot 如何更新json串里面的内容,文中有详细的解决方案供大家参考,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2023-10-10
  • Java中@JSONField注解用法、场景与实践详解

    Java中@JSONField注解用法、场景与实践详解

    这篇文章主要给大家介绍了关于Java中@JSONField注解用法、场景与实践的相关资料,并结合实际应用场景,帮助开发者在项目中更高效地处理JSON数据,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-12-12
  • SpringBoot整合Redis实现刷票过滤功能

    SpringBoot整合Redis实现刷票过滤功能

    随着互联网的不断发展,网站或APP的用户流量增加,也衍生出了一些恶意刷量等问题,给数据分析及运营带来极大的困难,所以本文使用SpringBoot和Redis实现一个刷票过滤功能,需要的可以参考一下
    2023-06-06
  • springboot2.0.0配置多数据源出现jdbcUrl is required with driverClassName的错误

    springboot2.0.0配置多数据源出现jdbcUrl is required with driverClassN

    这篇文章主要介绍了springboot2.0.0配置多数据源出现jdbcUrl is required with driverClassName的错误,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • Spring中为bean指定InitMethod和DestroyMethod的执行方法

    Spring中为bean指定InitMethod和DestroyMethod的执行方法

    在Spring中,那些组成应用程序的主体及由Spring IoC容器所管理的对象,被称之为bean,接下来通过本文给大家介绍Spring中为bean指定InitMethod和DestroyMethod的执行方法,感兴趣的朋友一起看看吧
    2021-11-11
  • java原生动态生成验证码

    java原生动态生成验证码

    这篇文章主要为大家详细介绍了java原生动态生成验证码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-10-10
  • Java实现提取Word文档表格数据

    Java实现提取Word文档表格数据

    使用Java实现Word文档表格数据的提取,可以确保数据处理的一致性和准确性,同时大大减少所需的时间和成本,下面我们来看看具体实现方法吧
    2025-01-01
  • Java使用FileReader读取文件详解

    Java使用FileReader读取文件详解

    本文将为大家介绍FileReader类的基本用法,包括如何创建FileReader对象,如何读取文件,以及如何关闭流,感兴趣的小伙伴可以跟随小编一起了解一下
    2023-09-09

最新评论