java连接池Druid获取连接getConnection示例详解

 更新时间:2023年09月15日 11:51:53   作者:福  
这篇文章主要为大家介绍了java连接池Druid获取连接getConnection示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

Druid连接池

Druid连接池只存储在connections数组中,所以获取连接的逻辑应该比HikariPool简单一些:直接从connectoins获取即可。

DruidDataSource.getConnection

直接上代码:

@Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

调用了getConnection(maxWait),maxWait是参数设定的获取连接的最长等待时间,超过该时长还没有获取到连接的话,抛异常。

看getConnection(maxWait)代码:

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();
        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

先调用init,init方法会判断连接池是否已经完成了初始化,如果没有完成初始化则首先进行初始化,初始化的代码我们上一篇文章已经分析过了。

之后判断是否有filters,filters的内容我们先放放,暂时不管,直接看没有filters的情况下,调用getConnectionDirect方法。

getConnectionDirect

方法比较长,我们还是老办法,分段分析:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (;;) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {
                poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {
                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

上来之后首先无限for循环,目的是从连接池获取到连接之后,根据参数设定可能会做必要的检查,如果检查不通过(比如连接不可用、连接已关闭等等)的话循环重新获取。

然后调用getConnectionInternal获取连接,getConnectionInternal方法应该是我们今天文章的主角,我们稍微放一放,为了文章的可读性,先分析完getConnectionDirect方法。

我们假设通过调用getConnectionInternal方法获取到一个连接(注意获取到的连接对象是DruidPooledConnection,不是Connection对象,这个也不难想象,连接池获取到的连接一定是数据库物理连接的代理对象(或者叫封装对象,封装了数据库物理连接Connection对象的对象,这个原理我们在分析HikariPool的时候已经说过了。这个DruidPooledConnection对象我们也暂时放一放,后面分析)。

调用getConnectionInternal方法如果返回超时异常,判断:如果当前连接池没满,而且获取连接超时重试次数小于参数notFullTimeoutRetryCount设定的次数的话,则continue,重新获取连接。否则,抛出超时异常。

接下来:

if (testOnBorrow) {
                boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                if (!validate) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("skip not validate connection.");
                    }
                    discardConnection(poolableConnection.holder);
                    continue;
                }
            } else {
                if (poolableConnection.conn.isClosed()) {
                    discardConnection(poolableConnection.holder); // 传入null,避免重复关闭
                    continue;
                }

testOnBorrow参数的目的是:获取连接后是否要做连接可用性测试,如果设定为true的话,调用testConnectionInternal测试连接的可用性,testConnectionInternal方法上一篇文章分析连接回收的时候、处理keepAlive的过程中就碰到过,就是执行配置好的sql语句测试连接可用性,如果测试不通过的话则调用discardConnection关闭连接,continue重新获取连接。

否则,如果testOnBorrow参数没有打开的话,检查当前连接如果已经关闭,则调用discardConnection关闭连接(没太明白连接既然已经是关闭状态,为啥还需要调用?),continue重新获取连接。

不建议打开testOnBorrow参数,因为连接池都会有连接回收机制,比如上一篇文章讲过的Druid的DestroyConnectionThread & DestroyTask,回收参数配置正常的话,回收机制基本可以确保连接的可用性。打开testOnBorrow参数会导致每次获取连接之后都测试连接的可用性,严重影响系统性能。

接下来:

if (testWhileIdle) {
                    final DruidConnectionHolder holder = poolableConnection.holder;
                    long currentTimeMillis             = System.currentTimeMillis();
                    long lastActiveTimeMillis          = holder.lastActiveTimeMillis;
                    long lastExecTimeMillis            = holder.lastExecTimeMillis;
                    long lastKeepTimeMillis            = holder.lastKeepTimeMillis;
                    if (checkExecuteTime
                            && lastExecTimeMillis != lastActiveTimeMillis) {
                        lastActiveTimeMillis = lastExecTimeMillis;
                    }
                    if (lastKeepTimeMillis > lastActiveTimeMillis) {
                        lastActiveTimeMillis = lastKeepTimeMillis;
                    }
                    long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;
                    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
                    if (timeBetweenEvictionRunsMillis <= 0) {
                        timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                    }
                    if (idleMillis >= timeBetweenEvictionRunsMillis
                            || idleMillis < 0 // unexcepted branch
                            ) {
                        boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                        if (!validate) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("skip not validate connection.");
                            }
                            discardConnection(poolableConnection.holder);
                             continue;
                        }
                    }
                }
            }

这段代码的逻辑是:参数testWhileIdle设置为true的话,检查当前链接的空闲时长如果大于timeBetweenEvictionRunsMillis(默认60秒)的话,则调用testConnectionInternal测试连接可用性,连接不可用则关闭连接,continue重新获取连接。

然后:

if (removeAbandoned) {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                poolableConnection.connectStackTrace = stackTrace;
                poolableConnection.setConnectedTimeNano();
                poolableConnection.traceEnable = true;
                activeConnectionLock.lock();
                try {
                    activeConnections.put(poolableConnection, PRESENT);
                } finally {
                    activeConnectionLock.unlock();
                }
            }

出现了一个removeAbandoned参数,这个参数的意思是移除被遗弃的连接对象,如果打开的话就把当前连接放到activeConnections中,篇幅有限,这部分内容就不展开了,后面我们会专门写一篇文章介绍removeAbandoned参数。

剩下的一小部分代码,很简单,根据参数设置连接的autoCommit,之后返回连接poolableConnection。

if (!this.defaultAutoCommit) {
                poolableConnection.setAutoCommit(false);
            }
            return poolableConnection;
        }
    }

getConnectionDirect方法源码分析完成了,下面我们要看一下getConnectionInternal方法,这是真正从连接池中获取连接的方法。

getConnectionInternal

直接看代码:

private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
        if (closed) {
            connectErrorCountUpdater.incrementAndGet(this);
            throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
        }
        if (!enable) {
            connectErrorCountUpdater.incrementAndGet(this);
            if (disableException != null) {
                throw disableException;
            }
            throw new DataSourceDisableException();
        }
        final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
        final int maxWaitThreadCount = this.maxWaitThreadCount;
        DruidConnectionHolder holder;

检查连接池状态如果已经disable或cloesed的话,抛异常。

接下来:

for (boolean createDirect = false;;) {
            if (createDirect) {
                createStartNanosUpdater.set(this, System.nanoTime());
                if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
                    PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                    holder = new DruidConnectionHolder(this, pyConnInfo);
                    holder.lastActiveTimeMillis = System.currentTimeMillis();
                    creatingCountUpdater.decrementAndGet(this);
                    directCreateCountUpdater.incrementAndGet(this);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("conn-direct_create ");
                    }
                    boolean discard = false;
                    lock.lock();
                    try {
                        if (activeCount < maxActive) {
                            activeCount++;
                            holder.active = true;
                            if (activeCount > activePeak) {
                                activePeak = activeCount;
                                activePeakTime = System.currentTimeMillis();
                            }
                            break;
                        } else {
                            discard = true;
                        }
                    } finally {
                        lock.unlock();
                    }
                    if (discard) {
                        JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                    }
                }
            }

初始化createDirect变量为false之后启动无限循环,意思是不断循环直到获取到连接、或超时等其他异常情况发生。

紧接着的这段代码是createDirect=true的情况下执行的,createDirect是在下面循环体中检查如果:createScheduler不为空、连接池空、活动连接数小于设定的最大活动连接数maxActive、并且createScheduler的队列中排队等待创建连接的线程大于0的情况下,设置createDirect为true的,以上这些条件如果成立的话,大概率表明createScheduler中的创建线程出问题了、所以createScheduler大概率指望不上了,所以要直接创建连接了。

直接创建的代码也很容易理解,调用createPhysicalConnection创建物理连接,创建DruidConnectionHolder封装该物理连接,创建之后获取锁资源,检查activeCount < maxActive则表明创建连接成功、结束for循环,否则,activeCount >= maxActive则说明违反了原则(直接创建连接的过程中createScheduler可能复活了、又创建出来连接放入连接池中了),所以,关闭锁资源之后,将刚创建出来的连接关闭。

然后:

try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException("interrupt", e);
            }
            try {
                if (maxWaitThreadCount > 0
                        && notEmptyWaitThreadCount >= maxWaitThreadCount) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
                            + lock.getQueueLength());
                }
                if (onFatalError
                        && onFatalErrorMaxActive > 0
                        && activeCount >= onFatalErrorMaxActive) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    StringBuilder errorMsg = new StringBuilder();
                    errorMsg.append("onFatalError, activeCount ")
                            .append(activeCount)
                            .append(", onFatalErrorMaxActive ")
                            .append(onFatalErrorMaxActive);
                    if (lastFatalErrorTimeMillis > 0) {
                        errorMsg.append(", time '")
                                .append(StringUtils.formatDateTime19(
                                        lastFatalErrorTimeMillis, TimeZone.getDefault()))
                                .append("'");
                    }
                    if (lastFatalErrorSql != null) {
                        errorMsg.append(", sql \n")
                                .append(lastFatalErrorSql);
                    }
                    throw new SQLException(
                            errorMsg.toString(), lastFatalError);
                }
                connectCount++;
                if (createScheduler != null
                        && poolingCount == 0
                        && activeCount < maxActive
                        && creatingCountUpdater.get(this) == 0
                        && createScheduler instanceof ScheduledThreadPoolExecutor) {
                    ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
                    if (executor.getQueue().size() > 0) {
                        createDirect = true;
                        continue;
                    }
                }

获取锁资源,检查等待获取连接的线程数如果大于参数设置的最大等待线程数,抛异常。

检查并处理异常。

累加connectCount。

之后是上面提到过的对createDirect的处理。

接下来到了最为关键的部分,一般情况下createDirect为false,不会直接创建连接,逻辑会走到下面这部分代码中,从连接池中获取连接:

if (maxWait > 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                }
                if (holder != null) {
                    if (holder.discard) {
                        continue;
                    }
                    activeCount++;
                    holder.active = true;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();
                    }
                }
            } catch (InterruptedException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException(e.getMessage(), e);
            } catch (SQLException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw e;
            } finally {
                lock.unlock();
            }

如果参数设置了maxWait,则调用pollLast限时获取,否则调用takeLast获取连接,这两个方法稍后分析。

之后检查获取到的连接已经被discard的话,continue重新获取连接。

释放锁资源。

从连接池中获取到了连接,结束for循环。

如果takeLast或poolLast返回的DruidConnectionHolder为null的话(调用poolLast超时),处理错误信息,抛GetConnectionTimeoutException超时异常(这部分代码没有贴出,省略了......感兴趣的童鞋自己打开源码看一下)。

否则,用DruidConnectionHolder封装创建DruidPooledConnection后返回。

takeLast & pollLast(nanos)

这两个方法的逻辑其实差不多,主要区别一个是限时,一个不限时,两个方法都是在锁状态下执行。

具体调用哪一个方法取决于参数maxWait,默认值为-1,默认情况下会调用takeLast,获取连接的时候不限时。

建议设置maxWait,否则在特殊情况下如果创建连接失败、会导致应用层线程挂起,获取不到任何返回的情况出现。如果设置了maxWait,getConnection方法会调用pollLast(nanos),获取不到连接后,应用层会得到连接超时的反馈。

先看takeLast方法:

takeLast() throws InterruptedException, SQLException {
        try {
            while (poolingCount == 0) {
                emptySignal(); // send signal to CreateThread create connection
                if (failFast && isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }
                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;
                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }
                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }
        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;
        return last;
    }

如果连接池为空(poolingCount == 0)的话,无限循环。

调用emptySignal(),通知创建连接线程,有人在等待获取连接,抓紧时间创建连接。

然后调用notEmpty.await(),等待创建连接线程在完成创建、或者有连接归还到连接池中后唤醒通知。

如果发生异常,调用一下notEmpty.signal()通知其他获取连接的线程,没准自己没能获取成功、其他线程能获取成功。

下面的代码,线程池一定不空了。

线程池的线程数量减1(decrementPoolingCount),然后获取connections的最后一个元素返回。

pollLast方法的代码逻辑和takeLast的类似,只不过线程池空的话,当前线程会限时挂起等待,超时仍然不能获取到连接的话,直接返回null。

Druid连接池获取连接代码分析完毕!

小结

Druid连接池的连接获取过程的源码分析完毕,后面还有连接归还过程,更多关于java Druid获取连接getConnection的资料请关注脚本之家其它相关文章!

相关文章

  • spring boot集成jasypt 并实现自定义加解密的详细步骤

    spring boot集成jasypt 并实现自定义加解密的详细步骤

    由于项目中的配置文件 配置的地方过多,现将配置文件统一放到nacos上集中管理 且密码使用加密的方式放在配置文件中,配置文件的加密使用加密库jasypt,本文给大家介绍spring boot集成jasypt并实现自定义加解密,感兴趣的朋友一起看看吧
    2023-08-08
  • 详解MySQL的简易封装以及使用

    详解MySQL的简易封装以及使用

    本文主要介绍了MySQL的简易封装以及使用。具有一定的参考价值,下面跟着小编一起来看下吧
    2017-01-01
  • Java 位图法排序的使用方法

    Java 位图法排序的使用方法

    本篇文章,小编将为大家介绍关于Java 位图法排序的使用方法,有需要的朋友可以参考一下
    2013-04-04
  • Java微信公众平台开发(12) 微信用户信息的获取

    Java微信公众平台开发(12) 微信用户信息的获取

    这篇文章主要为大家详细介绍了Java微信公众平台开发第十二步,微信用户信息的获取,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-04-04
  • java为什么使用BlockingQueue解决竞态条件问题面试精讲

    java为什么使用BlockingQueue解决竞态条件问题面试精讲

    这篇文章主要为大家介绍了java为什么使用BlockingQueue解决竞态条件问题面试精讲,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • springboot打包实现项目JAR包和依赖JAR包分离

    springboot打包实现项目JAR包和依赖JAR包分离

    这篇文章主要介绍了springboot打包实现项目JAR包和依赖JAR包分离,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • SpringCloud-Hystrix-Dashboard客户端服务监控的实现方法

    SpringCloud-Hystrix-Dashboard客户端服务监控的实现方法

    这篇文章主要介绍了SpringCloud-Hystrix-Dashboard客户端服务监控的实现方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-03-03
  • springBoot使用JdbcTemplate代码实例

    springBoot使用JdbcTemplate代码实例

    这篇文章主要介绍了springBoot使用JdbcTemplate代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • java调用百度的接口获取起-止位置的距离

    java调用百度的接口获取起-止位置的距离

    本文主要介绍了java调用百度的接口获取起-止位置的距离,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • Java多线程实现的两种方式

    Java多线程实现的两种方式

    本文主要介绍了Java多线程实现的两种方式:继承Thread类、实现Runnable接口。具有一定的参考价值,下面跟着小编一起来看下吧
    2017-01-01

最新评论