Python大批量写入数据(百万级别)的方法

 更新时间:2023年07月14日 10:15:15   作者:西红市杰出青年  
这篇文章主要给大家介绍了关于Python大批量写入数据(百万级别)的相关资料,在日常处理数据的过程中,我们都有批量写入数据的需求,文中给出了详细的示例代码,需要的朋友可以参考下

背景

现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。

方案

方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入

代码

    1,先通过pandas读取所有csv数据存入列表。
    2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)
    3,方案二 线程内以  executemany 方法批量插入所有数据。
    4,方案一 线程内使用异步事件循环遍历所有数据异步插入。 
    5,方案一纯属没事找事型。

方案二

import threading

import pandas as pd
import asyncio
import time

import aiomysql
import pymysql

data=[]
error_data=[]

def run(start,end):
    global data
    global error_data
    print("start"+threading.current_thread().name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    mysdb = getDb("*", *, "*", "*", "*")
    cursor = mysdb.cursor()
    sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    cursor.executemany(sql,data[start:end])
    mysdb.commit()
    mysdb.close()
    print("end" + threading.current_thread().name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

def csv_file_read_use_pd(csvFile):
    csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
    csv_result = csv_result.fillna(value="None")
    result = csv_result.values.tolist()
    return result

class MyDataBase:
    def __init__(self,host=None,port=None,username=None,password=None,database=None):
        self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)
    def close(self):
        self.db.close()

def getDb(host,port,username,password,database):
    MyDb = MyDataBase(host, port, username, password,database)
    return MyDb.db

def main(csvFile):
    global data  #获取全局对象  csv全量数据
    #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
    thread_lens=20
    csv_result=csv_file_read_use_pd(csvFile)
    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    for item in csv_result:
        item.insert(0,day)

    data=csv_result
    thread_exe_count_list=[]   #线程需要执行的区间
    csv_lens=len(csv_result)
    avg = csv_lens // thread_lens
    remainder=csv_lens % thread_lens
    # 0,27517  27517,55,034
    nowIndex=0
    for i in range(thread_lens):
        temp=[nowIndex,nowIndex+avg]
        nowIndex=nowIndex+avg
        thread_exe_count_list.append(temp)
    thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
    # print(thread_exe_count_list)

    #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])

    for i in range(thread_lens):
        sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
        sub_thread.start()
        sub_thread.join()
        time.sleep(3)

if __name__=="__main__":
    #csv_file_read_use_pd("分公司箱型箱量.csv")
    main("分公司箱型箱量.csv")

方案一

import threading

import pandas as pd
import asyncio
import time

import aiomysql

data=[]
error_data=[]

async def async_basic(loop,start,end):
    global data
    global error_data
    print("start"+threading.current_thread().name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    conn = await aiomysql.connect(
        host="*",
        port=*,
        user="*",
        password="*",
        db="*",
        loop=loop
    )
    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
    async with conn.cursor() as cursor:
        for item in data[start:end]:
            params=[day]
            params.extend(item)
            try:
                x=await cursor.execute(sql,params)
                if x==0:
                    error_data.append(item)
                print(threading.current_thread().name+"   result "+str(x))
            except Exception as e:
                print(e)
                error_data.append(item)
                time.sleep(10)
                pass
    await conn.close()
    #await conn.commit()
    #关闭连接池
    # pool.close()
    # await pool.wait_closed()
    print("end" + threading.current_thread().name)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

def csv_file_read_use_pd(csvFile):
    csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='\t')
    csv_result = csv_result.fillna(value="None")
    result = csv_result.values.tolist()
    return result

def th(start,end):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(async_basic(loop,start,end))


def main(csvFile):
    global data  #获取全局对象  csv全量数据
    #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行
    thread_lens=20
    csv_result=csv_file_read_use_pd(csvFile)
    data=csv_result
    thread_exe_count_list=[]   #线程需要执行的区间
    csv_lens=len(csv_result)
    avg = csv_lens // thread_lens
    remainder=csv_lens % thread_lens
    # 0,27517  27517,55,034
    nowIndex=0
    for i in range(thread_lens):
        temp=[nowIndex,nowIndex+avg]
        nowIndex=nowIndex+avg
        thread_exe_count_list.append(temp)
    thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程
    print(thread_exe_count_list)

    #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])

    for i in range(thread_lens):
        sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))
        sub_thread.start()
        time.sleep(3)

if __name__=="__main__":
    #csv_file_read_use_pd("分公司箱型箱量.csv")
    main("分公司箱型箱量.csv")

总结

到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python配置文件解析模块ConfigParser使用实例

    Python配置文件解析模块ConfigParser使用实例

    这篇文章主要介绍了Python配置文件解析模块ConfigParser使用实例,本文讲解了figParser简介、ConfigParser 初始工作、ConfigParser 常用方法、ConfigParser使用实例等内容,需要的朋友可以参考下
    2015-04-04
  • 使用Python实现炫酷的数据动态图大全

    使用Python实现炫酷的数据动态图大全

    数据可视化是通过图形、图表、地图等可视元素将数据呈现出来,以便更容易理解、分析和解释,它是将抽象的数据转化为直观形象的过程,本文给大家介绍了使用Python实现炫酷的数据动态图大全,需要的朋友可以参考下
    2024-06-06
  • Python+Seaborn绘制分布图的示例详解

    Python+Seaborn绘制分布图的示例详解

    这篇文章我们将介绍10个示例,从而帮助大家掌握如何使用Python中的Seaborn库来创建图表。文中示例代码讲解详细,感兴趣的可以了解一下
    2022-05-05
  • Python 堆叠柱状图绘制方法

    Python 堆叠柱状图绘制方法

    这篇文章主要介绍了Python 堆叠柱状图绘制方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • Python操作PowerPoint实现添加与设置文本框完整教程

    Python操作PowerPoint实现添加与设置文本框完整教程

    在制作 PowerPoint 演示文稿时,文本框是最常用的元素之一,本文将介绍如何使用 Python 在 PowerPoint 中添加文本框,并设置文本内容、格式和边距等属性,希望对大家有所帮助
    2026-04-04
  • 解决Python图形界面中设置尺寸的问题

    解决Python图形界面中设置尺寸的问题

    这篇文章主要介绍了解决Python图形界面中设置尺寸的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03
  • Python操作列表常用方法实例小结【创建、遍历、统计、切片等】

    Python操作列表常用方法实例小结【创建、遍历、统计、切片等】

    这篇文章主要介绍了Python操作列表常用方法,结合实例形式总结分析了Python列表常见的创建、遍历、统计、切片等操作技巧与相关注意事项,需要的朋友可以参考下
    2019-10-10
  • python 穷举指定长度的密码例子

    python 穷举指定长度的密码例子

    这篇文章主要介绍了python 穷举指定长度的密码例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-04-04
  • 基于Python和C++实现删除链表的节点

    基于Python和C++实现删除链表的节点

    这篇文章主要介绍了基于Python和C++实现删除链表的节点,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Flask创建并运行数据库迁移的实现过程

    Flask创建并运行数据库迁移的实现过程

    Flask创建并运行数据库迁移的过程是一个涉及多个步骤的操作,旨在帮助开发者在开发过程中管理数据库模式的变化,而不需要手动地删除和重建数据库表,从而避免数据丢失,以下是一个详细的步骤说明,需要的朋友可以参考下
    2024-09-09

最新评论