PyMySQL数据库连接与优化方式

 更新时间:2025年10月09日 08:47:29   作者:AI手记叨叨礼拜天  
本文介绍PyMySQL连接MySQL数据库的方法,涵盖CRUD操作、事务处理及连接池优化,通过合理配置连接池和错误重试机制,提升性能并确保数据一致性

本文将介绍如何使用 PyMySQL 连接和操作 MySQL 数据库,包括基本连接CRUD 操作事务处理以及如何在高并发环境下使用连接池优化性能。

通过合理的连接池配置和错误处理机制,可以构建出稳定高效的数据库应用。

一、PyMySQL 简介

PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,用于连接和操作 MySQL 数据库。它完全兼容 Python DB API 2.0 规范,提供了简单易用的接口来执行 SQL 查询和操作。

核心优势

  • 纯 Python 实现:无需外部依赖,跨平台兼容性好
  • Python 3 全面支持:兼容最新 Python 特性和语法
  • 线程安全:支持多线程并发操作
  • 完整功能支持:事务、存储过程、预处理语句等
  • 广泛兼容:支持 MySQL 5.5+ 和 MariaDB

安装方法

pip install pymysql

二、数据库连接配置

基础连接方式

import pymysql
from pymysql.cursors import DictCursor

# 推荐配置方式
def create_connection():
    return pymysql.connect(
        host='localhost',      # 数据库地址
        user='username',       # 用户名
        password='password',   # 密码
        database='test_db',    # 数据库名
        port=3306,            # 端口,默认3306
        charset='utf8mb4',     # 字符集,推荐utf8mb4
        autocommit=False,     # 是否自动提交
        cursorclass=DictCursor # 返回字典格式结果
    )
	

连接参数说明

参数说明
host数据库服务器地址‘localhost’
user用户名根据实际配置
password密码根据实际配置
database数据库名称项目数据库名
charset字符编码‘utf8mb4’(支持表情符号)
autocommit自动提交事务False(建议手动控制)
cursorclass游标类型DictCursor(结果以字典返回)

cursorclass参数说明

cursorclass说明返回结果格式适用场景
Cursor (默认)普通游标元组格式 (value1, value2, …)基础查询,需要最高性能时
DictCursor字典游标字典格式 {‘column’: value}需要按列名访问数据时
SSCursor无缓冲游标元组格式,流式读取处理大量数据,内存有限时
SSDictCursor无缓冲字典游标字典格式,流式读取大量数据且需要按列名访问
Cursor 子类自定义游标自定义格式特殊数据处理需求

完整连接示例

import pymysql
from pymysql.cursors import DictCursor

def get_db_connection():
    """获取数据库连接"""
    return pymysql.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase',
        charset='utf8mb4',
        autocommit=False,
        cursorclass=DictCursor,
        connect_timeout=10  # 连接超时10秒
    )

# 使用示例
def test_connection():
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1 as test")
            result = cursor.fetchone()
            print("连接测试成功:", result)
    finally:
        conn.close()

test_connection()

输出:

连接测试成功: {'test': 1}

三、数据库基础操作

创建示例数据表

CREATE TABLE mydb.users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

相关说明

关键字类型说明
INT数据类型整数类型,用于存储整数值
AUTO_INCREMENT约束/属性自动递增,每次插入新记录时自动生成唯一ID
PRIMARY KEY约束主键,唯一标识每条记录
VARCHAR(100)数据类型可变长度字符串,最大100字符
NOT NULL约束该字段不能为空,必须包含值
UNIQUE约束确保每个值唯一,不允许重复
TIMESTAMP数据类型时间戳类型,用于存储日期和时间
DEFAULT CURRENT_TIMESTAMP默认值默认值为当前系统时间

数据库操作封装类

import pymysql
from pymysql.cursors import DictCursor
from typing import List, Dict, Any, Optional, Tuple

class MySQLManager:
    """MySQL 数据库管理类"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
        """执行查询语句(SELECT)"""
        conn = pymysql.connect(**self.config)
        try:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
        finally:
            conn.close()

    def execute_update(self, sql: str, params: Tuple = None) -> int:
        """执行更新语句(INSERT/UPDATE/DELETE)"""
        conn = pymysql.connect(**self.config)
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params or ())
                conn.commit()
                return affected_rows
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()			

CRUD 操作示例

操作英文中文对应 SQL描述
CCreate创建INSERT创建新记录
RRead读取SELECT查询/读取数据
UUpdate更新UPDATE修改现有记录
DDelete删除DELETE删除记录
# 数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'cursorclass': DictCursor 
}

db = MySQLManager(db_config)

# 1. 插入数据
def add_user(name: str, email: str, age: int) -> int:
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    return db.execute_update(sql, (name, email, age))

# 2. 查询数据
def get_all_users() -> List[Dict]:
    return db.execute_query("SELECT * FROM users")

# 3. 更新数据
def update_user_email(user_id: int, new_email: str) -> int:
    sql = "UPDATE users SET email = %s WHERE id = %s"
    return db.execute_update(sql, (new_email, user_id))

# 4. 删除数据
def delete_user(user_id: int) -> int:
    return db.execute_update("DELETE FROM users WHERE id = %s", (user_id,))
	
if __name__ == '__main__':
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    user_id = add_user("张三", "zhangsan@example.com", 25)
    print(f"执行:插入新用户")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    user_id = users[0]['id']
    affected_rows = update_user_email(user_id, "zhangsan2@example.com")
    print(f"执行:更新邮箱,影响行数: {affected_rows}")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")
	
    affected_rows = delete_user(user_id)
    print(f"执行:删除用户,影响行数: {affected_rows}")
	
    users = get_all_users()
    print(f"查询所有用户: {users}")

输出:

查询所有用户: ()
执行:插入新用户
查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
执行:更新邮箱,影响行数: 1
查询所有用户: [{'id': 3, 'name': '张三', 'email': 'zhangsan2@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
执行:删除用户,影响行数: 1
查询所有用户: ()

事务处理示例

模拟简单的转账操作,从一个用户账户转移到另一个用户账户。

def transfer_points(sender_id: int, receiver_id: int, points: int) -> bool:
    """转账操作(事务示例)"""
    conn = pymysql.connect(**db_config)
    try:
        with conn.cursor(DictCursor) as cursor:
            # 检查发送者余额
            cursor.execute("SELECT points FROM accounts WHERE user_id = %s", (sender_id,))
            sender = cursor.fetchone()
            
            if not sender or sender['points'] < points:
                raise ValueError("余额不足")
            
            # 执行转账
            cursor.execute("UPDATE accounts SET points = points - %s WHERE user_id = %s", 
                         (points, sender_id))
            cursor.execute("UPDATE accounts SET points = points + %s WHERE user_id = %s", 
                         (points, receiver_id))
            
            conn.commit()
            return True
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

批量操作

def batch_insert_users(users: List[tuple]) -> int:
    """批量插入用户数据"""
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    conn = pymysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            affected_rows = cursor.executemany(sql, users)
            conn.commit()
            return affected_rows
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

# 使用示例
users_data = [
    ('张三', 'zhangsan@example.com', 25),
    ('李四', 'lisi@example.com', 30)
]
batch_insert_users(users_data)

四、连接池优化

为什么需要连接池

频繁创建和关闭数据库连接会导致:

  • 资源浪费(TCP 连接建立开销)
  • 性能下降(连接初始化时间)
  • 连接数耗尽(超过数据库最大连接数)
    连接池通过复用连接解决这些问题。

使用 DBUtils 实现连接池

安装方法

pip install DBUtils

实现示例

from dbutils.pooled_db import PooledDB
import pymysql
import threading
from typing import List, Dict, Any, Tuple
from pymysql.cursors import DictCursor

class ConnectionPool:
    """数据库连接池"""
    
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, config: Dict[str, Any]):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.pool_config = config.copy()
                cls._instance._pool = PooledDB(
                    creator=pymysql,
                    maxconnections=20,  # 最大连接数
                    mincached=2,  # 初始空闲连接
                    maxcached=10,  # 最大空闲连接
                    blocking=True,  # 连接耗尽时等待
                    ping=1,  # 使用时检查连接
                    **config
                )
        return cls._instance
            
    def get_connection(self):
        """从连接池获取连接"""
        return self._pool.connection()

# 使用连接池的数据库管理器
class PooledDBManager:
    def __init__(self, pool_config: Dict[str, Any]):
        self.pool = ConnectionPool(pool_config)
    
    def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
        """执行查询"""
        conn = self.pool.get_connection()
        try:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, params or ())
                return cursor.fetchall()
        finally:
            conn.close()  # 实际是放回连接池
    
    def execute_update(self, sql: str, params: Tuple = None) -> int:
        """执行更新"""
        conn = self.pool.get_connection()
        try:
            with conn.cursor() as cursor:
                affected_rows = cursor.execute(sql, params or ())
                conn.commit()
                return affected_rows
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()

ping 参数说明

0 = 不检查
1 = 每次请求时检查(推荐)
2 = 每次游标创建时检查
4 = 每次执行时检查
7 = 1+2+4(所有检查)

五、应用示例

Flask 集成示例

from dbutils.pooled_db import PooledDB
from flask import Flask, request, jsonify
from pymysql.cursors import DictCursor

app = Flask(__name__)

db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# 初始化连接池
db_manager = PooledDBManager(db_config)

@app.route('/users', methods=['GET'])
def get_users():
    """获取所有用户"""
    try:
        users = db_manager.execute_query("SELECT * FROM users")
        return jsonify({'success': True, 'data': users})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/users', methods=['POST'])
def create_user():
    """创建用户"""
    try:
        data = request.json
        sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
        result = db_manager.execute_update(sql, (data['name'], data['email'], data['age']))
        return jsonify({'success': True, 'affected_rows': result})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

连接池实践配置

# 优化后的连接池配置
optimal_pool_config = {
    'maxconnections': 20,      # 根据并发量调整
    'mincached': 2,           # 减少初始资源占用
    'maxcached': 10,          # 控制最大空闲连接
    'blocking': True,         # 避免连接耗尽错误
    'ping': 1,                # 使用前检查连接健康
    **db_config              # 基础数据库配置
}

错误重试机制

数据库操作重试装饰器:当数据库连接出现临时故障时,会自动进行最多3次重试,并且每次重试间隔时间按指数增长(1秒、2秒、4秒),提高程序的容错能力。

import time
from functools import wraps
import pymysql

def retry_on_failure(max_retries=3, initial_delay=1):
    """数据库操作重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (pymysql.OperationalError, pymysql.InterfaceError) as e:
                    if attempt == max_retries - 1:
                        raise e
                    time.sleep(initial_delay * (2 ** attempt))  # 指数退避
            return None
        return wrapper
    return decorator

# 使用示例
@retry_on_failure(max_retries=3)
def robust_query(sql, params=None):
    return db_manager.execute_query(sql, params)

指数退避:当操作失败时,不立即重试,而是等待一段时间,且每次重试的等待时间呈指数级增长。等待 1 秒, 2 秒, 4 秒,8 秒…

六、SQL事务操作对比

事务影响

操作类型语法示例主要用途返回值事务影响性能考虑使用场景
SELECT
(查询)
SELECT * FROM users WHERE age > 18;从数据库中检索数据结果集(0行或多行)只读操作,不影响数据索引优化很重要,避免全表扫描数据查询、报表生成、数据分析
UPDATE
(更新)
UPDATE users SET age = 20 WHERE id = 1;修改现有记录受影响的行数需要事务控制,会锁定行WHERE 条件要精确,避免锁表修改用户信息、更新状态、调整数值
INSERT
(插入)
INSERT INTO users (name, age) VALUES (‘张三’, 25);添加新记录插入的行数(通常是1)需要事务控制批量插入比单条插入高效新增用户、创建订单、记录日志
DELETE
(删除)
DELETE FROM users WHERE id = 1;删除记录受影响的行数需要事务控制,谨慎使用建议软删除,避免物理删除删除用户、清理数据、撤销操作

事务特性

操作是否自动提交锁级别回滚支持并发影响
SELECT是(可设置)共享锁可回滚到快照低(读写不阻塞)
UPDATE排他锁完全支持高(会阻塞其他写操作)
INSERT排他锁完全支持中(可能触发索引重建)
DELETE排他锁完全支持高(会阻塞其他操作)
  • 排他锁(X锁):写锁,一个事务独占资源,其他事务不能读写
  • 共享锁(S锁):读锁,多个事务可同时读取,但不能写入
  • 排他锁 = 独占,共享锁 = 共享读

普通 SELECT 是完全无锁的,不会阻塞其他事务的写操作,也不会被写操作阻塞。只有显式加锁的SELECT才会影响并发。

七、总结

连接管理

  • 使用连接池管理数据库连接
  • 合理配置连接池参数
  • 及时释放连接回池

事务控制

  • 明确控制事务边界
  • 及时提交或回滚事务
  • 处理并发场景下的数据一致性

错误处理

  • 实现适当的重试机制
  • 记录详细的错误日志
  • 区分业务错误和系统错误

性能优化

  • 使用预处理语句防止 SQL 注入
  • 合理使用批量操作
  • 监控连接池使用情况

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • maven冲突问题解决

    maven冲突问题解决

    这篇文章主要介绍了maven冲突问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • Python操作Elasticsearch处理timeout超时

    Python操作Elasticsearch处理timeout超时

    这篇文章主要介绍了Python操作Elasticsearch处理timeout超时,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Python使用poplib模块和smtplib模块收发电子邮件的教程

    Python使用poplib模块和smtplib模块收发电子邮件的教程

    smtplib模块一般我们比较熟悉、这里我们会来讲解使用smtplib发送SSL/TLS安全邮件的方法,而poplib模块则负责处理接收pop3协议的邮件,下面我们就来看Python使用poplib模块和smtplib模块收发电子邮件的教程
    2016-07-07
  • python requests 测试代理ip是否生效

    python requests 测试代理ip是否生效

    这篇文章主要介绍了python requests 测试代理ip是否生效的相关资料,需要的朋友可以参考下
    2018-07-07
  • Python catplot函数自定义颜色的方法

    Python catplot函数自定义颜色的方法

    catplot() 函数是 Seaborn 中一个非常有用的函数,它可以绘制分类变量的图形,并可以根据另一个或多个变量进行分组,这篇文章主要介绍了Python catplot函数自定义颜色的方法,需要的朋友可以参考下
    2023-03-03
  • 手把手教你用322行Python代码编写贪吃蛇游戏

    手把手教你用322行Python代码编写贪吃蛇游戏

    最近在学Python,想做点什么来练练手,命令行的贪吃蛇一般是C的练手项目,但是一时之间找不到别的,就先做个贪吃蛇来练练简单的语法,下面这篇文章主要给大家介绍了关于如何用322行Python代码编写贪吃蛇游戏的相关资料,需要的朋友可以参考下
    2023-02-02
  • python kornia计算机视觉库实现图像变化

    python kornia计算机视觉库实现图像变化

    这篇文章主要为大家介绍了python kornia计算机视觉库实现图像变化算法示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • Python实现WGS84火星百度及web墨卡托四种坐标系相互转换

    Python实现WGS84火星百度及web墨卡托四种坐标系相互转换

    主流被使用的地理坐标系并不统一,常用的有WGS84、GCJ02(火星坐标系)、BD09(百度坐标系)以及百度地图中保存矢量信息的web墨卡托,本文利用Python编写相关类以实现4种坐标系统之间的互相转换
    2023-08-08
  • 和孩子一起学习python之变量命名规则

    和孩子一起学习python之变量命名规则

    这篇文章我们给大家总结了关于儿童学习python中的变量命名规则相关知识点内容,有兴趣的朋友跟着参考学习下。
    2018-05-05
  • python 利用 PrettyTable 美化表格

    python 利用 PrettyTable 美化表格

    这篇文章主要介绍了python 利用 PrettyTable 美化表格,首先按行设置数据展开相关内容,需要的小伙伴可以参考一下
    2022-04-04

最新评论