Python高效处理大文件的方法详解

 更新时间:2022年07月25日 14:14:22   作者:Awan  
在这篇文章中,我们将学习如何使用multiprocessing、joblib和tqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频,感兴趣的可以了解一下

为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。

例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。

在这篇文章中,我们将学习如何使用multiprocessing、joblib和tqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。

开始

我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。

# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion  
import pandas as pd
# Text Processing  
import re  
from nltk.corpus import stopwords
import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。

n_workers = 2 * mp.cpu_count()
print(f"{n_workers} workers are available")
>>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)
print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:

Shape:(2845342, 47)
Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')
CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

处理文本

clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。

def clean_text(text):  
 # Remove stop words
 stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word  
not in stops])
 # Remove Special Characters
 text = text.translate(str.maketrans('', '', string.punctuation))
 # removing the extra spaces
 text = re.sub(' +',' ', text)
 return text

串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。

%%time
tqdm.pandas()
df['Description'] = df['Description'].progress_apply(clean_text)

输出

高端处理器串行处理280万行花了9分5秒。

100%          2845342/2845342 [09:05<00:00, 5724.25it/s]
CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

多进程处理

有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing是一个内置的python包,通常用于并行处理大型文件。

我们将创建一个有8个workers的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm。

map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。

%%time
p = mp.Pool(n_workers)  
df['Description'] = p.map(clean_text,tqdm(df['Description']))

输出

我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。

100%          2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

并行处理

我们现在将学习另一个Python包来执行并行处理。在本节中,我们将使用joblib的Parallel和delayed来复制map函数。

  • Parallel需要两个参数:n_job = 8和backend = multiprocessing。
  • 然后,我们将在delayed函数中加入clean_text。
  • 创建一个循环,每次输入一个值。

下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。

建议:使用 "try: "和 "except: "添加异常处理。

def text_parallel_clean(array):
 result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
 delayed(clean_text)
  (text)  
 for text in tqdm(array)
 )
 return result

在text_parallel_clean()中添加“Description”列。

%%time
df['Description'] = text_parallel_clean(df['Description'])

输出

我们的函数比多进程处理Pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。

100%          2845342/2845342 [04:03<00:00, 10514.98it/s]
CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

并行批量处理

有一个更好的方法来处理大文件,就是把它们分成若干批,然后并行处理。让我们从创建一个批处理函数开始,该函数将在单一批次的值上运行clean_function。

批量处理函数

def proc_batch(batch):
 return [
 clean_text(text)
 for text in batch
 ]

将文件分割成批

下面的函数将根据workers的数量把文件分成多个批次。在我们的例子中,我们得到8个批次。

def batch_file(array,n_workers):
 file_len = len(array)
 batch_size = round(file_len / n_workers)
 batches = [
 array[ix:ix+batch_size]
 for ix in tqdm(range(0, file_len, batch_size))
 ]
 return batches
batches = batch_file(df['Description'],n_workers)
>>> 100% 8/8 [00:00<00:00, 280.01it/s]

运行并行批处理

最后,我们将使用Parallel和delayed来处理批次。

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
 delayed(proc_batch)
  (batch)  
 for batch in tqdm(batches)
 )
df['Description'] = [j for i in batch_output for j in i]

输出

我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。

100%          8/8 [00:00<00:00, 2.19it/s]
CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm 并发

tqdm将多处理带到了一个新的水平。它简单而强大。

process_map需要:

  • 函数名称
  • Dataframe 列名
  • max_workers
  • chucksize与批次大小类似。我们将用workers的数量来计算批处理的大小,或者你可以根据你的喜好来添加这个数字。
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)
df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

输出

通过一行代码,我们得到了最好的结果:

100%          2845342/2845342 [03:48<00:00, 1426320.93it/s]
CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

结论

我们需要找到一个平衡点,它可以是串行处理,并行处理,或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。

在这个教程中,我们已经了解了各种处理大文件的Python包,它们允许我们对数据函数进行并行处理。

如果你只处理一个表格数据集,并且想提高处理性能,那么建议你尝试Dask、datatable和RAPIDS。

到此这篇关于Python高效处理大文件的方法详解的文章就介绍到这了,更多相关Python处理大文件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python中让自定义的类使用加号"+"

    python中让自定义的类使用加号"+"

    这篇文章主要介绍了python中让自定义的类使用加号"+",如果对两个对象直接“+”肯定是不行的,因为还没有对CartoonImage类重载加法运算符__add__(),下文小编举例形式讲解该内容,需要的下伙伴可以参考一下
    2022-03-03
  • python在屏幕上点击特定按钮或图像效果实例

    python在屏幕上点击特定按钮或图像效果实例

    这篇文章主要给大家介绍了关于python在屏幕上点击特定按钮或图像效果的相关资料,文中通过实例代码介绍的非常详细,对大家学习或者使用python具有一定的参考学习价值,需要的朋友可以参考下
    2022-09-09
  • 快速解决pyqt5窗体关闭后子线程不同时退出的问题

    快速解决pyqt5窗体关闭后子线程不同时退出的问题

    今天小编就为大家分享一篇快速解决pyqt5窗体关闭后子线程不同时退出的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-06-06
  • 一文详解Python中的super 函数

    一文详解Python中的super 函数

    这篇文章主要介绍了一文了解Python中的super 函数,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
    2022-09-09
  • Python绘制移动均线方法 含源代码

    Python绘制移动均线方法 含源代码

    上一篇文章我们介绍了Python绘制专业的K线图,讲解了数据获取、K线图绘制及成交量绘制等内容。本篇将在上一篇的基础上,继续讲解移动均线的绘制,需要的朋友可以参考下
    2021-10-10
  • 套娃式文件夹如何通过Python批量处理

    套娃式文件夹如何通过Python批量处理

    这篇文章主要介绍了套娃式文件夹如何通过Python批量处理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • Python 将字符串转换为列表的7种方法汇总

    Python 将字符串转换为列表的7种方法汇总

    这篇文章主要介绍了Python 将字符串转换为列表的7种方法汇总,在本文中,我们将尝试将给定的字符串转换为列表,其中根据用户的选择,遇到空格或任何其他特殊字符,为此,我们在string中使用split()方法,需要的朋友可以参考下
    2023-11-11
  • 详解Python字符串对象的实现

    详解Python字符串对象的实现

    本文介绍了 python 内部是如何管理字符串对象,以及字符串查找操作是如何实现的,感兴趣的小伙伴们可以参考一下
    2015-12-12
  • Numpy(Pandas)删除全为零的列的方法

    Numpy(Pandas)删除全为零的列的方法

    这篇文章主要介绍了Numpy(Pandas)删除全为零的列的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • Django的ListView超详细用法(含分页paginate)

    Django的ListView超详细用法(含分页paginate)

    这篇文章主要介绍了Django的ListView超详细用法(含分页paginate),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-05-05

最新评论