Python进程间通讯与进程池超详细讲解

 更新时间:2022年12月23日 09:28:46   作者:alwaysrun  
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式,队列:队列类似于一条管道,元素先进先出,需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;
  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;
  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;
  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;
  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;
  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;
  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;
  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;
  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;
  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);
  • terminate():立即终止所有任务;
  • join():等待工作进程完成(必须已close或terminate了);
def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()

到此这篇关于Python进程间通讯与进程池超详细讲解的文章就介绍到这了,更多相关Python进程间通讯内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python中导入模块的几种方式总结

    Python中导入模块的几种方式总结

    模块就是用一堆的代码实现了一些功能的代码的集合,通常一个或者多个函数写在一个.py文件里,下面这篇文章主要给大家介绍了关于Python中导入模块的几种方式,需要的朋友可以参考下
    2022-12-12
  • Python 做曲线拟合和求积分的方法

    Python 做曲线拟合和求积分的方法

    今天小编就为大家分享一篇Python 做曲线拟合和求积分的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • Pycharm基本操作及调试代码

    Pycharm基本操作及调试代码

    最近天气狂热暴躁,很难静下心来学习啦,于是给大家整理一些python开发工具pycharm基本操作及调试代码吧,感兴趣的朋友跟随小编一起看看吧
    2021-06-06
  • 如何将yolov5中的PANet层改为BiFPN详析

    如何将yolov5中的PANet层改为BiFPN详析

    现在yolov5的neck用的是PANet,在efficient论文中提出了BiFPN结构,还有更加不错的性能,下面这篇文章主要给大家介绍了关于如何将yolov5中的PANet层改为BiFPN的相关资料,需要的朋友可以参考下
    2022-06-06
  • Numpy中np.max的用法及np.maximum区别

    Numpy中np.max的用法及np.maximum区别

    这篇文章主要介绍了Numpy中np.max的用法及np.maximum区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • Tornado 多进程实现分析详解

    Tornado 多进程实现分析详解

    这篇文章主要介绍了Tornado 多进程实现分析详解,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • Python实现用户注册登录程序

    Python实现用户注册登录程序

    这篇文章主要为大家详细介绍了Python实现用户注册登录程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • 超详细注释之OpenCV实现视频实时人脸模糊和人脸马赛克

    超详细注释之OpenCV实现视频实时人脸模糊和人脸马赛克

    这篇文章主要介绍了OpenCV实现视频实时人脸模糊和人脸马赛克,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • 基于fastapi框架的异步解读

    基于fastapi框架的异步解读

    这篇文章主要介绍了基于fastapi框架的异步解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • python 使用matplotlib 实现从文件中读取x,y坐标的可视化方法

    python 使用matplotlib 实现从文件中读取x,y坐标的可视化方法

    今天小编就为大家分享一篇python 使用matplotlib 实现从文件中读取x,y坐标的可视化方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-07-07

最新评论