RocketMQ producer发送者浅析

 更新时间:2023年04月24日 11:33:09   作者:Acqierement  
RocketMQ生产者是一种高性能、可靠的消息发送者,能够将消息快速、可靠地发送到RocketMQ消息队列中。它具有多种消息发送模式和消息发送方式,可以根据不同的业务需求进行灵活配置

发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。

首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。

启动的流程主要在这个方法中:

MQClientInstance#start

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

其中启动了一系列定时任务,包括org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer这个方法

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 从nameServer获取topciRouteData
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteData.topicRouteDataChanged(old);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        if (changed) {
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Update endpoint map
                            {
                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                                if (!mqEndPoints.isEmpty()) {
                                    topicEndPointsTable.put(topic, mqEndPoints);
                                }
                            }
                            // Update Pub info
                            {
                                // 生成topicPublishInfo
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        // 更新 topicPublishInfo
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Update sub info
                            if (!consumerTable.isEmpty()) {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }

通过方法名也知道是从nameServer获取这个topic相关的broke数据,拿到TopicRouteData数据。先更新brokerAddrTable,存储borker具体的地址。然后在org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo里面再进一步生成TopicPublishInfo数据。TopicPublishInfo是对TopicRouteData的一个封装,除了TopicRouteData,还有messageQueue数据,messageQueue是Queue和Borker的交集,会根据配置的queue数量,生成具体的messageQueue,queueId就是0,1,2,3,4他们自己的顺序。

所以有了TopicPublishInfo数据,就知道往哪里发了。

发送消息的过程。

  • 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。
  • 从MessageQueueList里面拿到一个messageQueue。 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。
  • 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址就是前面提到的brokerAddrTable变量,在updateTopicRouteInfoFromNameServer方法里面维护的。

到此这篇关于RocketMQ producer发送者浅析的文章就介绍到这了,更多相关RocketMQ producer内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • RestTemplate响应中如何获取输入流InputStream

    RestTemplate响应中如何获取输入流InputStream

    这篇文章主要介绍了RestTemplate响应中如何获取输入流InputStream问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-01-01
  • Java定时器Timer与TimerTask的使用详解

    Java定时器Timer与TimerTask的使用详解

    这篇文章主要介绍了Java定时器Timer与TimerTask的使用详解,在JDK类库中Timer主要负责计划任务的功能,也就是在指定时间执行某一任务,执行时候会在主线程之外起一个单独的线程执行指定的任务,该类主要是设置任务计划,但封装的类是TimerTask类,需要的朋友可以参考下
    2023-10-10
  • 详解Servlet之过滤器(Filter)

    详解Servlet之过滤器(Filter)

    本篇文章主要介绍了Servlet——过滤器(Filter),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • java操作Redis缓存设置过期时间的方法

    java操作Redis缓存设置过期时间的方法

    这篇文章主要介绍了java操作Redis缓存设置过期时间的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • Java设计模式七大原则之开闭原则详解

    Java设计模式七大原则之开闭原则详解

    开闭原则,又称为OCP原则,即一个软件实体如类,模块和函数应该对扩展开放,对修改关闭。本文将详细介绍Java设计模式七大原则之一的开闭原则,需要的可以参考一下
    2022-02-02
  • 实战分布式医疗挂号系统之设置微服务搭建医院模块

    实战分布式医疗挂号系统之设置微服务搭建医院模块

    这篇文章主要为大家介绍了实战分布式医疗挂号系统之搭建医院设置微服务模块,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 浅析Java关键词synchronized的使用

    浅析Java关键词synchronized的使用

    Synchronized是java虚拟机为线程安全而引入的。这篇文章主要为大家介绍一下Java关键词synchronized的使用与原理,需要的可以参考一下
    2022-12-12
  • Java 使用geotools读取tiff数据的示例代码

    Java 使用geotools读取tiff数据的示例代码

    这篇文章主要介绍了Java 通过geotools读取tiff,一般对于tiff数据的读取,都会借助于gdal,本文结合示例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-04-04
  • Java使用easyExcel导出数据及单元格多张图片

    Java使用easyExcel导出数据及单元格多张图片

    除了平时简单的数据导出需求外,我们也经常会遇到一些有固定格式或者模板要求的数据导出,下面这篇文章主要给大家介绍了关于Java使用easyExcel导出数据及单元格多张图片的相关资料,需要的朋友可以参考下
    2023-05-05
  • java Class文件结构解析常量池字节码

    java Class文件结构解析常量池字节码

    这篇文章主要为大家介绍了java Class文件的整体结构解析常量池字节码详细讲解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07

最新评论