Python异步编程入门之实现文件批处理的并发处理方式

 更新时间:2024年10月28日 10:56:00   作者:engchina  
本文以Python初级程序员为对象,介绍了如何使用asyncio和logging模块实现一个异步批处理文件的并发处理系统,以提高处理大量文件或数据时的效率,其中,通过配置日志系统记录处理文件的日志信息,定义AsyncBatchProcessor类控制并发任务的数量

引言

在现代软件开发中,处理大量文件或数据时,提高处理效率和并发性是非常重要的。

Python 的 asyncio 库提供了一种强大的方式来实现异步编程,从而提高程序的并发处理能力。

本文将面向 Python 初级程序员,介绍如何使用 asynciologging 模块来实现一个异步批处理文件的并发处理系统。

代码实现

1. 日志配置

首先,我们需要配置日志系统,以便在处理文件时记录日志信息。

日志配置包括设置日志格式和输出位置。

import logging
import os

# 获取当前文件的绝对路径
current_file = os.path.abspath(__file__)

# 配置日志格式
log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)

# 创建一个文件处理器,并将日志输出到文件
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)

2. 异步批处理类

接下来,我们定义一个 AsyncBatchProcessor 类,用于处理批量文件。

该类使用 asyncio.Semaphore 来控制并发任务的数量。

import asyncio
import random

DEFAULT_MAX_CONCURRENT_TASKS = 2  # 最大并发任务数
MAX_RETRIES = 3  # 最大重试次数

class AsyncBatchProcessor:
    def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_file(
            self,
            input_file: str,
            retry_count: int = 0
    ) -> None:
        """处理单个文件的异步方法"""
        async with self.semaphore:  # 使用信号量控制并发
            try:
                logging.info(f"Processing file: {input_file}")

                # 模拟文件处理过程
                await asyncio.sleep(random.uniform(0.5, 2.0))

                logging.info(f"Successfully processed {input_file}")

            except Exception as e:
                logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}")
                if retry_count < MAX_RETRIES:
                    logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})")
                    await asyncio.sleep(1)
                    await self.process_single_file(input_file, retry_count + 1)
                else:
                    logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts")

    async def process_batch(
            self,
            file_list: list
    ) -> None:
        total_files = len(file_list)
        logging.info(f"Found {total_files} files to process")

        # 创建工作队列
        queue = asyncio.Queue()

        # 将所有文件放入队列
        for file_path in file_list:
            await queue.put(file_path)

        # 创建工作协程
        async def worker(worker_id: int):
            while True:
                try:
                    # 非阻塞方式获取任务
                    input_file_path = await queue.get()
                    logging.info(f"Worker {worker_id} processing: {input_file_path}")

                    try:
                        await self.process_single_file(input_file_path)
                    except Exception as e:
                        logging.error(f"Error processing {input_file_path}: {str(e)}")
                    finally:
                        queue.task_done()

                except asyncio.QueueEmpty:
                    # 队列为空,工作结束
                    break
                except Exception as e:
                    logging.error(f"Worker {worker_id} encountered error: {str(e)}")
                    break

        # 创建工作任务
        workers = []
        for i in range(self.max_concurrent):
            worker_task = asyncio.create_task(worker(i))
            workers.append(worker_task)

        # 等待队列处理完成
        await queue.join()

        # 取消所有仍在运行的工作任务
        for w in workers:
            w.cancel()

        # 等待所有工作任务完成
        await asyncio.gather(*workers, return_exceptions=True)

3. 异步批处理入口函数

最后,我们定义一个异步批处理入口函数 batch_detect,用于启动批处理任务。

async def batch_detect(
        file_list: list,
        max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
):
    """异步批处理入口函数"""
    processor = AsyncBatchProcessor(max_concurrent)
    await processor.process_batch(file_list)

# 示例调用
file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"]
asyncio.run(batch_detect(file_list))

代码解释

1.日志配置

  • 使用 logging 模块记录日志信息,包括时间、日志级别、文件路径和行号、以及日志消息。
  • 日志输出到文件 app.log 中,便于后续查看和分析。

2.异步批处理类 AsyncBatchProcessor

  • __init__ 方法初始化最大并发任务数和信号量。
  • process_single_file 方法处理单个文件,使用信号量控制并发,模拟文件处理过程,并在失败时重试。
  • process_batch 方法处理批量文件,创建工作队列和协程,控制并发任务的执行。

3.异步批处理入口函数 batch_detect

  • 创建 AsyncBatchProcessor 实例,并调用 process_batch 方法启动批处理任务。

总结

通过使用 asynciologging 模块,我们实现了一个高效的异步批处理文件系统。

该系统能够并发处理大量文件,并在处理失败时自动重试,直到达到最大重试次数。

日志系统帮助我们记录每个文件的处理过程,便于后续的调试和分析。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 使用Python第三方库pygame写个贪吃蛇小游戏

    使用Python第三方库pygame写个贪吃蛇小游戏

    这篇文章主要介绍了使用Python第三方库pygame写个贪吃蛇小游戏,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-03-03
  • python实现跨进程(跨py文件)通信示例

    python实现跨进程(跨py文件)通信示例

    本文主要介绍了python实现跨进程(跨py文件)通信示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • jupyter notebook更换皮肤主题的实现

    jupyter notebook更换皮肤主题的实现

    这篇文章主要介绍了jupyter notebook更换皮肤主题的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • Pandas DataFrame进行数据拼接方法详解

    Pandas DataFrame进行数据拼接方法详解

    这篇文章主要为大家详细介绍了Pandas DataFrame进行数据拼接多种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-11-11
  • keras的load_model实现加载含有参数的自定义模型

    keras的load_model实现加载含有参数的自定义模型

    这篇文章主要介绍了keras的load_model实现加载含有参数的自定义模型,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-06-06
  • Python yield 的使用浅析

    Python yield 的使用浅析

    这篇文章主要为大家详细介绍了Python yield的使用,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-02-02
  • 利用Python实现添加或读取Excel公式

    利用Python实现添加或读取Excel公式

    Excel公式是数据处理的核心工具,从简单的加减运算到复杂的逻辑判断,掌握基础语法是高效工作的起点,下面我们就来看看如何使用Python进行Excel公式的添加与读取吧
    2025-03-03
  • python自动化测试工具Helium使用示例

    python自动化测试工具Helium使用示例

    大家好,本篇文章主要讲的是python自动化测试工具Helium使用示例,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下哦
    2021-12-12
  • 深入浅出Python contextlib如何优雅管理上下文资源

    深入浅出Python contextlib如何优雅管理上下文资源

    这篇文章主要为大家详细介绍了Python contextlib优雅管理上下文资源的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2026-04-04
  • Python绘制分形图案探索无限细节和奇妙之美

    Python绘制分形图案探索无限细节和奇妙之美

    本文将介绍如何使用Python绘制各种分形图案,包括分形树、科赫曲线、曼德博集合等。通过本文读者可以了解分形图案的基本概念和构造方法,并学会使用Python绘制出各种精美的分形图案。本文还提供了具体的代码示例和实践案例,帮助读者更好地理解分形图案的奇妙之美
    2023-04-04

最新评论