elasticsearch节点间通信的基础transport启动过程

 更新时间:2022年04月21日 15:39:50   作者:zziawan  
这篇文章主要为大家介绍了elasticsearch节点间通信的基础transport启动过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

在前一篇中我们分析了cluster的一些元素。接下来的章节会对cluster的运作机制做详细分析。本节先分析一些transport,它是cluster间通信的基础。它有两种实现,一种是基于netty实现nettytransport,主要用于节点间的通信。另一种是localtransport,主要是用于同一个jvm上的节点通信。因为是同一个jvm上的网络模拟,localtransport实现上非常简单,实际用处也非常有限,这里就不过多说明。这一篇的重点是nettytransport。

transport

transport顾名思义是集群通信的基本通道,无论是集群状态信息,还是搜索索引请求信息,都是通过transport传送。elasticsearch定义了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基础接口。这里将以transport为主,分析过程中会附带介绍其它接口。首先看一下transport节点的定义,如下图所示:

NettyTransport实现了该接口。分析NettyTransport前简单说一下Netty的用法,Netty的使用需要三个模块ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap启动服务器,ClientBootStrap启动客户端并连接服务器,MessageHandler是message处理逻辑所在,也就是业务逻辑。其它详细使用请参考Netty官方文档。

启动serverBootStrap

NettyTransport每个在doStart()方法中启动serverBootStrap,和ClientBootStrap,并绑定ip,代码如下所示:

protected void doStart() throws ElasticsearchException {
       clientBootstrap = createClientBootstrap();//根据配置启动客户端
       ……//省略了无关分代码
    createServerBootstrap(name, mergedSettings);//启动server端
       bindServerBootstrap(name, mergedSettings);//绑定ip
        }

每一个节点都需要发送和接收,因此两者都需要启动,client和server的启动分别在相应的方法中,启动过程就是netty的启动过程,有兴趣可以去看相应方法。bindServerBootstrap(name, mergedSettings)将本地ip和断开绑定到netty同时设定好export host(export host的具体作业我也看明白也没有看到相关的绑定,需要进一步研究)。

启动client及server的过程中将messagehandler注入到channelpipeline中。至此启动过程完成,但是client并未连接任何server,连接过程是在节点启动后,才连接到其它节点的。

如何连接到node

方法代码如下所示:

public void connectToNode(DiscoveryNode node, boolean light) {
     //transport的模块必须要启动
        if (!lifecycle.started()) {
            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
        }
     //获取读锁,每个节点可以和多个节点建立连接,因此这里用读锁
        globalLock.readLock().lock();
        try {
        //以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接
            connectionLock.acquire(node.id());
            try {
                if (!lifecycle.started()) {
                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
                }
                NodeChannels nodeChannels = connectedNodes.get(node);
                if (nodeChannels != null) {
                    return;
                }
                try {
                    if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel
                        nodeChannels = connectToChannelsLight(node);
                    } else {
                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                        try {
                            connectToChannels(nodeChannels, node);
                        } catch (Throwable e) {
                            logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                            nodeChannels.close();
                            throw e;
                        }
                    }
                    // we acquire a connection lock, so no way there is an existing connection
                    connectedNodes.put(node, nodeChannels);
                    if (logger.isDebugEnabled()) {
                        logger.debug("connected to node [{}]", node);
                    }
                    transportServiceAdapter.raiseNodeConnected(node);
                } catch (ConnectTransportException e) {
                    throw e;
                } catch (Exception e) {
                    throw new ConnectTransportException(node, "general node connection failure", e);
                }
            } finally {
                connectionLock.release(node.id());
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

如果不是轻连接,每个server和clien之间都有5中连接,着5中连接承担着不同的任务

连接方法的代码

protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    //五种连接方式,不同的连接方式对应不同的集群操作
        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    //尝试建立连接
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i] = clientBootstrap.connect(address);
        }
    //获取每个连接的channel存入到相应的channels中便于后面使用。
        try {
            for (int i = 0; i < connectRecovery.length; i++) {
                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectRecovery[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
                }
                nodeChannels.recovery[i] = connectRecovery[i].getChannel();
                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectBulk.length; i++) {
                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectBulk[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
                }
                nodeChannels.bulk[i] = connectBulk[i].getChannel();
                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectReg.length; i++) {
                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectReg[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
                }
                nodeChannels.reg[i] = connectReg[i].getChannel();
                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectState.length; i++) {
                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectState[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
                }
                nodeChannels.state[i] = connectState[i].getChannel();
                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            for (int i = 0; i < connectPing.length; i++) {
                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectPing[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
                }
                nodeChannels.ping[i] = connectPing[i].getChannel();
                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }
            if (nodeChannels.recovery.length == 0) {
                if (nodeChannels.bulk.length > 0) {
                    nodeChannels.recovery = nodeChannels.bulk;
                } else {
                    nodeChannels.recovery = nodeChannels.reg;
                }
            }
            if (nodeChannels.bulk.length == 0) {
                nodeChannels.bulk = nodeChannels.reg;
            }
        } catch (RuntimeException e) {
            // clean the futures
            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
                future.cancel();
                if (future.getChannel() != null && future.getChannel().isOpen()) {
                    try {
                        future.getChannel().close();
                    } catch (Exception e1) {
                        // ignore
                    }
                }
            }
            throw e;
        }
    }

以上就是节点建立连接的过程,每一对client和server间都会建立一定数量的不同连接。之所以要区分连接,是因为不同的操作消耗的资源不同,请求的频率也不同。对于资源消耗少请求频率高的如ping,可以建立多一些连接,来确保并发。对于消耗资源多如bulk操作,则要少建立一些连接,保证机器不被拖垮。节点的断开,这是讲相应的channel释放的过程。这里就不再做详细说明,可以参考相关源码。

总结

nettytransport的连接过程,启动过程分别启动client和server,同时将对于的messagehandler注入,启动多次就是netty的启动过程。然后绑定server ip和断开。但是这里并没有连接,连接发送在节点启动时,节点启动会获取cluster信息,分别对集群中的节点建立上述的5种连接。

这就是NettyTransport的启动和连接过程。transport还有一个很重要的功能就是发送request,及如何处理request,这些功能会在下一篇中分析,希望大家以后多多支持脚本之家!

相关文章

  • MybatisX自定义模板方式

    MybatisX自定义模板方式

    本文介绍了如何使用MybatisX插件自定义VO对象模板,并提供了一个简单的示例,首先,文章展示了如何使用FreeMarker语法编写模板内容,接着,详细说明了如何配置模板,并通过实际测试验证了模板的正确性,最后,作者鼓励大家参考并支持脚本之家
    2025-01-01
  • Maven配置文件settings.xml的实现

    Maven配置文件settings.xml的实现

    Maven是一个用于构建和管理Java项目的强大工具,它依赖于设置文件来配置和管理其行为,其中最重要的之一便是settings.xml文件,本文主要介绍了Maven配置文件settings.xml的实现,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • Spring学习教程之AOP模块的概述

    Spring学习教程之AOP模块的概述

    AOP 从功能的角度来讲,可能看作OOP编程方式的一种补充,提供了一种不同的代码或者系统组织方式,下面这篇文章主要给大家介绍了关于Spring学习教程之AOP模块的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2018-05-05
  • 如何使用新方式编写Spring MVC接口

    如何使用新方式编写Spring MVC接口

    这篇文章主要介绍了如何使用新方式编写Spring MVC接口,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • 详解Java正则表达式中Pattern类和Matcher类

    详解Java正则表达式中Pattern类和Matcher类

    java.util.regex是一个用正则表达式所订制的模式来对字符串进行匹配工作的类库包。包括两个类Pattern和Matcher Pattern,Pattern是一个正则表达式经编译后的表现模式。Matcher对象是一个状态机器,它依据Pattern对象做为匹配模式对字符串展开匹配检查。
    2016-12-12
  • Java捕获ThreadPoolExecutor内部线程异常的四种方法

    Java捕获ThreadPoolExecutor内部线程异常的四种方法

    这篇文章主要为大家详细介绍了Java捕获ThreadPoolExecutor内部线程异常的四种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下
    2025-03-03
  • Java中Bigdecimal类的toString()方法和toPlainString()方法区别

    Java中Bigdecimal类的toString()方法和toPlainString()方法区别

    BigDecimal类有多个方法可以将其转换为字符串,其中包括toString()和toPlainString(),本文主要介绍了Java中Bigdecimal类的toString()方法和toPlainString()方法区别,具有一定的参考价值,感兴趣的可以了解一下
    2024-07-07
  • 深入理解JAVA核心:揭秘反射机制的奥秘

    深入理解JAVA核心:揭秘反射机制的奥秘

    欢迎来到JAVA反射机制指南!想要了解如何在JAVA中实现灵活的编程技巧吗?本指南将带你揭开JAVA反射机制的神秘面纱,让你的编程世界更加精彩!赶紧跟我一起来探索吧!
    2024-02-02
  • java.lang.ExceptionInInitializerError异常的解决方法

    java.lang.ExceptionInInitializerError异常的解决方法

    这篇文章主要为大家详细介绍了java.lang.ExceptionInInitializerError异常的解决方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10
  • java.lang.IllegalStateException异常原因和解决办法

    java.lang.IllegalStateException异常原因和解决办法

    这篇文章主要给大家介绍了关于java.lang.IllegalStateException异常原因和解决办法,IllegalStateException是Java标准库中的一个异常类,通常表示在不合适或无效的情况下执行了某个方法或操作,需要的朋友可以参考下
    2023-07-07

最新评论