Python脚本实现Mysql数据迁移

 更新时间:2025年03月04日 08:15:23   作者:公孙鞅  
MySQL数据库迁移是指将MySQL数据库中的数据和结构迁移到另一个MySQL实例,下面小编就来为大家介绍一下如何通过Python脚本实现Mysql数据迁移吧

一、为什么要做数据迁移

MySQL数据库迁移是指将MySQL数据库中的数据和结构迁移到另一个MySQL实例,或者从一个MySQL实例迁移到另一个数据库系统(如从MySQL迁移到MariaDB,或者从本地MySQL迁移到云数据库)。其作用通常包括以下几个方面:

1. 提升性能与扩展性

  • 硬件升级:随着业务的发展,原来的MySQL数据库可能因为硬件或配置限制无法满足需求。迁移到更强大的服务器或云平台可以提高性能,支持更高的并发访问。
  • 分布式架构:对于大型应用,将数据迁移到分布式数据库架构中,能提高系统的扩展性和负载均衡能力。

2. 降低成本

  • 从本地迁移到云:将数据库从本地环境迁移到云平台(如AWS RDS、Google Cloud SQL、阿里云等),可以节省硬件和运维成本,且云平台提供自动备份、自动扩展等功能。
  • 选择更具成本效益的数据库实例:迁移到性价比更高的MySQL版本或实例,帮助企业节省开支。

3. 数据恢复和灾难恢复

数据迁移有时作为灾难恢复的一部分,帮助确保数据在主数据库不可用时能迅速恢复。例如,将数据从主数据中心迁移到备用数据中心,确保业务不中断。

4. 技术或版本更新

  • 随着MySQL数据库技术的演进,新的版本(例如MySQL 8.0)提供了更强大的功能、性能优化和安全性。迁移到新版本有助于利用这些改进,例如更高效的查询执行、更强的加密功能以及更灵活的配置。
  • 兼容性问题:数据库迁移有时是由于原有的MySQL版本或特性与新系统或需求不兼容,迁移到支持更好兼容性的新版本可以解决此问题。

5. 高可用性和负载均衡

  • 迁移到支持高可用性(如主从复制、Galera Cluster等)和负载均衡的架构,可以提高数据库的容错能力和并发处理能力,减少单点故障。
  • 可以通过迁移到支持集群的MySQL系统,配置多个主节点或从节点,分摊数据库的访问压力,保障服务的稳定性。

6. 数据备份和清理

  • 在进行数据库迁移的过程中,往往也会对数据进行清理和备份,去除冗余、无效或过期的数据,并进行数据库的优化(如表结构调整、索引优化等)。这不仅能提高迁移后的数据质量,也能提升新数据库的性能。

7. 业务整合与统一

  • 多数据库整合:一些公司可能存在多个MySQL实例,迁移数据到统一的数据库平台可以整合数据,简化管理和运维工作,提高数据的一致性和完整性。
  • 合并系统:例如,企业收购了其他公司,原有的系统中有不同的MySQL数据库,迁移合并到统一的数据库平台,方便统一管理和分析。

二. 数据迁移实战

1. 需求分析

只筛选,源表中S_SMSSendLogs的字段RobotTaskId = 源表T_RobotTasks字段Id且字段TenantId = 17409350669509才需要迁移

2. Python脚本

import mysql.connector
import json

# 数据库连接配置
source_db_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'CallSystemProd',
    'password': '******',
    'database': ''******','  # 源数据库名称
}

target_db_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'CallSystemTest',
    'password': ''******',',
    'database': ''******','  # 目标数据库名称
}

# 连接到源数据库
try:
    source_conn = mysql.connector.connect(**source_db_config)
    source_cursor = source_conn.cursor(dictionary=True)
    print("成功连接到源数据库")
except mysql.connector.Error as err:
    print(f"连接源数据库失败: {err}")
    exit()

# 连接到目标数据库
try:
    target_conn = mysql.connector.connect(**target_db_config)
    target_cursor = target_conn.cursor()
    print("成功连接到目标数据库")
except mysql.connector.Error as err:
    print(f"连接目标数据库失败: {err}")
    exit()

# 获取源表数据(S_SMSSendLogs)
source_cursor.execute("""
    SELECT Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr, 
           ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime, 
           CreateUserId, UpdateUserId, IsDelete, TaskCallResultId, ProviderType
    FROM S_SMSSendLogs
""")
columns = source_cursor.fetchall()

# 获取源表数据(S_SMSTemplates),用于RequestStr中的替换
source_cursor.execute("""
    SELECT SMSId, Content, Id
    FROM S_SMSTemplates
""")
templates = {template['SMSId']: template for template in source_cursor.fetchall()}

# 获取与TaskCallResultId相关的数据(t_taskcallresults和t_repeattaskcallresults)
source_cursor.execute("""
    SELECT id, CreateUserId, UpdateUserId, ResidentsId, CustomerName
    FROM T_TaskCallResults
""")
taskcallresults = {result['id']: result for result in source_cursor.fetchall()}

source_cursor.execute("""
    SELECT id, CreateUserId, UpdateUserId, ResidentsId, CustomerName
    FROM T_RepeatTaskCallResults
""")
repeattaskcallresults = {result['id']: result for result in source_cursor.fetchall()}

# 获取RobotTaskId和OrgId相关的表数据(t_taskcallresults, t_robotrepeattasks, sysuser)
source_cursor.execute("""
    SELECT id, CreateUserId
    FROM T_TaskCallResults
""")
taskcallresults_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()}

source_cursor.execute("""
    SELECT id, CreateUserId
    FROM T_RobotRepeatTasks
""")
robotrepeattasks_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()}

source_cursor.execute("""
    SELECT Id, OrgId
    FROM SysUser
""")
sysuser_map = {user['Id']: user['OrgId'] for user in source_cursor.fetchall()}

# 构造批量插入语句
insert_query = """
    INSERT INTO S_SMSSendLogs (Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr, 
                               ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime, 
                               CreateUserId, UpdateUserId, IsDelete, TaskCallResultId, 
                               ProviderType, OrgId, Content, CharCount, BillingCount, ResidentsId, 
                               CustomerName, TemplateId)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

# 构造插入的值(批量数据),应用转换规则
insert_values = []
for column in columns:
    # Status 字段直接迁移,无需转换
    status = column['Status']

    # 将 IsSucceed 转换为 tinyint类型的 0
    is_succeed = 0 if column['IsSucceed'] == 1 else column['IsSucceed']

    # RequestStr 替换操作
    request_str = column['RequestStr']
    template = templates.get(column['SMSId'])
    if template:
        template_content = template['Content']
        try:
            # 获取 RequestStr 中的变量并替换
            request_values = json.loads(request_str).get('arguments', {})
            for key, value in request_values.items():
                template_content = template_content.replace(f"#{key}#", str(value) if value is not None else "")
        except json.JSONDecodeError:
            pass  # 如果 JSON 格式不正确,跳过替换
        request_str = template_content

    # 计算字符数
    char_count = len(request_str)
    
    # 计算计费条数
    billing_count = (char_count + 66) // 67  # 每条超过67个字算第二条

    # 获取相关的 UserId 和 OrgId
    create_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId') or \
                     repeattaskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId')

    update_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId') or \
                     repeattaskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId')

    # ProviderType 转换
    provider_type = column['ProviderType']
    if provider_type == 1:
        provider_type = 0
    elif provider_type == 2:
        provider_type = 3

    # OrgId 通过 CreateUserId 获取
    create_user_id_for_org = taskcallresults_user_map.get(column['RobotTaskId']) or \
                             robotrepeattasks_user_map.get(column['RobotTaskId'])
    org_id = sysuser_map.get(create_user_id_for_org, None)

    # 构造数据插入
    insert_values.append((
        column['Id'], column['SMSId'], column['Phone'], status, is_succeed, column['Trade_id'],
        request_str, column['ResponseStr'], column['RobotTaskId'], column['ExtendCode'],
        column['CreateTime'], column['UpdateTime'], create_user_id, update_user_id,
        column['IsDelete'], column['TaskCallResultId'], provider_type, org_id, request_str, char_count,
        billing_count, taskcallresults.get(column['TaskCallResultId'], {}).get('ResidentsId'),
        taskcallresults.get(column['TaskCallResultId'], {}).get('CustomerName'),
        template['Id'] if template else None
    ))

# 执行批量插入
target_cursor.executemany(insert_query, insert_values)
target_conn.commit()
print(f"成功迁移了 {len(insert_values)} 条数据")

# 关闭数据库连接
source_cursor.close()
source_conn.close()
target_cursor.close()
target_conn.close()

以上就是Python脚本实现Mysql数据迁移的详细内容,更多关于Mysql数据迁移的资料请关注脚本之家其它相关文章!

相关文章

  • 深入解析Python中filter函数的使用

    深入解析Python中filter函数的使用

    在Python中,filter函数是一种内置的高阶函数,它能够接受一个函数和一个迭代器,然后返回一个新的迭代器,本文主要来介绍一下Python中filter函数的具体用法,需要的可以参考一下
    2023-07-07
  • python 读取更新中的log 或其它文本方式

    python 读取更新中的log 或其它文本方式

    今天就为大家分享一篇python 读取更新中的log 或其它文本方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • Python+OpenCV进行不规则多边形ROI区域提取

    Python+OpenCV进行不规则多边形ROI区域提取

    ROI即感兴趣区域。机器视觉、图像处理中,从被处理的图像以方框、圆、椭圆、不规则多边形等方式勾勒出需要处理的区域,称为感兴趣区域,ROI。本文将利用Python和OpenCV实现不规则多边形ROI区域提取,需要的可以参考一下
    2022-03-03
  • python中的zip模块

    python中的zip模块

    这篇文章主要介绍了zip文件格式是通用的文档压缩标准,在ziplib模块中,使用ZipFile类来操作zip文件,感兴趣的朋友参考如下
    2021-08-08
  • Python反射的用法实例分析

    Python反射的用法实例分析

    这篇文章主要介绍了Python反射的用法,结合实例形式分析了Python反射机制所涉及的几个常用方法与相关使用技巧,需要的朋友可以参考下
    2018-02-02
  • python正确读取文件路径的三种方式

    python正确读取文件路径的三种方式

    这篇文章主要介绍了python正确读取文件路径的三种方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-08-08
  • pytest中的fixture基本用法

    pytest中的fixture基本用法

    fixture是pytest特有的功能,用以在测试执行前和执行后进行必要的准备和清理工作,这篇文章主要介绍了pytest中的fixture基本用法,需要的朋友可以参考下
    2023-02-02
  • Python Ruby 等语言弃用自增运算符原因剖析

    Python Ruby 等语言弃用自增运算符原因剖析

    这篇文章主要为大家介绍了Python Ruby 等语言弃用自增运算符原因剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • python实现维吉尼亚算法

    python实现维吉尼亚算法

    这篇文章主要为大家详细介绍了python编程实现维吉尼亚算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-03-03
  • Python实现疫情通定时自动填写功能(附代码)

    Python实现疫情通定时自动填写功能(附代码)

    这篇文章主要介绍了Python实现疫情通定时自动填写功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-05-05

最新评论