RocketMQ中的通信模块详解

 更新时间:2024年01月03日 10:58:31   作者:潜水路人甲  
这篇文章主要介绍了RocketMQ中的通信模块详解,RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,本文我们简单来讲解一下,需要的朋友可以参考下

 通信机制

RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。

(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。

(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

Remoting通信类

通信类结构:

NettyRemotingServer

NettyRemotingServer为服务端实现类,在NamesrvController中被构造。

NettyRemotingServer构造时主要工作是初始化下列属性,构造时判断useEpoll来决定EventLoopGroup的实现。

    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
 
    private final ExecutorService publicExecutor;
    private final ChannelEventListener channelEventListener;

NettyRemotingServer启动

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
 
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
 
        prepareSharableHandlers();
 
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
					//添加handler,握手、编解码、idle检测、连接管理、消息处理
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
 
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
 
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
 
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
 
        this.timer.scheduleAtFixedRate(new TimerTask() {
 
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

主要关注initChannel方法中添加的handler,NettyConnectManageHandler  

    //消息处理核心类   
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    //连接管理处理类
    @ChannelHandler.Sharable
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }
            ctx.fireUserEventTriggered(evt);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

到此这篇关于RocketMQ中的通信模块详解的文章就介绍到这了,更多相关RocketMQ通信模块内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java基于UDP协议的聊天室功能

    Java基于UDP协议的聊天室功能

    这篇文章主要为大家详细介绍了Java基于UDP协议的聊天室功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • idea日志乱码和tomcat日志乱码问题的解决方法

    idea日志乱码和tomcat日志乱码问题的解决方法

    这篇文章主要介绍了idea日志乱码和tomcat日志乱码问题的解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • ShardingProxy读写分离之原理、配置与实践过程

    ShardingProxy读写分离之原理、配置与实践过程

    ShardingProxy是Apache ShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶颈,其核心功能包括SQL路由、负载均衡、数据一致性保障和故障转移,支持主从架构下的透明分库分表及读写分流,广泛应用于微服务和高流量业务系统
    2025-08-08
  • SpringBoot的ConfigurationProperties或Value注解无效问题及解决

    SpringBoot的ConfigurationProperties或Value注解无效问题及解决

    在SpringBoot项目开发中,全局静态配置类读取application.yml或application.properties文件时,可能会遇到配置值始终为null的问题,这通常是因为在创建静态属性后,IDE自动生成的Get/Set方法包含了static关键字
    2024-11-11
  • 配置Spring4.0注解Cache+Redis缓存的用法

    配置Spring4.0注解Cache+Redis缓存的用法

    本篇文章主要介绍了详解配置Spring4.0注解Cache+Redis缓存的用法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-04-04
  • SpringBoot实现文件上传下载的7种方法

    SpringBoot实现文件上传下载的7种方法

    文件上传下载功能是Web应用中的常见需求,从简单的用户头像上传到大型文件的传输与共享,都需要可靠的文件处理机制,下面我们来看看SpringBoot中处理文件上传下载的7种方法吧
    2025-05-05
  • Spring中AOP的切点、通知、切点表达式及知识要点整理

    Spring中AOP的切点、通知、切点表达式及知识要点整理

    这篇文章主要介绍了Spring中AOP的切点、通知、切点表达式及知识要点整理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-03-03
  • Java 23种设计模型详解

    Java 23种设计模型详解

    本文主要介绍Java 23种设计模型,这里整理了详细的资料,及实现各种设计模型的示例代码,有需要的小伙伴可以参考下
    2016-09-09
  • 详解Spring Boot对 Apache Pulsar的支持

    详解Spring Boot对 Apache Pulsar的支持

    Spring Boot通过提供spring-pulsar和spring-pulsar-reactive自动配置支持Apache Pulsar,类路径中这些依赖存在时,Spring Boot自动配置命令式和反应式Pulsar组件,PulsarClient自动注册,默认连接本地Pulsar实例,感兴趣的朋友一起看看吧
    2024-11-11
  • 详解Java的四种引用方式及其区别

    详解Java的四种引用方式及其区别

    这篇文章主要介绍了Java的四种引用方式 ,主要主要包括强引用,软引用,弱引用,虚引用,稍微整理精简一下做下分享,具有一定的参考价值,需要的朋友可以参考下
    2018-12-12

最新评论