springboot内置tomcat之NIO处理流程一览

 更新时间:2021年12月29日 09:41:27   作者:CRUD的W  
这篇文章主要介绍了springboot内置tomcat之NIO处理流程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

前言

springboot内置的tomcat目前默认是基于NIO来实现的,本文介绍下tomcat接受请求的一些组件及组件之间的关联

tomcat组件

本文只介绍NIO中tomcat的组件

我们直接看NIO的核心类NioEndpoint的startInternal方法

Acceptor组件

public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());
            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }
            initializeConnectionLatch();
            // Start poller threads
            // 核心代码1
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
			// 核心代码2
            startAcceptorThreads();
        }
    }

看核心代码1的位置构造了一个Poller数组,Poller是一个实现了Runnable的类,并且启动了该线程类,

getPollerThreadCount()方法返回了2和当前物理机CPU内核数的最小值,即创建的数组最大值为2

接下来看核心代码2startAcceptorThreads()

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

创建了多个Acceptor类,Acceptor也是实现了Runnable的线程类,创建个数默认是1

然后我们看下Acceptor启动后做了什么,我们直接看run方法

  public void run() {
        ......
                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 核心代码1
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    ......
                }
                // Successful accept, reset the error delay
                errorDelay = 0;
                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 核心代码2
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
               ......
    }

核心代码1很明显是一个阻塞模型,即接受客户端连接的,当没有客户端连接时会处于阻塞,这里可以看到默认情况下tomcat在nio模式下只有一个Acceptor线程类来接受连接

然后看核心代码2

 protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            // 核心代码
            getPoller0().register(channel);
        } catch (Throwable t) {
           ......
        }
        return true;
    }

我们看核心代码getPoller0().register(channel)

public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        // 核心代码
        addEvent(r);
    }

看addEvent

  private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }

events的定义

private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();

这里可以看到封装了一个PollerEvent 并且扔到了一个队列里面,然后当前类就结束了

由此可得Acceptor的作用就是接受客户端连接,并且把连接封装起来扔到了一个队列中

Poller

我们前面已经创建并且启动了多个Poller线程类,默认的数量是小于等于2的。

然后我们看下Poller类做了什么,同样我们看run方法

  @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                	// 核心代码1
                    hasEvents = events();
                    
            .......
            
            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 核心代码2
                    processKey(sk, attachment);
                }
            }//while
            //process timeouts
            timeout(keyCount,hasEvents);
        }//while
        getStopLatch().countDown();
    }

先看核心代码1 hasEvents = events()

 public boolean events() {
        boolean result = false;
        PollerEvent pe = null;
        for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
            result = true;
            try {
            	// 核心代码
                pe.run();
                pe.reset();
                if (running && !paused) {
                    eventCache.push(pe);
                }
            } catch ( Throwable x ) {
                log.error("",x);
            }
        }
        return result;
    }

核心代码run

@Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
            	// 核心代码,注册到selector轮训器
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            ......
        }
    }

可以看出大概的意思就是从刚才我们放进去的队列events里面取数据放到了eventCache里面,eventCache的定义SynchronizedStack eventCache,当取到数据后返回true,这个时候就会进入核心代码2处的processKey(sk, attachment),也就是开始处理请求了

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
        try {
            if ( close ) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                        	// 核心代码
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                      ......
    }

这里就可以看到我们的NIO的模型了,也就是多路复用的模型,轮询来判断key的状态,当key是可读或者可写时执行processSocket

  public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // 核心代码
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

这里就是核心代码了,可以看到getExecutor()方法,获取线程池,这个线程池是在初始化tomcat时提前初始化好的,默认情况下核心线程是10,最大线程是200。线程池的配置可以根据我们自己配置来设置大小。

这里拿到线程池然后包装了一个SocketProcessorBase线程类扔到线程池里面取执行

从这里可以看到Poller的功能就是从前面的队列里面获取连接然后包装成SocketProcessorBase之后扔到线程池里面去执行,SocketProcessorBase才是最终真正处理请求的

总结

根据上面的分析我们已经可以看到tomcat的执行流程了,这里盗用网上的一张比较好的图

在这里插入图片描述

大致流程为

1、创建一个Acceptor线程来接收用户连接,接收到之后扔到events queue队列里面,默认情况下只有一个线程来接收

2、创建Poller线程,数量小于等于2,Poller对象是NIO的核心,在Poller中,维护了一个Selector对象;当Poller从队列中取出socket后,注册到该Selector中;然后通过遍历Selector,找出其中可读的socket,然后扔到线程池中处理相应请求,这就是典型的NIO多路复用模型

3、扔到线程池中的SocketProcessorBase处理请求

相较于BIO模型的tomcat,NIO的优势分析

1、BIO中的流程应该是接收到请求之后直接把请求扔给线程池去做处理,在这个情况下一个连接即需要一个线程来处理,线程既需要读取数据还需要处理请求,线程占用时间长,很容易达到最大线程

2、NIO的流程的不同点在于Poller类采用了多路复用模型,即Poller类只有检查到可读或者可写的连接时才把当前连接扔给线程池来处理,这样的好处是大大节省了连接还不能读写时的处理时间(如读取请求数据),也就是说NIO“读取socket并交给Worker中的线程”这个过程是非阻塞的,当socket在等待下一个请求或等待释放时,并不会占用工作线程,因此Tomcat可以同时处理的socket数目远大于最大线程数,并发性能大大提高。

以上就是我对于tomcat中nio处理模型的一些理解。希望能给大家一个参考,也希望大家多多支持脚本之家

相关文章

  • java springmvc乱码解决归纳整理详解

    java springmvc乱码解决归纳整理详解

    本篇文章介绍了java 中spring mvc 解决乱码的问题方法实例,需要的朋友可以参考下
    2017-04-04
  • MyBatis持久层框架的用法知识小结

    MyBatis持久层框架的用法知识小结

    MyBatis 本是apache的一个开源项目iBatis,接下来通过本文给大家介绍MyBatis持久层框架的用法知识小结,非常不错,具有参考借鉴价值,感兴趣的朋友一起学习吧
    2016-07-07
  • Springboot项目如何获取所有的接口

    Springboot项目如何获取所有的接口

    这篇文章主要介绍了Springboot项目如何获取所有的接口,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 简单了解Java方法的定义和使用实现详解

    简单了解Java方法的定义和使用实现详解

    这篇文章主要介绍了简单了解Java方法的定义和使用实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • 16 个有用的的Java工具类(小结)

    16 个有用的的Java工具类(小结)

    这篇文章主要介绍了16 个有用的的Java工具类,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • Java JTable 实现日历的示例

    Java JTable 实现日历的示例

    这篇文章主要介绍了Java JTable 实现日历的示例,帮助大家更好的理解和学习Java jtable的使用方法,感兴趣的朋友可以了解下
    2020-10-10
  • 微信公众帐号开发教程之图文消息全攻略

    微信公众帐号开发教程之图文消息全攻略

    本篇主要介绍微信公众帐号开发中图文消息的使用,以及图文消息的几种表现形式。标题取名为"图文消息全攻略",这绝对不是标题党,是想借此机会把大家对图文消息相关的问题、疑虑、障碍全部清除掉。
    2016-12-12
  • JNDI在JavaEE中的角色_动力节点Java学院整理

    JNDI在JavaEE中的角色_动力节点Java学院整理

    这篇文章主要介绍了JNDI在JavaEE中的角色,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • JAVA格式化时间日期的简单实例

    JAVA格式化时间日期的简单实例

    这篇文章主要介绍了JAVA格式化时间日期的简单实例,有需要的朋友可以参考一下
    2013-11-11
  • Springboot整合freemarker和相应的语法详解

    Springboot整合freemarker和相应的语法详解

    FreeMarker是一款Spring官方推荐使用的模板引擎。接下来通过本文给大家介绍Springboot整合freemarker和相应的语法,感兴趣的朋友一起看看吧
    2021-09-09

最新评论