RocketMQ中的通信模块详解

 更新时间:2024年01月03日 10:58:31   作者:潜水路人甲  
这篇文章主要介绍了RocketMQ中的通信模块详解,RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,本文我们简单来讲解一下,需要的朋友可以参考下

 通信机制

RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。

(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。

(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

Remoting通信类

通信类结构:

NettyRemotingServer

NettyRemotingServer为服务端实现类,在NamesrvController中被构造。

NettyRemotingServer构造时主要工作是初始化下列属性,构造时判断useEpoll来决定EventLoopGroup的实现。

    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
 
    private final ExecutorService publicExecutor;
    private final ChannelEventListener channelEventListener;

NettyRemotingServer启动

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
 
                private AtomicInteger threadIndex = new AtomicInteger(0);
 
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
 
        prepareSharableHandlers();
 
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
					//添加handler,握手、编解码、idle检测、连接管理、消息处理
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
 
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
 
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
 
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
 
        this.timer.scheduleAtFixedRate(new TimerTask() {
 
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

主要关注initChannel方法中添加的handler,NettyConnectManageHandler  

    //消息处理核心类   
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    //连接管理处理类
    @ChannelHandler.Sharable
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }
            ctx.fireUserEventTriggered(evt);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

到此这篇关于RocketMQ中的通信模块详解的文章就介绍到这了,更多相关RocketMQ通信模块内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java 中Map 的用法详解

    Java 中Map 的用法详解

    本文主要介绍java 中的Map 接口, 这里对Map 接口下的几个类做了详细介绍,希望对学习java 编程的小伙伴有所帮助
    2016-07-07
  • SpringBoot结果封装和异常拦截的实现示例

    SpringBoot结果封装和异常拦截的实现示例

    SpringBoot 项目中,我们通常需要将结果数据封装成特定的格式,以方便客户端进行处理,本文主要介绍了SpringBoot 优雅的结果封装和异常拦截,感兴趣的可以了解一下
    2023-08-08
  • JavaWeb项目Servlet无法访问问题解决

    JavaWeb项目Servlet无法访问问题解决

    这篇文章主要介绍了JavaWeb项目Servlet无法访问问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 踩坑之spring事务,非事务方法与事务方法执行相互调用方式

    踩坑之spring事务,非事务方法与事务方法执行相互调用方式

    这篇文章主要介绍了踩坑之spring事务,非事务方法与事务方法执行相互调用方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Spring中ApplicationContextAware的使用方法详解

    Spring中ApplicationContextAware的使用方法详解

    ApplicationContextAware 通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware接口中的setApplicationContext方法,这篇文章主要介绍了Spring中ApplicationContextAware的作用,需要的朋友可以参考下
    2023-03-03
  • Springboot四种事件监听的实现方式详解

    Springboot四种事件监听的实现方式详解

    这篇文章主要介绍了Springboot四种事件监听的实现方式,事件监听是一种机制,可以定义和触发自定义的事件,以及在应用程序中注册监听器来响应这些事件,需要的朋友可以参考下
    2022-06-06
  • Java如何将时间戳格式化为日期字符串

    Java如何将时间戳格式化为日期字符串

    这篇文章主要介绍了Java如何将时间戳格式化为日期字符串问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Spring中集成Groovy的四种方式(小结)

    Spring中集成Groovy的四种方式(小结)

    这篇文章主要介绍了Spring中集成Groovy的四种方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Java创建线程池的几种方式代码示例

    Java创建线程池的几种方式代码示例

    这篇文章主要介绍了Java中创建线程池的四种方式,包括使用Executors类、ThreadPoolExecutor类、Future和Callable接口以及Spring的ThreadPoolTaskExecutor,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-01-01
  • Java实现入参数据批量数据校验详解

    Java实现入参数据批量数据校验详解

    在业务处理中一般入参是单条数据,这样数据校验比较容易,但是这种方法对于集合数据的校验不适用,下面我们就来看看如何对入参数据进行批量数据校验吧
    2024-02-02

最新评论