Python使用进程池并发执行SQL语句的操作代码

 更新时间:2024年10月31日 09:48:23   作者:U盘失踪了  
Python的进程池是一种并发工具,它允许我们将任务分发给一组工作进程,这些进程可以同时运行并共享一个进程池,本文给大家介绍了Python使用进程池并发执行SQL语句的操作代码,需要的朋友可以参考下

这段代码使用了 Python 的 multiprocessing 模块来实现真正的并行处理,绕过 Python 的全局解释器锁(GIL)限制,从而在多核 CPU 上并发执行多个 SQL 语句。

from pyhive import hive
import multiprocessing
 
# 建立连接
conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")
 
# SQL 语句列表
sql_statements = [
    "INSERT INTO table1 VALUES (1, 'value1')",
    "INSERT INTO table1 VALUES (2, 'value2')",
    "INSERT INTO table1 VALUES (3, 'value3')"
]
 
# 定义执行函数
def execute_sql(sql):
    with conn.cursor() as cursor:
        cursor.execute(sql)
 
# 确保多进程代码只在主进程中执行
if __name__ == '__main__':
 
    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(execute_sql, sql_statements)
 
    # 关闭连接
    conn.close()

1. 导入模块

from pyhive import hive
import multiprocessing
  • pyhive: 这是用于连接和操作 Hive 数据库的 Python 库。hive.Connection 用于建立与 Hive 数据库的连接。
  • multiprocessing: 这是 Python 的标准库,用于创建和管理进程。通过 multiprocessing,我们可以绕过 Python 的 GIL(全局解释器锁)限制,实现真正的并行处理。

2. 建立数据库连接

conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")
  • 这里我们使用 hive.Connection 建立一个到 Hive 数据库的连接。
  • 参数
    • host: HiveServer2 的主机地址,通常是 localhost 或 HiveServer2 运行的服务器 IP。
    • port: HiveServer2 的端口号,默认是 10000
    • username: 连接 Hive 使用的用户名。
    • password: 连接 Hive 使用的密码。

这个连接对象 conn 将在后续的代码中用于创建游标(cursor),并通过游标执行 SQL 语句。

3. 定义 SQL 语句列表

sql_statements = [
    "INSERT INTO table1 VALUES (1, 'value1')",
    "INSERT INTO table1 VALUES (2, 'value2')",
    "INSERT INTO table1 VALUES (3, 'value3')"
]
  • 这里定义了一个包含多个 SQL 语句的列表 sql_statements。每个语句都是一个插入操作,将数据插入到 Hive 表 table1 中。
  • 你可以根据实际需求修改这些 SQL 语句。

4. 定义执行函数

def execute_sql(sql):
    with conn.cursor() as cursor:
        cursor.execute(sql)
  • execute_sql 函数是用于执行单个 SQL 语句的函数。
  • with conn.cursor() as cursor:为当前数据库连接创建一个游标对象 cursor,这个游标用于执行 SQL 语句。
    • cursor.execute(sql):执行传入的 SQL 语句。
  • 这个函数会被进程池中的每个进程调用,每个进程都会独立执行一个 SQL 语句。

5. 使用进程池并发执行

with multiprocessing.Pool() as pool:
    pool.map(execute_sql, sql_statements)
  • multiprocessing.Pool():创建一个进程池。进程池可以管理一组工作进程,并将任务分配给这些进程。
    • 默认情况下,Pool() 会根据系统的 CPU 核心数创建相应数量的工作进程。
    • 你可以通过参数指定池中的进程数量,例如 Pool(4) 表示创建 4 个工作进程。
  • pool.map(execute_sql, sql_statements)
    • pool.map 方法会将 execute_sql 函数应用到 sql_statements 列表中的每个元素上。
    • pool.map 方法会自动将 SQL 语句列表分配给进程池中的工作进程,每个进程独立执行一个 SQL 语句。
    • 这个过程是并行的,多个进程可以同时执行不同的 SQL 语句,从而提高执行效率。

6. 关闭数据库连接

conn.close()
  • 在所有 SQL 语句执行完毕后,我们关闭数据库连接,释放资源。

进程池的工作原理

multiprocessing.Pool 提供了一种方便的方式来并行化执行函数。其工作原理如下:

  1. 创建进程池:当你创建一个 Pool 对象时,会启动多个工作进程(数量可以指定,或默认根据 CPU 核心数决定)。
  2. 任务分配:当你调用 pool.map 时,进程池会将任务(在这里是 execute_sql 函数)分配给空闲的工作进程。
  3. 并行执行:每个工作进程独立执行分配给它的任务,互不干扰。
  4. 结果收集pool.map 会收集所有工作进程的执行结果,并按照原始任务列表的顺序返回结果。

为什么使用进程池而不是线程池?

  1. GIL 限制:Python 的全局解释器锁(GIL)限制了多线程的并行执行能力,尤其是在 CPU 密集型任务中,多线程并不能充分利用多核 CPU。
  2. 进程并行multiprocessing 模块通过创建多个进程来绕过 GIL 限制,每个进程都有自己的 Python 解释器和内存空间,因此可以实现真正的并行执行。
  3. 适用场景
    • 线程池:适合 I/O 密集型任务(例如,等待数据库查询结果)。
    • 进程池:适合 CPU 密集型任务(例如,并行计算、数据处理等),或者你需要绕过 GIL 限制时。

注意事项

  1. 数据库连接:在多进程环境中,每个进程都有自己的内存空间,因此每个进程需要独立的数据库连接。在上述代码中,每个进程都通过 conn.cursor() 创建了自己的游标。
  2. 进程开销:创建和销毁进程有一定的开销,因此对于非常短小的任务,进程池可能不会显著提高性能。在这种情况下,可以考虑调整进程池的大小或使用其他优化手段。
  3. 连接池:如果你的程序需要频繁访问数据库,可以考虑使用数据库连接池来复用数据库连接,减少连接建立和关闭的开销。

总结

  • 进程池:通过 multiprocessing.Pool 实现,可以绕过 Python 的 GIL 限制,实现真正的并行处理。
  • 适用场景:适合 CPU 密集型任务或需要并行执行多个独立任务的场景。
  • 代码结构
    • 建立数据库连接。
    • 定义 SQL 语句列表。
    • 定义执行函数 execute_sql
    • 使用进程池并发执行 SQL 语句。
    • 关闭数据库连接。

通过这种方式,你可以充分利用多核 CPU 的优势,并发执行多个 SQL 语句,从而提高程序的执行效率。

解决多进程报错

你遇到的错误是 RuntimeError,这是因为你在使用 multiprocessing 时没有正确地保护代码的入口点。具体来说,在 Windows 系统上(以及其他非 fork 的启动方式),你必须将多进程相关的代码放在 if __name__ == '__main__': 语句块中,以避免子进程在启动时重新导入主模块并执行不必要的代码。

错误原因:

在 Windows 系统中,Python 的 multiprocessing 模块使用 spawn 启动子进程,这意味着子进程会重新导入当前脚本。如果不加以保护,子进程会再次执行主模块中的代码,导致递归创建进程并抛出错误。

解决方案:

你需要将多进程相关的代码放在 if __name__ == '__main__': 语句块中,确保只有主进程会执行这些代码,而子进程不会。

修改后的代码:

import multiprocessing
 
data = [
    "1",
    "2",
    "3"
]
 
# 定义执行函数
def print_str(data):
    print(data)
 
# 确保多进程代码只在主进程中执行
if __name__ == '__main__':
    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

解释:

  • if __name__ == '__main__': 确保了只有在直接运行当前脚本时,才会执行其中的多进程代码。子进程不会执行这个代码块,从而避免了递归创建进程的问题。
  • 在 Windows 系统上,这是使用 multiprocessing 时必须遵循的惯用写法。

其他注意事项:

  • 如果你打算将脚本打包成可执行文件(例如使用 pyinstaller),你还需要调用 multiprocessing.freeze_support(),不过在大多数脚本运行的情况下,这个调用不是必须的。

例如:

if __name__ == '__main__':
    multiprocessing.freeze_support()  # 如果需要打包成可执行文件,可以加上这行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

执行sql 简单示例

import multiprocessing
 
data = [  ]
 
# 定义执行函数
def print_str(data):
    print(data)
 
# 确保多进程代码只在主进程中执行
if __name__ == '__main__':
 
    data2 = [
        "1",
        "2",
        "3"
    ]
 
    for i in data2:
        data_str = f"""
        inset into {i}
        """
        data.append(data_str)
 
 
    # 使用进程池并发执行
    with multiprocessing.Pool() as pool:
        pool.map(print_str, data)

到此这篇关于Python使用进程池并发执行SQL语句的操作代码的文章就介绍到这了,更多相关Python进程池执行SQL语句内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python使用Plotly创建交互式数据可视化的操作步骤

    python使用Plotly创建交互式数据可视化的操作步骤

    Python 的 Plotly 库是创建这种交互式可视化的强大工具,它提供了丰富的图表类型和易于使用的接口,本文将探讨如何使用 Plotly 创建交互式数据可视化,包括代码实例和深入的解释,需要的朋友可以参考下
    2024-08-08
  • 解决python 出现unknown encoding: idna 的问题

    解决python 出现unknown encoding: idna 的问题

    这篇文章主要介绍了解决python出现 unknown encoding: idna 的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • python如何解决指定代码段超时程序卡死

    python如何解决指定代码段超时程序卡死

    这篇文章主要介绍了python如何解决指定代码段超时程序卡死,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • Django后端接收嵌套Json数据及解析详解

    Django后端接收嵌套Json数据及解析详解

    这篇文章主要介绍了Django后端接收嵌套Json数据及解析详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • 基于Python写一个番茄钟小工具

    基于Python写一个番茄钟小工具

    最近听到朋友说在用番茄钟,有点兴趣也想下载一个来用用,后面仔细一想这玩意做起来也不难,索性自己顺手写一个算了,在这里也分享给大家了
    2022-12-12
  • Python实现上课点名器系统

    Python实现上课点名器系统

    今天给大家分享一个读者粉丝投稿的,关于上课点名的实战案例,对Python上课点名器实现过程感兴趣的朋友,一起来看看是如何实现的吧
    2021-10-10
  • python判断变量是否是None的三种写法总结

    python判断变量是否是None的三种写法总结

    代码中经常会有变量是否为None的判断,这篇文章给大家总结了三种判断变量是否是none的写法,文中通过代码示例介绍的非常详细,需要的朋友可以参考下
    2023-12-12
  • 想学画画?python满足你!

    想学画画?python满足你!

    这篇文章主要介绍了如何利用python画画,帮助大家更好的理解和使用python的turtle库,感兴趣的朋友可以了解下
    2020-12-12
  • 举例讲解Python中metaclass元类的创建与使用

    举例讲解Python中metaclass元类的创建与使用

    在Python中我们用type函数可以动态地创建一个元类,同样也可以用__metaclass__属性来指定一个元类,接下来我们就来具体举例讲解Python中metaclass元类的创建与使用
    2016-06-06
  • Python Numpy运行报错IndexError与形状不匹配的问题解决办法

    Python Numpy运行报错IndexError与形状不匹配的问题解决办法

    在使用Numpy进行数据处理和科学计算时,IndexError和形状不匹配(Shape Mismatch)是常见的错误类型,这些错误通常发生在数组索引操作、数组运算或数组重塑时,本文将通过一个具体的例子来详细分析这些错误的原因和解决办法,需要的朋友可以参考下
    2024-07-07

最新评论