RocketMQ中的通信模块详解

 更新时间:2024年01月03日 10:58:31   作者:潜水路人甲  
这篇文章主要介绍了RocketMQ中的通信模块详解,RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,本文我们简单来讲解一下,需要的朋友可以参考下

 通信机制

RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。

(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。

(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

Remoting通信类

通信类结构:

NettyRemotingServer

NettyRemotingServer为服务端实现类,在NamesrvController中被构造。

NettyRemotingServer构造时主要工作是初始化下列属性,构造时判断useEpoll来决定EventLoopGroup的实现。

    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
 
    private final ExecutorService publicExecutor;
    private final ChannelEventListener channelEventListener;

NettyRemotingServer启动

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
 
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
 
        prepareSharableHandlers();
 
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
					//添加handler,握手、编解码、idle检测、连接管理、消息处理
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
 
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
 
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
 
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
 
        this.timer.scheduleAtFixedRate(new TimerTask() {
 
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

主要关注initChannel方法中添加的handler,NettyConnectManageHandler  

    //消息处理核心类   
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    //连接管理处理类
    @ChannelHandler.Sharable
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }
            ctx.fireUserEventTriggered(evt);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

到此这篇关于RocketMQ中的通信模块详解的文章就介绍到这了,更多相关RocketMQ通信模块内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 基于SpringBoot实现用户身份验证工具

    基于SpringBoot实现用户身份验证工具

    这篇文章主要介绍了基于SpringBoot实现的用户身份验证工具,非常不错,具有参考借鉴价值 ,需要的朋友可以参考下
    2018-04-04
  • java中List对象列表实现去重或取出及排序的方法

    java中List对象列表实现去重或取出及排序的方法

    这篇文章主要介绍了关于java中List对象列表实现去重或取出以及排序的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面跟着小编来一起学习学习吧。
    2017-08-08
  • 基于RecyclerChart的KLine的绘制Scale详解

    基于RecyclerChart的KLine的绘制Scale详解

    这篇文章主要为大家详细介绍了基于RecyclerChart实现KLine绘制Scale的相关资料,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-03-03
  • Java中关键字synchronized的使用方法详解

    Java中关键字synchronized的使用方法详解

    synchronized关键字可以作为函数的修饰符,也可作为函数内的语句,也就是平时说的同步方法和同步语句块,下面这篇文章主要给大家介绍了关于Java中synchronized使用的相关资料,需要的朋友可以参考下
    2021-08-08
  • SpringMVC配置拦截器实现登录控制的方法

    SpringMVC配置拦截器实现登录控制的方法

    这篇文章主要介绍了SpringMVC配置拦截器实现登录控制的方法,SpringMVC读取Cookie判断用户是否登录,对每一个action都要进行判断,有兴趣的可以了解一下。
    2017-03-03
  • 新版SpringSecurity安全配置说明

    新版SpringSecurity安全配置说明

    这篇文章主要介绍了新版SpringSecurity安全配置说明,在 Spring Security 5.7.0-M2 中,我们弃用了WebSecurityConfigurerAdapter,因为我们鼓励用户转向基于组件的安全配置,需要的朋友可以参考下
    2023-07-07
  • Java数组常见应用详解【创建、遍历、排序、查找】

    Java数组常见应用详解【创建、遍历、排序、查找】

    这篇文章主要介绍了Java数组常见应用,结合实例形式详细分析了java数组的基本定义、创建、遍历、排序、查找等相关操作技巧与使用注意事项,需要的朋友可以参考下
    2020-02-02
  • shiro与spring集成基础Hello案例详解

    shiro与spring集成基础Hello案例详解

    这篇文章主要介绍了shiro与spring集成基础Hello案例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Spring Data Envers支持有条件变动纪录的保存和查询的方法

    Spring Data Envers支持有条件变动纪录的保存和查询的方法

    通过spring-data-envers可以很容易的实现数据变动纪录的保存和查询,本文介绍支持有条件变动纪录的保存和查询的方法,通过spring-data-envers很容易的实现变动纪录的保存和查询,只需要增加几个注解就可以,感兴趣的朋友跟随小编一起看看吧
    2023-10-10
  • mybatis分割字符串并循环,实现in多个参数的操作

    mybatis分割字符串并循环,实现in多个参数的操作

    这篇文章主要介绍了mybatis分割字符串并循环,实现in多个参数的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06

最新评论