详解pandas apply 并行处理的几种方法

 更新时间:2021年02月24日 10:19:22   作者:jingyi130705008  
这篇文章主要介绍了详解pandas apply 并行处理的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. pandarallel (pip install )

对于一个带有Pandas DataFrame df的简单用例和一个应用func的函数,只需用parallel_apply替换经典的apply。

from pandarallel import pandarallel
 
# Initialization
pandarallel.initialize()
 
# Standard pandas apply
df.apply(func)
 
# Parallel apply
df.parallel_apply(func)

注意,如果不想并行化计算,仍然可以使用经典的apply方法。

另外可以通过在initialize函数中传递progress_bar=True来显示每个工作CPU的一个进度条。

2. joblib (pip install )

 https://pypi.python.org/pypi/joblib

# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
 
from math import sqrt
from joblib import Parallel, delayed
 
def test():
  start = time.time()
  result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
  end = time.time()
  print(end-start)
  result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
  end2 = time.time()
  print(end2-end)

-------输出结果----------

0.4434356689453125
0.6346755027770996

3. multiprocessing

import multiprocessing as mp
 
with mp.Pool(mp.cpu_count()) as pool:
  df['newcol'] = pool.map(f, df['col'])
multiprocessing.cpu_count()

返回系统的CPU数量。

该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由 len(os.sched_getaffinity(0)) 方法获得。

可能引发 NotImplementedError

参见os.cpu_count()

4. 几种方法性能比较

(1)代码

import sys
import time
import pandas as pd
import multiprocessing as mp
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm, tqdm_notebook
 
 
def get_url_len(url):
  url_list = url.split(".")
  time.sleep(0.01) # 休眠0.01秒
  return len(url_list)
 
def test1(data):
  """
  不进行任何优化
  """
  start = time.time()
  data['len'] = data['url'].apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("res:{}, cost time:{}".format(res, cost_time))
 
def test_mp(data):
  """
  采用mp优化
  """
  start = time.time()
  with mp.Pool(mp.cpu_count()) as pool:
    data['len'] = pool.map(get_url_len, data['url'])
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_mp \t res:{}, cost time:{}".format(res, cost_time))
 
def test_pandarallel(data):
  """
  采用pandarallel优化
  """
  start = time.time()
  pandarallel.initialize()
  data['len'] = data['url'].parallel_apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))
 
 
def test_delayed(data):
  """
  采用delayed优化
  """
  def key_func(subset):
    subset["len"] = subset["url"].apply(get_url_len)
    return subset
 
  start = time.time()
  data_grouped = data.groupby(data.index)
  # data_grouped 是一个可迭代的对象,那么就可以使用 tqdm 来可视化进度条
  results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
  data = pd.concat(results)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))
 
 
if __name__ == '__main__':
  
  columns = ['title', 'url', 'pub_old', 'pub_new']
  temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
  data = temp
  """
  for i in range(99):
    data = data.append(temp)
  """
  print(len(data))
  """
  test1(data)
  test_mp(data)
  test_pandarallel(data)
  """
  test_delayed(data)

(2) 结果输出

1k
res:4338, cost time:0.0018074512481689453
test_mp   res:4338, cost time:0.2626469135284424
test_pandarallel   res:4338, cost time:0.3467681407928467
 
1w
res:42936, cost time:0.008773326873779297
test_mp   res:42936, cost time:0.26111721992492676
test_pandarallel   res:42936, cost time:0.33237743377685547
 
10w
res:426742, cost time:0.07944369316101074
test_mp   res:426742, cost time:0.294996976852417
test_pandarallel   res:426742, cost time:0.39208269119262695
 
100w
res:4267420, cost time:0.8074917793273926
test_mp   res:4267420, cost time:0.9741342067718506
test_pandarallel   res:4267420, cost time:0.6779992580413818
 
1000w
res:42674200, cost time:8.027287006378174
test_mp   res:42674200, cost time:7.751036882400513
test_pandarallel   res:42674200, cost time:4.404983282089233

在get_url_len函数里加个sleep语句(模拟复杂逻辑),数据量为1k,运行结果如下:

1k
res:4338, cost time:10.054503679275513
test_mp   res:4338, cost time:0.35697126388549805
test_pandarallel   res:4338, cost time:0.43415403366088867
test_delayed   res:4338, cost time:2.294757843017578

5. 小结

(1)如果数据量比较少,并行处理比单次执行效率更慢;

(2)如果apply的函数逻辑简单,并行处理比单次执行效率更慢。

6. 问题及解决方法

(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

https://www.jianshu.com/p/0be1b4b27bde

(2)Linux查看物理CPU个数、核数、逻辑CPU个数

https://lover.blog.csdn.net/article/details/113951192

(3) 进度条的使用

https://www.jb51.net/article/206219.htm

到此这篇关于详解pandas apply 并行处理的几种方法的文章就介绍到这了,更多相关pandas apply 并行处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • pytorch模型的定义、修改、读取、断点续训深入解析

    pytorch模型的定义、修改、读取、断点续训深入解析

    模型定义是深度学习中重要的一环,PyTorch提供了强大而灵活的工具和函数,使我们能够轻松定义各种类型的深度学习模型,通过深入理解模型定义的原理和应用,我们能够更好地理解和设计自己的模型,从而提升深度学习任务的性能和效果
    2024-03-03
  • Python光学仿真wxpython透镜演示系统初始化与参数调节

    Python光学仿真wxpython透镜演示系统初始化与参数调节

    这篇文章主要为大家介绍了Python光学仿真wxpython透镜演示系统的初始化与参数调节,同样在学习wxpython透镜演示系统的入门同学可以借鉴参考下,希望能够有所帮助
    2021-10-10
  • Python实现自动计算Excel数据指定范围内的区间最大值

    Python实现自动计算Excel数据指定范围内的区间最大值

    这篇文章主要为大家详细介绍了如何基于Python自动计算Excel数据指定范围内的区间最大值,文中的示例代码简洁易懂,感兴趣的小伙伴可以了解下
    2023-07-07
  • Python中用max()方法求最大值的介绍

    Python中用max()方法求最大值的介绍

    这篇文章主要介绍了Python中用max()方法求最大值的介绍,是Python入门中的基础知识,需要的朋友可以参考下
    2015-05-05
  • python 去除二维数组/二维列表中的重复行方法

    python 去除二维数组/二维列表中的重复行方法

    今天小编就为大家分享一篇python 去除二维数组/二维列表中的重复行方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-01-01
  • python中not not x 与bool(x) 的区别

    python中not not x 与bool(x) 的区别

    这篇文章主要介绍了python中not not x 与 bool(x) 的区别,我们就来做一个选择,就是 not not x 和 bool(x) 用哪个比较好?下面一起进入文章看看吧

    2021-12-12
  • python修改包导入时搜索路径的方法

    python修改包导入时搜索路径的方法

    搜索路径是由一系列目录名组成的,Python解释器就依次从这些目录中去寻找所引入的模块,下面这篇文章主要给大家介绍了关于python修改包导入时搜索路径的相关资料,需要的朋友可以参考下
    2022-05-05
  • Python学习笔记之Django创建第一个数据库模型的方法

    Python学习笔记之Django创建第一个数据库模型的方法

    今天小编就为大家分享一篇Python学习笔记之Django创建第一个数据库模型的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • 学懂Python字符编码避免乱码陷阱

    学懂Python字符编码避免乱码陷阱

    在Python编程中,处理字符编码和乱码问题是一个常见的挑战,特别是在处理文本数据、文件输入/输出和网络通信时,可能会遇到各种字符编码问题,本文章将深入探讨Python中的乱码问题,解释其原理,并提供解决办法,以确保你的应用程序能够正确处理各种编码情况
    2023-12-12
  • Python对列表排序的方法实例分析

    Python对列表排序的方法实例分析

    这篇文章主要介绍了Python对列表排序的方法,实例分析了Python列表排序函数的相关使用技巧,非常简单实用,需要的朋友可以参考下
    2015-05-05

最新评论