Scala数据库连接池的简单实现

 更新时间:2023年02月09日 09:28:39   作者:ncqingchuan1976  
本文主要介绍了Scala数据库连接池的简单实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。

以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。

核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。

package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks
 
class DataSource(
    val driverName: String,
    val url: String,
    val user: String,
    val password: String,
    val minSize: Integer = 1,
    val maxSize: Integer = 10,
    val keepAliveTimeout: Long = 1000
) extends AutoCloseable {
 
  if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
    throw new IllegalArgumentException("These arguments are Illegal")
  }
 
  Class.forName(driverName)
  private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
  private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)
 
  for (i <- 0 until minSize) {
    pool += new DbConnection(url, user, password)
  }
 
  def getConenction(): DbConnection = {
    val starEntry = System.currentTimeMillis()
    Breaks.breakable {
      while (true) {
        lock.lock()
        try {
          for (con <- pool) {
            if (!con.used) {
              con.used = true
              return con;
            }
          }
          if (pool.size < maxSize) {
            var con = new DbConnection(url, user, password) { used = true }
            pool.append(con)
            return con
          }
        } finally {
          lock.unlock()
        }
        if (System.currentTimeMillis() - starEntry > keepAliveTimeout) {
          break()
        }
      }
    }
    throw new IllegalArgumentException("Connection Pool is empty")
  }
  def close(): Unit = {
    lock.lock()
    try {
      if (pool != null) {
        pool.foreach(t => t.innerConnection.close())
        pool.clear()
      }
    } finally {
      lock.unlock()
    }
  }
}

以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口

BeginTransaction:开启事务,并返回封装了的DbTransaction类

close:将连接释放

CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等

package pool
 
import java.sql.Connection
import java.sql.DriverAction
import java.sql.DriverManager
 
class DbConnection(
    val url: String,
    val user: String,
    val password: String
) extends AutoCloseable {
 
  private[pool] var used: Boolean = false
  private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)
 
  def close(): Unit = {
    if (used) {
      used = false
    }
  }
 
  def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
    if (innerConnection.getAutoCommit()) {
      innerConnection.setAutoCommit(false)
    }
    innerConnection.setTransactionIsolation(isolationLevel)
    new DbTransaction(this)
  }
 
  def CreateCommand(): DbCommand = {
    new DbCommand(this)
  }
}

以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。

ExecuteScalar:查询数据库并返回第一行第一个值的方法。

ExecuteResultSet:该方法有两个重载方法。

参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。

无参的版本则通过反射直接将ResultSet通过字段位置映射,转换成你需要的类型。

package pool
 
import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
 
import Dispose.using
import java.{util => ju}
 
class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
  if (queryTimeout < 0) {
    throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.")
  }
  val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
  private val mirror = ru.runtimeMirror(getClass().getClassLoader())
  private var statement: CallableStatement = null
 
  /** @author:qingchuan
    *
    * @return
    */
  def ExecuteScalar(): Any = {
    var obj: Any = None
    ExecuteResultSet(t => {
      if (t.next()) {
        if (t.getMetaData().getColumnCount() > 0)
          obj = t.getObject(1)
      }
    })
    obj
  }
 
  /** @author
    *   qingchuan
    * @version 1.0
    *
    * @param callBack
    */
  def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
    if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    using(statement.executeQuery()) { t =>
      callBack(t)
      if (!t.isClosed())
        getOutParameterValue()
    }
  }
 
  def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
    val classSymbol = mirror.symbolOf[T].asClass
    val classMirror = mirror.reflectClass(classSymbol)
    val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    val result = new ArrayBuffer[T]()
    ExecuteResultSet(t => {
      while (t.next()) {
        var i = 1
        val values: Buffer[Any] = ArrayBuffer()
        for (f <- fields) {
          values += t.getObject(i)
          i += 1
        }
        result += consMethodMirror.apply(values: _*).asInstanceOf[T]
      }
    })
    result
  }
 
  def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
    statement = connection.innerConnection.prepareCall(commandText)
    var trans: DbTransaction = null
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    for (t <- values) {
      var i = 1
      val filedMirror = mirror.reflect(t)
      for (f <- fields) {
        val instance = filedMirror.reflectField(f)
        statement.setObject(i, instance.get)
        i += 1
      }
      statement.addBatch()
    }
 
    try {
      trans = connection.BeginTransaction()
      val obj = statement.executeBatch()
      trans.Commit()
      statement.clearBatch()
      obj.sum
    } catch {
      case e: Exception => {
        if (trans != null)
          trans.RollBack()
        throw e
      }
    }
  }
 
  def ExecuteNoneQuery(): Integer = {
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    val obj = statement.executeUpdate()
    getOutParameterValue()
    obj
  }
 
  def CreateParameter(): DbParameter = {
    new DbParameter();
  }
 
  private def getOutParameterValue(): Unit = {
    for (i <- 1 to Parameters.size) {
      val parameter: DbParameter = Parameters(i - 1);
      if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
        parameter.value = statement.getObject(i);
      }
    }
  }
 
  private def addParatemetrs(): Unit = {
    statement.clearParameters()
    for (i <- 1 to Parameters.size) {
      val p = Parameters(i - 1);
      if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.setObject(i, p.value)
      }
      if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.registerOutParameter(p.parameterName, p.sqlType, p.scale)
      }
    }
  }
  def close() {
    if (statement != null) {
      statement.close()
    }
  }
}
 
case class DbParameter(
    var parameterName: String = null,
    var value: Any = null,
    var parameterDirection: Integer = ParameterDirection.Input,
    var scale: Integer = 0,
    var sqlType: Integer = null
) {}
 
object ParameterDirection {
  val Input = 1
  val InputOutput = 2
  val Output = 3
}

以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。

package pool
 
class DbTransaction(private val connection: DbConnection) {
 
  def Commit(): Unit = {
    connection.innerConnection.commit()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true);
    }
  }
  def RollBack(): Unit = {
    connection.innerConnection.rollback()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true)
    }
  }
 
  def getConnection(): DbConnection = {
    connection
  }
 
  def getTransactionIsolation(): Int = {
    connection.innerConnection.getTransactionIsolation()
  }
 
}
 
object IsolationLevel {
 
  val TRANSACTION_NONE = 0
 
  val TRANSACTION_READ_UNCOMMITTED = 1;
 
  val TRANSACTION_READ_COMMITTED = 2;
 
  val TRANSACTION_REPEATABLE_READ = 4;
 
  val TRANSACTION_SERIALIZABLE = 8;
 
}

最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。

package pool
 
object Dispose {
  def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
    try {
      op(cls)
    } catch {
      case e: Exception => throw e
    } finally {
      cls.close()
    }
  }
}

以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。

import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.DbTransaction
import pool.Dispose.using
import pool.IsolationLevel
import pool.ParameterDirection
 
import java.sql.Date
import java.sql.ResultSet
import java.sql.Types
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import javax.xml.crypto.Data
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import com.nimbusds.oauth2.sdk.util.date.SimpleDate
import java.text.SimpleDateFormat
object App {
  def main(args: Array[String]): Unit = {
    val pool = new DataSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
      "账号",
      "密码",
      minSize = 1,
      maxSize = 3,
      keepAliveTimeout = 3000
    )
 
    val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
    for (i <- 1 to 15) {
      val thread: Thread = new Thread(() => {
        val date = new Date(System.currentTimeMillis())
        using(pool.getConenction()) { con =>
          {
            using(new DbCommand(con)) { cmd =>
              {
                cmd.commandText = "{call p_get_out(?,?)}"
                val p1 = new DbParameter("@id", i)
                val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
                cmd.Parameters.append(p1)
                cmd.Parameters.append(p2)
                val result = cmd.ExecuteScalar()
                println(s"result=${result},output=${p2.value},parameter=${i}")
              }
            }
          }
        }
      })
      thread.start()
    }
  }
}

开发环境VsCode,SQL Server数据库。以下是引用的第三方库。

version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"

以下是执行结果。

到此这篇关于Scala数据库连接池的简单实现的文章就介绍到这了,更多相关Scala数据库连接池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java去掉小数点后面无效0的方案与建议

    Java去掉小数点后面无效0的方案与建议

    当前小数点后面的位数过多的时候,多余的0没有实际意义,下面这篇文章主要给大家介绍了关于Java去掉小数点后面无效0的方案与建议,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • java RocketMQ快速入门基础知识

    java RocketMQ快速入门基础知识

    这篇文章主要介绍了java RocketMQ快速入门基础知识,所以RocketMQ是站在巨人的肩膀上(kafka),又对其进行了优化让其更满足互联网公司的特点。它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。,需要的朋友可以参考下
    2019-06-06
  • Spring注解解析之@ImportResource

    Spring注解解析之@ImportResource

    之前我们使用spring,最多的还是通过xml配置文件的方式来配置spring bean等内容,随着注解的广泛应用和spring4中Java config的引入,xml配置文件方式逐步被替换,但是如果是想要使用xml配置文件方式的话,也可以通过@ImportResource注解来实现,下面我们来一起看下如何使用.
    2021-05-05
  • idea中一键自动生成序列化serialVersionUID方式

    idea中一键自动生成序列化serialVersionUID方式

    这篇文章主要介绍了idea中一键自动生成序列化serialVersionUID方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • 手把手教你在eclipse创建第一个java web项目并运行

    手把手教你在eclipse创建第一个java web项目并运行

    Eclipse是用来做开发的自由集成开发环境,这也是很多java程序员会使用的开发环境,所以可以使用eclipse创建javaweb项目,下面这篇文章主要给大家介绍了关于如何在eclipse创建第一个java web项目并运行的相关资料,需要的朋友可以参考下
    2023-02-02
  • 利用logback 设置不同包下的日志级别

    利用logback 设置不同包下的日志级别

    这篇文章主要介绍了利用logback 设置不同包下的日志级别,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • 修改Android应用的样式的一些关键点解析

    修改Android应用的样式的一些关键点解析

    这篇文章主要介绍了修改Android应用的样式的一些关键点,即对影响外观的theme跟style的相关修改,需要的朋友可以参考下
    2015-12-12
  • Spring Boot详解配置文件的用途与用法

    Spring Boot详解配置文件的用途与用法

    SpringBoot项目是一个标准的Maven项目,它的配置文件需要放在src/main/resources/下,其文件名必须为application,其存在两种文件形式,分别是properties和yaml(或者yml)文件
    2022-06-06
  • 解决IDEA报错java无效的目标发行版:22

    解决IDEA报错java无效的目标发行版:22

    在使用IDEA编译项目时,可能会遇到JDK版本不一致的错误,这篇文章主要介绍了解决IDEA报错java无效的目标发行版:22的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-10-10
  • IDEA搭建SpringBoot多模块聚合工程过程详解(多模块聚合工程)

    IDEA搭建SpringBoot多模块聚合工程过程详解(多模块聚合工程)

    这篇文章主要是介绍一下,如何在IDEA开发工具下,搭建一个基于SpringBoot的多模块聚合工程项目,本篇文章,将项目模块细分为几个子工程模块,在文中给大家详细介绍过,对IDEA搭建SpringBoot多模块聚合工程感兴趣的朋友一起看看吧
    2022-04-04

最新评论