Java Dubbo协议下的服务端线程使用详解

 更新时间:2023年03月01日 10:47:55   作者:Redick01  
Dubbo是阿里开源项目,国内很多互联网公司都在用,已经经过很多线上考验。Dubbo内部使用了Netty、Zookeeper,保证了高性能高可用性,使用Dubbo可以将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心

Provider端线程模型

在了解服务端线程模型之前,先了解一下Dubbo对Channel上的操作抽象,Dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为org.apache.dubbo.remoting.ChannelHandler,该接口是SPI的,用户可以自己扩展,接口代码如下:

该接口抽象的五种Channel上的行为解释如下:

  • 建立连接:connected,主要是的职责是在channel记录read、write的时间,以及处理建立连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(onconnect),即在该操作中执行。
  • 断开连接:disconnected,主要是的职责是在channel移除read、write的时间,以及处理端开连接后的回调逻辑,比如dubbo支持在断开后自定义回调的hook(ondisconnect),即在该操作中执行。
  • 发送消息:sent,包括发送请求和发送响应。记录write的时间。
  • 接收消息:received,包括接收请求和接收响应。记录read的时间。
  • 异常捕获:caught,用于处理在channel上发生的各类异常。

Dubbo框架的线程模型与以上这五种行为息息相关,Dubbo协议Provider端线程模型提供了五种实现,虽说都是五种但是别把二者混淆,线程模型的顶级接口是org.apache.dubbo.remoting.Dispatcher,该接口也是SPI的,提供的五种实现分别是AllDispatcherDirectDispatcherMessageOnlyDispatcherExecutionDispatcherConnectionOrderedDispatcher,默认的使用的是AllDispatcher

org.apache.dubbo.remoting.ChannelHandler作为Channel上的行为的顶级接口对应Dubbo协议Provider端的5种线程模型同样也提供了对应的5种实现,分别是AllChannelHandlerDirectChannelHandlerMessageOnlyChannelHandlerExecutionChannelHandlerConnectionOrderedChannelHandler,这里Channel上行为的具体实现不展开讨论。

Channel上行为和线程模型之间使用策略可以参考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers的源代码,这里不做详细的介绍,下面的各个章节只针对5种线程模型做简单的介绍。

AllDispatcher

IO线程上的操作:

  • 接口响应序列化
  • sent操作

Dubbo线程池上的操作:

  • received、connected、disconnected、caught都是在Dubbo线程池上执行
  • 服务端反序列化操作的Dubbo线程池上执行

AllDispatcher代码如下,AllDispatcherdispatch方法实例化了AllChannelHandlerAllChannelHandler实现了received、connected、disconnected、caught操作在dubbo线程池中,代码如下:

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}
public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

DirectDispatcher

该线程模型Channel上的所有行为均在IO线程中执行,并没有在Dubbo线程池中执行

DirectDispatcherAllDispatcher相似,实例化了DirectChannelHandlerDirectChannelHandler只实现了received行为,但是received中获取的线程池如果是ThreadlessExecutor才会提交task,否则也是在ChannelHandler中执行received行为,ThreadlessExecutor和普通线程池最大的区别是不会管理任何线程,这里不展开讨论。

public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new DirectChannelHandler(handler, url);
    }
}
public class DirectChannelHandler extends WrappedChannelHandler {
    public DirectChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}

ExecutionDispatcher

在IO线程中执行的操作有:

  • sent、connected、disconnected、caught操作在IO线程上执行。
  • 序列化响应在IO线程上执行。

在Dubbo线程中执行的操作有:

  • received都是在Dubbo线程上执行的。
  • 反序列化请求的行为在Dubbo中做的。

同样的,我们可以直接看ExecutionChannelHandler源码,逻辑是当message的类型是Request时received行为在Dubbo线程池执行。感兴趣的可以自己看源码,这里不做介绍。

MessageOnlyDispatcher

Message Only Dispatcher所有的received行为和反序列化都是在dubbo线程池中执行的

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ConnectionOrderedDispatcher

该线程模型与AllDispatcher类似,sent操作和相应的序列化是在IO线程上执行;connected、disconnected、received、caught操作在dubbo线程池上执行,他们的区别是在connected、disconnected行为上ConnectionOrderedDispatcher做了线程池隔离,并且在Dubbo connected thread pool中提供了链接限制、告警灯能力,我们直接看ConnectionOrderedChannelHandler源码,代码如下:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queueWarningLimit;
    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }
    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queueWarningLimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));
        }
    }
}

到此这篇关于Java Dubbo协议下的服务端线程使用详解的文章就介绍到这了,更多相关Java Dubbo服务端线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java栈的运用之中缀表达式求值详解

    Java栈的运用之中缀表达式求值详解

    本文来介绍一题中缀表达式求值的问题,就是给定一个中缀计算式,编写程序将这个式子运算结果给计算出来,其实和后缀表达式的思路差不多,都是栈的运用问题,感兴趣的可以了解一下
    2022-11-11
  • IDEA中调用方法时,如何同步显示方法的注释信息

    IDEA中调用方法时,如何同步显示方法的注释信息

    这篇文章主要介绍了IDEA中调用方法时,如何同步显示方法的注释信息问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • JAVA动态代理模式(从现实生活角度理解代码原理)

    JAVA动态代理模式(从现实生活角度理解代码原理)

    本文主要介绍了JAVA动态代理模式(从现实生活角度理解代码原理)的相关知识。具有很好的参考价值。下面跟着小编一起来看下吧
    2017-03-03
  • Spring Boot整合Bootstrap的超详细步骤

    Spring Boot整合Bootstrap的超详细步骤

    之前做前端开发,在使用bootstrap的时候都是去官网下载,然后放到项目中,在页面引用,下面这篇文章主要给大家介绍了关于Spring Boot整合Bootstrap的超详细步骤,需要的朋友可以参考下
    2023-05-05
  • Java对文本文件MD5加密并ftp传送到远程主机目录的实现方法

    Java对文本文件MD5加密并ftp传送到远程主机目录的实现方法

    这篇文章主要给大家介绍了关于Java对文本文件MD5加密并ftp传送到远程主机目录的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-08-08
  • java jdbc连接和使用详细介绍

    java jdbc连接和使用详细介绍

    这篇文章主要介绍了 java jdbc连接和使用详细介绍的相关资料,需要的朋友可以参考下
    2016-12-12
  • Springcloud实现服务多版本控制的示例代码

    Springcloud实现服务多版本控制的示例代码

    这篇文章主要介绍了Springcloud实现服务多版本控制的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-05-05
  • Lombok的@Accessors使用说明

    Lombok的@Accessors使用说明

    这篇文章主要介绍了Lombok的@Accessors使用说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • SpringBoot中的@Value注解用法

    SpringBoot中的@Value注解用法

    这篇文章主要介绍了SpringBoot中的@Value注解用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • Java Validation方法入参校验实现过程解析

    Java Validation方法入参校验实现过程解析

    这篇文章主要介绍了Java Validation方法入参校验实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11

最新评论