python - sqlachemy另类用法思路详解

 更新时间:2024年12月06日 08:54:49   作者:疯狂的妞妞  
这篇文章主要介绍了python - sqlachemy另类用法,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

这里只是给出一个思路,或许对于未来解决问题有一些参考意义。

仿 JAP 的写法

这种写法很像 java 环境中的 JPA,如果引入模版引擎,则可以大幅增强实用性。

但是,在 python 环境中,这不符合主流的 ORM 框架。

潜在风险:代码检测的时候,可能会被误判,因为我们定义了一大堆空的函数。

# 注解式事务 start ---------------------------------------------
@update(sql='UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1')
def modify(params: dict = None) -> int:
    pass
@query(sql='SELECT * FROM `t_temp` WHERE (`id`= :id) LIMIT 1', result_type=dict)
def queryById(params: dict = None) -> list:
    pass
@query(sql='SELECT * FROM `t_temp` WHERE (`id`= :id) LIMIT 1', result_type=dict)
def queryById2(id: int) -> list:
    pass
@transactional()
def test_annotation():
    ret = modify({'id': 18, 'desc': 'OR 1=1'})
    print(ret)
    result = queryById2(18)
    print(result)

代码封装

import inspect
import logger_factory
import typing
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.engine import Result, CursorResult
logger = logger_factory.get_logger()
# 定义数据库连接字符串
DATABASE_URI = 'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}?charset=utf8mb4'
# 替换为你的数据库用户名、密码、主机、端口和数据库名
USERNAME = 'root'
PASSWORD = 'root'
HOST = 'localhost'
PORT = '3306'
DBNAME = 'med'
# 创建数据库引擎,使用连接池
engine = create_engine(
    DATABASE_URI.format(
        username=USERNAME,
        password=PASSWORD,
        host=HOST,
        port=PORT,
        dbname=DBNAME
    ),
    echo=False,  # 如果设置为True,SQLAlchemy将打印所有执行的SQL语句,通常用于调试
    pool_size=10,  # 连接池大小
    max_overflow=20,  # 超过连接池大小外最多创建的连接数
    pool_timeout=30,  # 连接池中没有线程可用时,在抛出异常前等待的时间
    pool_recycle=3600  # 多少秒之后对连接进行一次回收(重置)
)
# do a test
with engine.connect() as con:
    rs = con.execute(text('SELECT 1'))
    rs.fetchone()
    logger.debug('create engine succeed!')
# session-maker
Session = sessionmaker(bind=engine)
# thread safe session-maker
DBSession = scoped_session(Session)
# with Session() as session:
#     # 获取数据库连接
#     connection = session.connection()
#     savepoint = connection.begin_nested()
#     print(savepoint)
def getEffectRows(result: Result) -> int:
    r"""
    获取受影响行数
    这里有点问题:源码部分 rowcount 是一个 callable,但实际应该是 int;
    这里绕一点,确保不会出问题,如果返回 -1,说明出现了意料之外的情况
    :param result: 结果集
    :return: 受影响行数
    """
    if isinstance(result, CursorResult):
        effect_row = result.rowcount
        if isinstance(effect_row, int):
            return effect_row
        if callable(effect_row):
            return effect_row()
    return -1
def resultAsDict(result: Result) -> list:
    r"""
    将查询结果转换为 dict-list
    :param result: 结果集
    :return: dict 列表
    """
    keys = result.keys()
    ret = list()
    for item in result.fetchall():
        ret.append(dict(zip(keys, item)))
    return ret
def execute(sql: str, params: dict = None) -> Result:
    r"""
    执行一条查询语句
    :param sql: 查询语句
    :param params: 参数
    :return: 结果集
    """
    if sql is None:
        raise ValueError('sql cannot be None')
    logger.debug('execute sql: ' + sql)
    logger.debug('parameter  : ' + str(params))
    return DBSession().execute(text(sql), params)
def executeQuery(sql: str, params: dict = None, result_type: type = tuple) -> typing.Sequence:
    r"""
    执行一个查询
    :param sql: sql
    :param params: dict
    :param result_type: 结果集类型,可选:tuple、dict
    :return: 序列
    """
    result = execute(sql, params)
    if result_type == dict:
        return resultAsDict(result)
        pass
    # default return_type tuple-list
    return result.fetchall()
def executeUpdate(sql: str, params: dict = None) -> int:
    r"""
    执行一个查询
    :param sql: sql 执行语句
    :param params: dict 查询参数
    :return: 受影响行数
    """
    result = execute(sql, params)
    return getEffectRows(result)
def transactional(rollback: type = Exception):
    r"""
    注解式事务
    用法类似于 spring 环境下的 @Transactional 注解
    注意: 事务控制在 session 级别,不能兼容事务嵌套的场景(理想状态下,应当通过 save-point 实现)
    推荐: 如果遇到很复杂的事务嵌套,显式调用 session,手动控制事务
    :param rollback: 指定触发回滚的异常类型
    :return: 装饰器函数
    """
    def decorator(func):
        def call(*args, **kwargs):
            session = None
            try:
                session = DBSession()
                ret = func(*args, **kwargs)
                session.commit()
                return ret
            except rollback as e:
                if session:
                    session.rollback()
                logger.exception(f'transaction exception, rollback: {str(e)}')
                raise
            finally:
                if session:
                    session.close()
        return call
    return decorator
    pass
def update(sql: str = None):
    r"""
    注解式查询,E.G.::
        @update(sql='UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1')
        def modify(params: dict = None) -> int:
            pass
    :param sql: 要执行的 sql
    :return: decorator
    """
    def decorator(func):
        def call(*args, **kwargs):
            result = execute(sql, args[0])
            return getEffectRows(result)
        return call
    return decorator
    pass
def query(sql: str = None, result_type: type = tuple):
    r"""
    注解式查询,E.G.::
    E.G.::
        @query(sql='SELECT * FROM `t_temp` WHERE (`id`= :id) LIMIT 1', result_type=dict)
        def queryById2(id: int) -> list:
            pass
    :param sql: 要执行的 sql
    :param result_type: 结果集类型,可选:tuple、dict
    :return:  decorator
    """
    def decorator(func):
        def call(*args, **kwargs):
            if sql is None:
                raise ValueError('sql cannot be None')
            first = args[0]
            if isinstance(first, dict):
                result = DBSession().execute(text(sql), args)
            else:
                names = inspect.signature(func).parameters.values()
                params = dict()
                for idx, name in enumerate(names):
                    params[name.name] = args[idx]
                print(params)
                result = DBSession().execute(text(sql), params)
            if result_type == dict:
                keys = result.keys()
                ret = list()
                for item in result.fetchall():
                    ret.append(dict(zip(keys, item)))
                return ret
                # default return_type tuple
                pass
            return result.fetchall()
        return call
    return decorator
    pass
@transactional()
def test_transaction():
    r"""
    测试注解式事务
    :return: None
    """
    session = DBSession()
    session.execute(text("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1"), {'id': 18, 'desc': 'OR 1=3'})
    session.execute(text("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1"), {'id': 18, 'desc': 'OR 1=4'})
    # raise exception
    raise SyntaxError('Syntax error')
@transactional()
def test_api():
    r"""
    测试封装过的函数
    :return: None
    """
    execute("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1", {'id': 18, 'desc': 'OR 1=1'})
    execute("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1", {'id': 18, 'desc': 'OR 1=2'})
    # raise exception
    raise SyntaxError('Syntax error')

到此这篇关于python - sqlachemy另类用法的文章就介绍到这了,更多相关python sqlachemy另类用法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • transform python环境快速配置方法

    transform python环境快速配置方法

    经常在数据开发中需要搞udf,最近发现transform更加方便易用,但是经常会涉及到集群python版本不一、包不全或者部分机器上没有安装python。这篇文章主要介绍了transform python环境快速配置方法,需要的朋友可以参考下
    2018-09-09
  • 深入解析Python中的__builtins__内建对象

    深入解析Python中的__builtins__内建对象

    __builtins__ 是内建模块__builtin__中的对象,使用Python中的内建函数时会通过__builtins__引导,这里我们就来深入解析Python中的__builtins__内建对象,需要的朋友可以参考下
    2016-06-06
  • pytorch中forwod函数在父类中的调用方式解读

    pytorch中forwod函数在父类中的调用方式解读

    这篇文章主要介绍了pytorch中forwod函数在父类中的调用方式解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • python基础之变量和数据类型

    python基础之变量和数据类型

    这篇文章主要介绍了python的变量和数据类型,实例分析了Python中返回一个返回值与多个返回值的方法,需要的朋友可以参考下
    2021-10-10
  • Python 字符串操作详情

    Python 字符串操作详情

    这篇文章主要介绍了Python 字符串操作,所谓字符串,就是由0个或者多个字符组成的有限序列,字符串的字符可以是特殊符号、英文字母、中文字符、日文的平假名或片假名、希腊字母、Emoji字符等等。下面我们大家一起来学习文章详细内容吧
    2021-11-11
  • selenium设置proxy、headers的方法(phantomjs、Chrome、Firefox)

    selenium设置proxy、headers的方法(phantomjs、Chrome、Firefox)

    这篇文章主要介绍了selenium设置proxy、headers的方法(phantomjs、Chrome、Firefox),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-11-11
  • python如何把字符串类型list转换成list

    python如何把字符串类型list转换成list

    这篇文章主要介绍了python如何吧字符串类型list转换成list,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • Python构建一个简单的数据处理流水线

    Python构建一个简单的数据处理流水线

    数据处理流水线是数据分析和工程中非常常见的概念,通过流水线的设计,可以将数据的采集、处理、存储等步骤连接起来,实现自动化的数据流,使用Python构建一个简单的数据处理流水线(Data Pipeline),一步步构建流程,并附上流程图来帮助你更好地理解数据流的工作方式
    2024-12-12
  • Python实现批量CSV转Excel的高性能处理方案

    Python实现批量CSV转Excel的高性能处理方案

    在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-09-09
  • Pycharm 创建 Django admin 用户名和密码的实例

    Pycharm 创建 Django admin 用户名和密码的实例

    今天小编就为大家分享一篇Pycharm 创建 Django admin 用户名和密码的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-05-05

最新评论