Python多线程如何同时处理多个文件

 更新时间:2023年08月14日 11:35:23   作者:DS..  
这篇文章主要介绍了Python多线程如何同时处理多个文件问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Python多线程同时处理多个文件

在需要对大量文件进行相同的操作时,逐个遍历是非常耗费时间的。这时,我们可以借助于Python的多线程操作来大大提高处理效率,减少处理时间。

问题背景

比如说,我们现在需要从一个文件夹下面读取出所有的视频,然后对每个视频进行逐帧处理。

由于对视频逐帧处理本身就是比较耗时的任务,如果按照串行的方式顺序处理每个视频文件是效率非常低的,此时,一种比较容易的解决方案是使用Python的多线程进行处理。

定义通用处理函数

并发适合于处理相似的任务。例如我们需要对视频中的每一帧进行处理,处理函数接受的是一个视频名称的列表,对列表内的视频顺序处理,那么我们可以构建以下处理函数:

import cv2
def func(video_names):
    for video_name in video_names:
        cap = cv2.VideoCapture(video_name)
        while True:
            ret, frame = cap.read()
            if ret:
                # process temp frame
            else:
                break

这样,我们只需要将待处理的视频名称按照开启的线程数,划分成多个子列表,就可以进行并发批量处理了。

多线程 Thread

Python中的多线程是通过threading库实现的,除此之外,multiprocessing也可以实现并发处理,二者之间是存在一定差异的。这里我们使用threading来实现同时处理多个不同的文件。

import threading
# video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list]
for part_video in video_names_list:
    thread = threading.Thread(target=func, args=([part_video]))
    thread.start()

在这里,首先将要处理的文件名称列表划分成若干个子列表,然后对每一个子列表开启一个线程进行处理。

Python多线程文件操作

使用python 将在csv文件中的一百万条网址,写入mongo数据库中,这里使用多线程进行操作。

直接贴上代码,如下:

import os
import threading  #导入进程
import csv
import time
from Mongo_cache import MongoCache 
import win32com.client
import winsound
NUM_THREAD = 5
COUNT = 0
lock = threading.Lock()
cache = MongoCache()     #数据库连接初始化
def worker():
"""
func: 从csv文件中读取数据,并将数据返回
"""
    for path in os.listdir(os.getcwd()):
        #print("当前工作目录", path)
        file_name = path.split('.')
        #print(file_name)
        if file_name[-1] == 'csv':
            #print("地址是:",path)
            file = open(path)
            data = csv.reader(file)
            return data
        else:
            pass
def save_info(data,i, num_retries=2):
"""
func: 将数据保存
"""
    global COUNT
    global lock
    global cache
    for _, website in data:
        try:
            lock.acquire()
            #print("线程{}》》》{}正在运行".format(threading.current_thread().name, i))
            item = {'website':website}
            cache(item)
            COUNT += 1
        except:
            if num_retries > 0:
                save_info(data, i, num_retries-1)
        finally:
            lock.release()
def main():
"""
启动线程
"""
    print("start working")
    print("working...")
    data = worker()
    threads = []   #设置主线程
    for i in range(NUM_THREAD):
        t = threading.Thread(target=save_info, args=(data, i))
        threads.append(t)
    for i in range(NUM_THREAD):
        threads[i].start()
    for i in range(NUM_THREAD):
        threads[i].join()
    print("all was done!")
if __name__ == '__main__':
    s_time = time.time()
    main()
    e_time = time.time()
    print("总的信息条数:", COUNT)
    print("总耗时:", e_time-s_time)
    speak = win32com.client.Dispatch('SAPI.SPVOICE')
    speak.Speak("早上好,eric,程序结束!")

数据存储模块

import pickle
import zlib
from bson.binary import Binary 
from datetime import datetime, timedelta
from pymongo import MongoClient
import time
class MongoCache(object):
    def __init__(self, client=None, expires=timedelta(days=30)):
        self.client = MongoClient('localhost', 27017) if client is None else client
        self.db = self.client.cache
        #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())  #设置自动删除时间
    def __call__(self,url):
        self.db.webpage.insert(url)
        #print("保存成功")
    def __contains__(self,url):
        try:
            self[url]
        except KeyError:
            return False
        else:
            return True
    def __getitem__(self, url):
        record = self.db.webpage.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + 'dose not exist')
    def __setitem__(self, url, result):
        record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()}
        self.db.webpage.update({'_id':url},{'$set':record},upsert=True)
    def clear(self):
        self.db.webpage.drop()

将一百万条网址从csv文件保存到数据库所花费的时间为

start working
working...
all was done!
总的信息条数: 1000000
总耗时: 427.4034459590912

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • pycharm访问mysql数据库的方法步骤

    pycharm访问mysql数据库的方法步骤

    这篇文章主要介绍了pycharm访问mysql数据库的方法步骤。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • Python数据类型中的元组Tuple

    Python数据类型中的元组Tuple

    这篇文章主要介绍了Python数据类型中的元组Tuple,元组可以理解为一个只读列表,用()来标识,下文围绕元组展开详细资料,需要的小伙伴可以参考一下
    2022-02-02
  • python中concurrent.futures的具体使用

    python中concurrent.futures的具体使用

    concurrent.futures是Python标准库的一部分,提供了ThreadPoolExecutor和ProcessPoolExecutor两种执行器,用于管理线程池和进程池,通过这些执行器,可以简化多线程和多进程任务的管理,提高程序执行效率
    2024-09-09
  • 分享7个 Python 实战项目练习

    分享7个 Python 实战项目练习

    这篇文章主要介绍了分享7个 Python 实战项目代码,经过Python3.6.4调试通过的代码,就具一点的参考价值,需要的小伙伴可以参考一下
    2022-03-03
  • 深入理解python虚拟机之多继承与 mro

    深入理解python虚拟机之多继承与 mro

    在本篇文章当中将主要给大家介绍 python 当中的多继承和mro,通过介绍在多继承当中存在的问题就能够理解在cpython当中引入c3算法的原因了,从而能够帮助大家更好的了理解mro,需要的朋友可以参考下
    2023-05-05
  • Python实现登录接口的示例代码

    Python实现登录接口的示例代码

    本篇文章主要介绍了Python实现登录接口的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • Python 中的json常见用法实例详解

    Python 中的json常见用法实例详解

    这篇文章主要介绍了Python 中的json常见用法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-12-12
  • Django的get_absolute_url方法的使用

    Django的get_absolute_url方法的使用

    本文主要介绍了Django的get_absolute_url方法的使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • 探索Python中双下划线的特殊方法属性魔法世界

    探索Python中双下划线的特殊方法属性魔法世界

    这篇文章主要为大家介绍了Python中双下划线的特殊方法属性魔法世界探索,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • 简单介绍Python2.x版本中的cmp()方法的使用

    简单介绍Python2.x版本中的cmp()方法的使用

    这篇文章主要介绍了简单介绍Python2.x版本中的cmp()方法的使用,然而该方法在Python3.x版本中已并不再内置...需要的朋友可以参考下
    2015-05-05

最新评论