浅谈Springboot整合RocketMQ使用心得

 更新时间:2018年01月15日 16:44:49   作者:HenryZhou2  
本篇文章主要介绍了Springboot整合RocketMQ使用心得,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

一、阿里云官网---帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class MqConfig {
  /**
   * 启动测试之前请替换如下 XXX 为您的配置
   */
  public static final String PUBLIC_TOPIC = "test";//公网测试
  public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
  public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";

  public static final String ACCESS_KEY = "123";
  public static final String SECRET_KEY = "123";
  public static final String TAG = "";
  public static final String THREAD_NUM = "25";//消费端线程数
  /**
   * ONSADDR 请根据不同Region进行配置
   * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
   * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
   * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
   */
  public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

ONSADDR 阿里云用 公有云生产,测试用公网

不同的业务可以设置不同的tag,但是如果发送消息量大的话,建议新建TOPIC

2、生产者

方式1:

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
  <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
     init-method="start" destroy-method="shutdown">
    <property name="properties">
      <map>
        <entry key="ProducerId" value="" /> <!-- PID,请替换 -->
        <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,请替换 -->
        <entry key="SecretKey" value="" /> <!-- SECRET_KEY,请替换 -->
        <!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置
         公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
         公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
         深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
        <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
      </map>
    </property>
  </bean>
</beans>

启动方式1,在使用类的全局里设置:

//初始化生产者
  private ApplicationContext ctx;
  private ProducerBean producer;

  @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭
  private boolean producerConfigEnabled;

  @PostConstruct
  public void init(){
    if (true == producerConfigEnabled) {
      ctx = new ClassPathXmlApplicationContext("producer.xml");
      producer = (ProducerBean) ctx.getBean("producer");
    }
  }

PS:最近发现一个坑,如果producer用上面这种方式启动的话,一旦启动的多了,会造成fullGC,所以可以换成下面这种注解方式启动,在用到的地方手动start、shutdown

方式2:配置类(不需要xml)

@Configuration
public class ProducerBeanConfig {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  private ProducerBean producerBean;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  @Bean
  public ProducerBean oneProducer() {
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);

    producerBean.setProperties(properties);
    return producerBean;
  }
}

PS:经过这次双11发现,以上2种方式在大数据量,多线程情况下都不太适用, 性能很差,所以推荐用3

方式3:(不需要xml)

@Component
public class ProducerBeanSingleTon {

  @Value("${openservices.ons.producerBean.producerId}")
  private String producerId;

  @Value("${openservices.ons.producerBean.accessKey}")
  private String accessKey;

  @Value("${openservices.ons.producerBean.secretKey}")
  private String secretKey;

  @Value("${openservices.ons.producerBean.ONSAddr}")
  private String ONSAddr;

  private static Producer producer;

  private static class SingletonHolder {
    private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon();
  }

  private ProducerBeanSingleTon (){}

  public static final ProducerBeanSingleTon getInstance() {
    return SingletonHolder.INSTANCE;
  }

  @PostConstruct
  public void init(){
    // producer 实例配置初始化
    Properties properties = new Properties();
    //您在控制台创建的Producer ID
    properties.setProperty(PropertyKeyConst.ProducerId, producerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    //设置发送超时时间,单位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
    // 设置 TCP 接入域名(此处以公共云生产环境为例)
    properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
    producer = ONSFactory.createProducer(properties);
    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
    producer.start();
  }

  public Producer getProducer(){
    return producer;
  }
}

spring配置

spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

consumerConfig.enabled = true

producerConfig.enabled = true #方式1:

scheduling.enabled = false

#方式2、3:rocketMQ \u516C\u7F51\u914D\u7F6E
openservices.ons.producerBean.producerId = pid
openservices.ons.producerBean.accessKey = 
openservices.ons.producerBean.secretKey = 

openservices.ons.producerBean.ONSAddr = 公网、杭州公有云生产

方式1投递消息代码:

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

方式2投递消息代码:(可以每发1000个启动/关闭一次)

   producerBean.start();
try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info(".Send mq message success!”;

   } else {
     logger.warn(".sendResult is null.........");
   }
   } catch (Exception e) {
      logger.warn("DoubleElevenAllPreService");
      Thread.sleep(1000);//如果有异常,休眠1秒
   }

   producerBean.shutdown();

方式3:投递消息

 try {
   String jsonC = JsonUtils.toJson(elevenMessage);
   Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
   Producer producer = ProducerBeanSingleTon.getInstance().getProducer();
   SendResult sendResult = producer.send(message);
   if (sendResult != null) {
     logger.info("DoubleElevenMidService.Send mq message success! Topic is:"”;

   } else {
     logger.warn("DoubleElevenMidService.sendResult is null.........");
   }
   } catch (Exception e) {
     logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e);
     Thread.sleep(1000);//如果有异常,休眠1秒
   }

发送消息的代码一定要捕获异常,不然会重复发送。

这里的TOPIC用自己创建的,elevenMessage是要发送的内容,我这里是自己建的对象

3、消费者

配置启动类:

@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {

  private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());

  @Bean
  public Consumer consumerFactory(){//不同消费者 这里不能重名
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
    //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());//new对应的监听器
    consumer.start();
    logger.info("ConsumerConfig start success.");
    

    return consumer;

  }
}

CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置

创建消息监听器类,消费消息:

@Component
public class MessageListener implements MessageListener {
  private Logger logger = LoggerFactory.getLogger("remind");

  protected static ElevenReposity elevenReposity;
  @Resource
  public void setElevenReposity(ElevenReposity elevenReposity){
    MessageListener .elevenReposity=elevenReposity;
  }


  @Override
  public Action consume(Message message, ConsumeContext consumeContext) {

    if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息 json转换报错
      try {

      byte[] body = message.getBody();
      String res = new String(body);
      
      //res 是生产者传过来的消息内容

        //业务代码

      }else{
        logger.warn("!");
      }

      } catch (Exception e) {
        logger.error("MessageListener.consume error:" + e.getMessage(), e);
      }

      logger.info("MessageListener.Receive message”);
      //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
      return Action.CommitMessage;
    }else{
      logger.warn();
      return Action.ReconsumeLater;
    }

  }

注意,由于消费者是多线程的,所以对象要用static+set注入,把对象的级别提升到进程,这样多个线程就可以共用,但是无法调用父类的方法和变量

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息

三、注意事项

1、发送的消息体 最大为256KB

2、消息最多存在3天

3、消费端默认线程数是20

4、如果运行过程中出现java挂掉或者cpu占用异常高,可以在发送消息的时候,每发送1000条让线程休息1s

5、本地测试或启动的时候,把ONSADDR换成公网,不然报错无法启动

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • Java查找不重复无序数组中是否存在两个数字的和为某个值

    Java查找不重复无序数组中是否存在两个数字的和为某个值

    今天小编就为大家分享一篇关于Java查找不重复无序数组中是否存在两个数字的和为某个值,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Spring Security实现自动登陆功能示例

    Spring Security实现自动登陆功能示例

    自动登录在很多网站和APP上都能用的到,解决了用户每次输入账号密码的麻烦。本文就使用Spring Security实现自动登陆功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • SpringBoot依赖管理的源码解析

    SpringBoot依赖管理的源码解析

    这篇文章主要介绍了SpringBoot依赖管理的源码解析,maven提供了一套依赖管理机制,通过在pom.xml定义坐标,通过坐标从互联网的中央仓库下载依赖的构件(jar包),规范去管理依赖所有构件,这就叫依赖管理,需要的朋友可以参考下
    2023-04-04
  • 一文讲解Java的String、StringBuffer和StringBuilder的使用与区别

    一文讲解Java的String、StringBuffer和StringBuilder的使用与区别

    String是不可变的字符序列,而StringBuffer和StringBuilder是可变的字符序列,本文就来详细的介绍一下Java的String、StringBuffer和StringBuilder的使用与区别,感兴趣的可以了解一下
    2024-03-03
  • Java实现贪吃蛇大作战小游戏(附源码)

    Java实现贪吃蛇大作战小游戏(附源码)

    今天给大家带来的是小项目是 基于Java+Swing+IO流实现 的贪吃蛇大作战小游戏。实现了界面可视化、基本的吃食物功能、死亡功能、移动功能、积分功能,并额外实现了主动加速和鼓励机制,需要的可以参考一下
    2022-07-07
  • SpringDataJpa多表操作的实现

    SpringDataJpa多表操作的实现

    开发过程中会有很多多表的操作,他们之间有着各种关系,本文主要介绍了SpringDataJpa多表操作的实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-11-11
  • Java中API的使用方法详情

    Java中API的使用方法详情

    这篇文章主要介绍了Java中API的使用方法详情,指的就是 JDK 中提供的各种功能的 Java类,这些类将底层的实现封装了起来,我们不需要关心这些类是如何实现的,只需要学习这些类如何使用即可,我们可以通过帮助文档来学习这些API如何使用,需要的朋友可以参考下
    2022-04-04
  • Java利用反射获取object的属性和值代码示例

    Java利用反射获取object的属性和值代码示例

    这篇文章主要介绍了Java利用反射获取object的属性和值代码示例,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • SpringBoot之自定义Filter获取请求参数与响应结果案例详解

    SpringBoot之自定义Filter获取请求参数与响应结果案例详解

    这篇文章主要介绍了SpringBoot之自定义Filter获取请求参数与响应结果案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-09-09
  • Java无界阻塞队列DelayQueue详细解析

    Java无界阻塞队列DelayQueue详细解析

    这篇文章主要介绍了Java无界阻塞队列DelayQueue详细解析,DelayQueue是一个支持时延获取元素的无界阻塞队列,队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,需要的朋友可以参考下
    2023-12-12

最新评论