Python ORM数据库框架Sqlalchemy的使用教程详解

 更新时间:2022年10月19日 14:31:41   作者:胡安民-独行者  
对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。本文主要介绍了其使用的相关资料,感兴趣的小伙伴可以学习一下

对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。面向对象的开发方法是当今企业级应用开发环境中的主流开发方法,关系数据库是企业级应用环境中永久存放数据的主流数据存储系统。对象和关系数据是业务实体的两种表现形式,业务实体在内存中表现为对象,在数据库中表现为关系数据。内存中的对象之间存在关联和继承关系,而在数据库中,关系数据无法直接表达多对多关联和继承关系。因此,对象-关系映射(ORM)系统一般以中间件的形式存在,主要实现程序对象到关系数据库数据的映射。

学过java的hibernate框架的那么这个很好上手,非常简单 ,他有两种模式一种纯orm另一种模式是支持原生sql这两种可以混合使用

优点:

  • 简洁易读:将数据表抽象为对象(数据模型),更直观易读
  • 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护
  • 更安全:有效避免SQL注入

缺点: 虽然性能稍稍不及原生SQL,但是操作数据库真的很方便!

官网: https://www.sqlalchemy.org/

概念和数据类型

概念

常见数据类型

安装

pip install SQLAlchemy

​​​​​​​pip install mysqlclient   # 安装自己的数据库客户端(可以是mysql 可以是oracle)

连接

from sqlalchemy import create_engine
engine = create_engine("mysql://user:password@hostname/dbname?charset=uft8",
            echo=True,
            pool_size=8,
            pool_recycle=60*30
            )

创建好了Engine的同时,Pool和Dialect也已经创建好了,但是此时并没有真正与数据库连接,等到执行具体的语句.connect()等时才会连接到数据库。

  • echo: 当设置为True时会将orm语句转化为sql语句打印,一般debug的时候可用
  • pool_size: 连接池的大小,默认为5个,设置为0时表示连接无限制
  • pool_recycle: 设置时间以限制数据库多久没连接自动断开

创建数据库表类(模型)

ORM的重要特点就是操作类来操作数据库,现在我们来创建一个类,以常见的用户表举例:

from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast

class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

上面的SQLalchemyFast.Base是我自己封装的Base ,用于统一管理所有模型类,可以将Python类和数据库表关联映射起来。数据库表模型类通过__tablename__和表关联起来,Column表示数据表的列

生成数据库表

Base = declarative_base()
Base.metadata.create_all(engine)

会自动创建表,如果存在则忽略,执行以上代码,就会发现在db中创建了users表。 前提必须有模型类继承了Base

会话

会话就和打电话一样,打一次电话就是一个会话,就相当于和数据库交互一次就是一个会话,一个会话可以运行多个或单个语句,会话结束必须关闭

sqlalchemy中使用session用于创建程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。

通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:

from sqlalchemy.orm import sessionmaker

# 创建session
DbSession = sessionmaker(bind=engine)
session = DbSession()

session的常见操作方法包括:

  • flush:预提交,提交到数据库文件,还未写入数据库文件中 (没事用)
  • commit:提交了一个事务
  • rollback:回滚
  • close:关闭

增删改查

add_user = UserDB("test", "test123@qq.com")
session.add(add_user)
session.commit()

session.add()将会把Model加入当前session维护的持久空间(可以从session.dirty看到)中,直到commit时提交到数据库。

users = session.query(UserDB).filter(UserDB.id=1).all()
for item in users:
  print(item.name)

session.query(Users).filter(UserDB.id=1).update({'name': "Jack"})

session.query(UserDB).filter(UserDB.name == "test").delete()
session.commit()

执行裸sql

session.execute(text(sql), params)
session.commit()

sql: select * from User where id = :id and name = :name

params: {"id":1,"name":"张三"}`

参数名必须和sql语句中的参数名一致

with关闭会话

        DbSession = sessionmaker(bind=engine)
        with DbSession() as conn:
            # 代码
            conn.commit()

sql建造者模式

需要导入的包

from sqlalchemy import  delete, update, text, select, join, desc, asc

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            where(text("id = :id and name = :name")).\
            group_by(UserDB.id,UserDB.name).\
            having(text("id = :id and name = :name")).\
            order_by(desc("id"),asc("name")).\
            offset(1).limit(10).\
            params(id=1, name="张三")
        print(sql)

以上sql放入到execute里直接就能跑了

多表联查(只支持内查询和左查询和全查询)

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            join(BookDB,UserDB.id == BookDB.id).\
            join(alias(BookDB,"b"),text("b.id == b.id"),isouter=True).\
            join(alias(BookDB,"e"),text("e.id == e.id"),full=True). \
            where(text("id = :id and name = :name"))
        print(sql)

封装的工具

数据库配置文件database.properties

url=mysql://root:root@106.12.174.220:3306/demo?charset=utf8
echo=True # 是否打印sql语句
pool_size=10 # 连接池大小
pool_recycle=1800 # 连接池回收时间
pool_timeout=30 # 连接池超时时间
isolation_level=READ_COMMITTED # 事务隔离级别

工具

from sqlalchemy import create_engine, delete, update, text, alias
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker

from src.file.FileReadAndWrite import FileReadAndWrite
from src.log.Log import Log


class SQLalchemyFast(object):
    Base = declarative_base()
    """
    功能: SQLalchemy工具
    """

    def __init__(self, dbFile):
        file = FileReadAndWrite.readPropertiesFile(dbFile)
        self.engine = create_engine(
            url=file['url'],
            echo=bool(file['echo']),
            pool_size=int(file['pool_size']),
            pool_recycle=int(file['pool_recycle']),
            pool_timeout=int(file['pool_timeout']),
            isolation_level=file['isolation_level'],
        )

        SQLalchemyFast.Base.metadata.create_all(self.engine)  # 创建表,如果表存在则不创建(必须对象继承Base)

    # 创建会话
    def createSession(self):
        Session = sessionmaker(bind=self.engine)
        return Session()

    # 添加一条数据
    def addData(self, object):
        with self.createSession() as conn:
            conn.add(object)
            conn.commit()

    # 添加多条数据
    def addDataList(self, objectList):
        with self.createSession() as conn:
            conn.add_all(objectList)
            conn.commit()

    # 删除主键id的数据
    def deleteDataById(self, cla, id):
        with self.createSession() as conn:
            conn.query(cla).filter(cla.id == id).delete()
            conn.commit()

    # 删除指定数据(where是并且的关系,不支持or和其他复杂查询)
    def deleteDataWhere(self, cla, *where):
        with self.createSession() as conn:
            stmt = delete(cla).where(*where)
            conn.execute(stmt)
            conn.commit()

    # 清空表
    def truncateTable(self, cla):
        with self.createSession() as conn:
            conn.query(cla).delete()
            conn.commit()

    # 更新指定主键id的数据
    def updateDataById(self, cla, id, data):
        """
        :param cla:  类(表)
        :param id:  主键id
        :param data:  {'key': "value",...}  key为表中的字段名,value为要修改的值
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(cla.id == id).values(data)
            result = conn.execute(stmt)
            conn.commit()
            return result

    # 更新指定条件的数据 (where是并且的关系,不支持or和其他复杂查询)
    def updateDataWhere(self, cla, data, *where):
        """
        :param cla:  类(表)
        :param data:  {'key': "value",...}  key为表中的字段名,value为要修改的值
        :param where:  过滤条件
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(*where).values(data)
            conn.execute(stmt)
            conn.commit()

    # 查询全部数据
    def queryDataAll(self, cla):
        with self.createSession() as conn:
            result = conn.query(cla).all()
        return result

    # 查询主键id的数据
    def queryDataById(self, cla, id):
        with self.createSession() as conn:
            result = conn.query(cla).filter(cla.id == id).first()
        return result

    # 查询指定数据,不支持分组查询(因为聚合后的数据无法转换成对象)
    def queryDataWhere(self, cla,aliasName=None, column=None, where=None,
                       join=None, on=None, left=None, full=None,
                       order="", limit="", offset="", distinct=None, params=None):
        with self.createSession() as conn:
            stmt = select(cla)
            if aliasName:
                stmt = select(alias(cla,aliasName))
            if column:
                stmt = stmt.with_only_columns(text(column)) .select_from(cla)
            if join is not None and on is not None:
                if left:
                    stmt = stmt.join(join, text(on), isouter=True)
                elif full:
                    stmt = stmt.join(join, text(on), full=True)
                else:
                    stmt = stmt.join(join, text(on))
            if where:
                stmt = stmt.where(text(where))
            if order:
                stmt = stmt.order_by(text(order))
            if limit:
                stmt = stmt.limit(limit)
            if offset:
                stmt = stmt.offset(offset)
            if distinct:
                stmt = stmt.distinct()
            result = conn.execute(stmt,params).all()
            result= [row[0] for row in result]
        return result

    # 创建事物(运行多条sql语句 ,function(conn)是一个函数,里面包含多条sql语句,需要使用原生的sqlalchemy)
    def createTransaction(self, function):
        with  self.createSession() as conn:
            conn.begin()
            try:
                function(conn)
                conn.commit()
            except Exception as e:
                Log.logError(e)
                conn.rollback()

    # 执行sql语句(包括增删改查,和存储过程...只要是sql语句都可以执行)
    def executeSql(self, sql, params=None):
        """
        :param sql:  sql语句  如: "select * from User where id = :id and name = :name "
        :param params:  参数  例如: {"id":1,"name":"张三"} 注意:参数名必须和sql语句中的参数名一致
        发送多个参数时,参数名必须以列表的形式传入,例如: {"id":["1","2"],"name":"张三"}
        "INSERT INTO some_table (x, y) VALUES (:x, :y)" 参数可以是 [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
        :return:
        """
        with self.createSession() as conn:
            result = conn.execute(text(sql), params)
            conn.commit()
        return result

    # 执行构建sql语句
    def executeSqlBuild(self, sql):
        with self.createSession() as conn:
            result = conn.execute(sql)
            conn.commit()
        return result

测试实体

from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class BookDB(SQLalchemyFast.Base):
    __tablename__ = "Book"   # __tablename__ 声明表名
    # primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
    name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
    email = Column(String(64), doc="邮箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

验证代码

import unittest
from sqlalchemy import delete, update, text, select, join, desc, asc, alias

from src.database.BookDB import BookDB
from src.database.SQLalchemyFast import SQLalchemyFast
from src.database.UserDB import UserDB
from src.file.FileTool import FileTool


class SQLalchemyFastTest(unittest.TestCase):
    # 测试添加数据
    def test_add(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addData(UserDB("name1", "123456789"))
        db.addData(UserDB("name2", "123456789"))
        db.addData(UserDB("name3", "123456789"))
        db.addData(UserDB("name4", "123456789"))

    # 测试添加多条数据
    def test_addAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addDataList([UserDB("name111", "123456789"), UserDB("name211", "123456789")])
    # 测试删除数据
    def test_deleteDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataById(UserDB, 1)

    # 测试条件删除数据
    def test_deleteWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataWhere(UserDB, UserDB.name == "name1", UserDB.email == "123456789")

    # 测试更新数据
    def test_update(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataById(UserDB, 10, {"name": "name31", "email": "123456789"})

    # 测试条件更新数据
    def test_updateFilter(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataWhere(UserDB, {"name": "name33", "email": "123456789"}, UserDB.name == "name2",
                           UserDB.email == "1231")

    # 测试查询数据
    def test_queryDataAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataAll(UserDB)
        for data in data_all:
            print(data)

    # 测试查询指定id数据
    def test_queryDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data = db.queryDataById(UserDB, 10)
        print(data)

    # 测试条件查询数据(不支持分组查询和链表查询)
    def test_queryDataWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataWhere(UserDB,
                                     where="name like CONCAT(:name,'%')",
                                     order="id desc",
                                     offset=1,
                                     limit=3,
                                     params={"name": "name"})

        # db.queryDataWhere(UserDB,
        #                              where="name like CONCAT(:name,'%')",
        #                              order="id desc",
        #                              offset=1,
        #                              limit=3,
        #                              params={"name": "name"})

        # db.queryDataWhere(UserDB,aliasName="a",
        #                   join=alias(BookDB,"b"),on="a.id == b.id",
        #                   where="a.name like CONCAT(:name,'%')",
        #                   params={"name": "name"})
        for data in data_all:
            print(data)

    # 测试创建事物
    def test_createTransaction(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)

        def test1(conn):
            conn.add(UserDB("name111", "123456789"))
            conn.add(UserDB("name211", "123456789"))
            # raise Exception("test122")
            # conn.add(UserDB("name333", "123456789"))
            # conn.add(UserDB("name444", "123456789"))

        db.createTransaction(test1)

    # 测试执行sql(执行失败会回滚的(存储过程,函数))
    def test_executeSql(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # data_all = db.executeSql("select * from User")
        # data_all = db.executeSql("select * from User where name like CONCAT(:name,'%')", params={"name":"name"})
        # for data in data_all:
        #     print(data)

        # 创建存储过程
        # db.executeSql("CREATE PROCEDURE `test_procedure` \
        # (IN `in_name` VARCHAR(255), IN `in_email` VARCHAR(255)) \
        # BEGIN \
        # INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        # END")

        # 调用存储过程
        # db.executeSql("call test_procedure(:name, :email)", params={"name": "name555", "email": "email12131"})

        # 创建函数
        # db.executeSql("CREATE FUNCTION `test_function`( `in_name` VARCHAR(255),  `in_email` VARCHAR(255)) \
        #  RETURNS INT(11) \
        #  BEGIN \
        #  DELETE FROM `User`; \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  RETURN 1; \
        #  END")
        # 调用函数
        # data_all = db.executeSql("select test_function(:name, :email)", params={"name": "name5551", "email": "email12131"})



    # 测试sql构造
    def test_executeSqlBuild(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
        #     join(BookDB,UserDB.id == BookDB.id)
        # print(sql)
        # db.executeSqlBuild(sql)

以上就是Python ORM数据库框架Sqlalchemy的使用教程详解的详细内容,更多关于Python Sqlalchemy的资料请关注脚本之家其它相关文章!

相关文章

  • yolov5训练时参数workers与batch-size的深入理解

    yolov5训练时参数workers与batch-size的深入理解

    最近再学习YOLOv3与YOLOv5训练数据集的具体步骤,几经波折终于实现了很好的效果,这篇文章主要给大家介绍了关于yolov5训练时参数workers与batch-size的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-03-03
  • python snap7读写PLC的操作方法

    python snap7读写PLC的操作方法

    这篇文章主要介绍了python snap7读写PLC的操作方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-02-02
  • Pytorch实现tensor序列化和并行化的示例详解

    Pytorch实现tensor序列化和并行化的示例详解

    这篇文章主要介绍了Pytorch实现tensor序列化和并行化,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,感兴趣的同学们下面随着小编来一起学习学习吧
    2023-12-12
  • 测试、预发布后用python检测网页是否有日常链接

    测试、预发布后用python检测网页是否有日常链接

    难免会碰到秀逗了把测试的链接发布到线上的情况,一般这种都是通过一些测试的检查工具来检查链接来规避风险的,下面为大家简述下大概的实现思路
    2014-06-06
  • 强悍的Python读取大文件的解决方案

    强悍的Python读取大文件的解决方案

    今天小编就为大家分享一篇关于强悍的Python读取大文件的解决方案,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-02-02
  • 基于 Python 实践感知器分类算法

    基于 Python 实践感知器分类算法

    这篇文章主要介绍了基于 Python 实践感知器分类算法的教程,帮助大家更好的利用python进行机器学习,感兴趣的朋友可以了解下
    2021-01-01
  • Python判断变量名是否合法的方法示例

    Python判断变量名是否合法的方法示例

    今天小编就为大家分享一篇关于Python判断变量名是否合法的方法示例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Python Numpy 数组的初始化和基本操作

    Python Numpy 数组的初始化和基本操作

    Python 是一种高级的,动态的,多泛型的编程语言。接下来通过本文给大家介绍Python Numpy 数组的初始化和基本操作,感兴趣的朋友一起看看吧
    2018-03-03
  • Matplotlib自定义坐标刻度的使用示例

    Matplotlib自定义坐标刻度的使用示例

    虽然matplotlib默认的坐标轴定位器与格式生成器可以满足大部分需求,但是并非对每一幅图都合适,本文主要介绍了Matplotlib自定义坐标刻度的使用示例,感兴趣的可以了解一下
    2023-11-11
  • 简单了解Python matplotlib线的属性

    简单了解Python matplotlib线的属性

    这篇文章主要介绍了简单了解Python matplotlib线的属性,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-06-06

最新评论