Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解

 更新时间:2023年10月23日 09:24:22   作者:bluespacezero  
这篇文章主要介绍了Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解,Twisted框架的reactor适合于处理短的、非阻塞的操作,Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作,需要的朋友可以参考下

Pipeline处理CPU密集型或阻塞型操作

Twisted框架的reactor适合于处理短的、非阻塞的操作。但是如果要处理一些复杂的、或者包含阻塞的操作又该怎么办呢?Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作——使用reactor.callInThread() API。这就意味着reactor在执行计算时还能保持运行并对事件做出反应。一定要记住线程池中的处理不是线程安全的。这就意味着当你使用了全局的状态之后,还要面临所有那些传统的多线程编程的同步问题。下面是一个简单的例子:

class UsingBlocking(object):
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        price = item["price"][0]

        out = defer.Deferred()
        reactor.callInThread(self._do_calculation, price, out)

        item["price"][0] = yield out

        defer.returnValue(item)

    def _do_calculation(self, price, out):
        new_price = price + 1
        time.sleep(0.10)
        reactor.callFromThread(out.callback, new_price)

在上面的Pipeline中,对于每个Item,我们提取出它的price字段,想要在_do_caculation()方法中对它进行处理。这个方法使用了time.sleep(),一个阻塞的操作。我们调用reactor.callInThread()方法使它运行在另一个线程中,该方法的第一个参数是想要调用的函数,后面的参数则会全部传递给被调用的函数作为参数。在这里我们给被调用的函数传递了price,还有一个创建的Deferred对象out。当_do_caculation()函数完成计算后,我们会使用out的回调函数来返回这个值。接下来,yield这个 Deferred对象并为price设置一个新的值,最后返回Item。

在_do_caculation()函数中我们把price加一,然后休眠了100ms。其实这个时间是很长的,如果在reactor的线程中调用这个函数,那就意味着我们每秒只能处理不超过10个页面。不过如果把它放在另一个线程中来调用就不会出现这种问题了。这些计算任务会在线程池中排队,等待某个线程处于可用状态,然后这个线程就会执行这个任务,休眠100ms。最后一步是激活out的回调函数。通常情况下,我们可以这样来激活:out.callback(new_price),但是既然现在我们处于另外一个线程中,这样做就不安全了。如果我们执意这样做了,这个Deferred对象的代码,也就是Scrapy的功能就会在别的线程中执行,这样会导致数据被损坏。所以我们调用了reactor.callFromThread()函数,同样的,它也是以一个函数作为参数,并把额外的参数直接传递给被调用的函数。这个函数会在主线程中排队并等待被调用,它反过来解锁了process_item()方法中的yield语句,并恢复Scrapy对这个Item的操作。

如果我们的pipeline中含有全局状态会怎么样呢?比如,计数器或者平均值等,我们需要在_do_caculation()函数中使用的。例如有以下两个变量,beta和delta:

class UsingBlocking(object):
    def __init__(self):
        self.beta, self.delta = 0, 0
        ...

    def _do_calculation(self, price, out):
        self.beta += 1
        time.sleep(0.001)
        self.delta += 1
        new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01

        time.sleep(0.10)...

上面的代码有一些问题,并且在运行的时候会给出assertion错误。这是因为,如果一个线程在self.beta += 1和self.delta += 1语句之间切换的话,另一个线程就会恢复执行并使用beta和delta的值来计算price,这里线程会发现这两个值处于不一致的状态(beta比delta大),这样,错误的产生了。中间短的sleep会让线程切换更可能发生,不过即使没有它,同样也会出现竞态条件。为了阻止竞态条件的发生,我们必须使用锁,例如Python的threading.RLock()锁。使用了这个递归锁,就能确保两个线程不会同时执行锁保护的临界区的代码:

class UsingBlocking(object):
    def __init__(self):
        ...
        self.lock = threading.RLock()
        ...

    def _do_calculation(self, price, out):
        with self.lock:
            self.beta += 1
            ...
            new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01 ...

现在的代码就没问题了,要注意的是,我们不需要保护整个代码,只需要能够覆盖全局状态的使用就行了。

在ITEM_PIPELINES中加上:

ITEM_PIPELINES = { ...
    'properties.pipelines.computation.UsingBlocking': 500,
}

运行一下会发现,时延由于100ms的休眠的缘故变调了,不过吞吐量还是保持不变,大约每秒25个。

到此这篇关于Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解的文章就介绍到这了,更多相关Pipeline处理CPU密集型或阻塞型操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python使用colorlog实现控制台管理日志多种颜色显示

    Python使用colorlog实现控制台管理日志多种颜色显示

    colorlog 是一个 Python 日志库,它可以让你在控制台中以彩色的方式显示日志消息,使得日志更易于阅读和理解,下面就跟随小编一起来看看它的具体应用吧
    2024-03-03
  • PyInstaller将Python脚本转为独立可执行文件

    PyInstaller将Python脚本转为独立可执行文件

    在开发 Python 应用程序时,通常会遇到需要将 Python 脚本分发给没有 Python 环境的用户的场景,为了解决这个问题,我们可以使用 PyInstaller,一个强大的工具,它可以将 Python 脚本打包为独立的可执行文件,方便用户无需安装 Python 即可运行你的程序
    2024-11-11
  • Django框架中序列化和反序列化的例子

    Django框架中序列化和反序列化的例子

    今天小编就为大家分享一篇Django框架中序列化和反序列化的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • Python web实战教程之Django文件上传和处理详解

    Python web实战教程之Django文件上传和处理详解

    Django和Flask都是Python的Web框架,用于开发Web应用程序,这篇文章主要给大家介绍了关于Python web实战教程之Django文件上传和处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2023-12-12
  • python实现递归查找某个路径下所有文件中的中文字符

    python实现递归查找某个路径下所有文件中的中文字符

    这篇文章主要为大家详细介绍了python实现递归查找某个路径下所有文件中的中文字符,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-08-08
  • 关于yolov8训练的一些改动及注意事项

    关于yolov8训练的一些改动及注意事项

    Yolo是一种目标检测算法,目标检测的任务是从图片中找出物体并给出其类别和位置,这篇文章主要给大家介绍了关于yolov8训练的一些改动及注意事项,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2023-02-02
  • python爬虫 2019中国好声音评论爬取过程解析

    python爬虫 2019中国好声音评论爬取过程解析

    这篇文章主要介绍了python爬虫 2019中国好声音评论爬取过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-08-08
  • Python 制作自动化翻译工具

    Python 制作自动化翻译工具

    这篇文章主要介绍了Python 实现自动化翻译和替换的脚本,帮助大家更好的理解和学习使用python,提高办公效率感兴趣的朋友可以了解下
    2021-04-04
  • Python字符串的转义字符

    Python字符串的转义字符

    这篇文章主要介绍了Python字符串的转义字符,转义字符是指,用一些普通字符的组合来代替一些特殊字符,由于其组合改变了原来字符表示的含义,下文相关资料需要的小伙伴可以参考一下
    2022-04-04
  • python中解析命令行参数的示例详解

    python中解析命令行参数的示例详解

    这篇文章主要为大家详细介绍了python中命令行参数的解析,并做一个命令行程序,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2024-11-11

最新评论