Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解

 更新时间:2023年10月23日 09:24:22   作者:bluespacezero  
这篇文章主要介绍了Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解,Twisted框架的reactor适合于处理短的、非阻塞的操作,Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作,需要的朋友可以参考下

Pipeline处理CPU密集型或阻塞型操作

Twisted框架的reactor适合于处理短的、非阻塞的操作。但是如果要处理一些复杂的、或者包含阻塞的操作又该怎么办呢?Twisted提供了线程池来在其他的线程而不是主线程(Twisted的reactor线程)中执行慢的操作——使用reactor.callInThread() API。这就意味着reactor在执行计算时还能保持运行并对事件做出反应。一定要记住线程池中的处理不是线程安全的。这就意味着当你使用了全局的状态之后,还要面临所有那些传统的多线程编程的同步问题。下面是一个简单的例子:

class UsingBlocking(object):
    @defer.inlineCallbacks
    def process_item(self, item, spider):
        price = item["price"][0]

        out = defer.Deferred()
        reactor.callInThread(self._do_calculation, price, out)

        item["price"][0] = yield out

        defer.returnValue(item)

    def _do_calculation(self, price, out):
        new_price = price + 1
        time.sleep(0.10)
        reactor.callFromThread(out.callback, new_price)

在上面的Pipeline中,对于每个Item,我们提取出它的price字段,想要在_do_caculation()方法中对它进行处理。这个方法使用了time.sleep(),一个阻塞的操作。我们调用reactor.callInThread()方法使它运行在另一个线程中,该方法的第一个参数是想要调用的函数,后面的参数则会全部传递给被调用的函数作为参数。在这里我们给被调用的函数传递了price,还有一个创建的Deferred对象out。当_do_caculation()函数完成计算后,我们会使用out的回调函数来返回这个值。接下来,yield这个 Deferred对象并为price设置一个新的值,最后返回Item。

在_do_caculation()函数中我们把price加一,然后休眠了100ms。其实这个时间是很长的,如果在reactor的线程中调用这个函数,那就意味着我们每秒只能处理不超过10个页面。不过如果把它放在另一个线程中来调用就不会出现这种问题了。这些计算任务会在线程池中排队,等待某个线程处于可用状态,然后这个线程就会执行这个任务,休眠100ms。最后一步是激活out的回调函数。通常情况下,我们可以这样来激活:out.callback(new_price),但是既然现在我们处于另外一个线程中,这样做就不安全了。如果我们执意这样做了,这个Deferred对象的代码,也就是Scrapy的功能就会在别的线程中执行,这样会导致数据被损坏。所以我们调用了reactor.callFromThread()函数,同样的,它也是以一个函数作为参数,并把额外的参数直接传递给被调用的函数。这个函数会在主线程中排队并等待被调用,它反过来解锁了process_item()方法中的yield语句,并恢复Scrapy对这个Item的操作。

如果我们的pipeline中含有全局状态会怎么样呢?比如,计数器或者平均值等,我们需要在_do_caculation()函数中使用的。例如有以下两个变量,beta和delta:

class UsingBlocking(object):
    def __init__(self):
        self.beta, self.delta = 0, 0
        ...

    def _do_calculation(self, price, out):
        self.beta += 1
        time.sleep(0.001)
        self.delta += 1
        new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01

        time.sleep(0.10)...

上面的代码有一些问题,并且在运行的时候会给出assertion错误。这是因为,如果一个线程在self.beta += 1和self.delta += 1语句之间切换的话,另一个线程就会恢复执行并使用beta和delta的值来计算price,这里线程会发现这两个值处于不一致的状态(beta比delta大),这样,错误的产生了。中间短的sleep会让线程切换更可能发生,不过即使没有它,同样也会出现竞态条件。为了阻止竞态条件的发生,我们必须使用锁,例如Python的threading.RLock()锁。使用了这个递归锁,就能确保两个线程不会同时执行锁保护的临界区的代码:

class UsingBlocking(object):
    def __init__(self):
        ...
        self.lock = threading.RLock()
        ...

    def _do_calculation(self, price, out):
        with self.lock:
            self.beta += 1
            ...
            new_price = price + self.beta - self.delta + 1
        assert abs(new_price-price-1) < 0.01 ...

现在的代码就没问题了,要注意的是,我们不需要保护整个代码,只需要能够覆盖全局状态的使用就行了。

在ITEM_PIPELINES中加上:

ITEM_PIPELINES = { ...
    'properties.pipelines.computation.UsingBlocking': 500,
}

运行一下会发现,时延由于100ms的休眠的缘故变调了,不过吞吐量还是保持不变,大约每秒25个。

到此这篇关于Scrapy的Pipeline之处理CPU密集型或阻塞型操作详解的文章就介绍到这了,更多相关Pipeline处理CPU密集型或阻塞型操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • windows系统上通过whl文件安装triton模块的简单步骤

    windows系统上通过whl文件安装triton模块的简单步骤

    这篇文章主要介绍了在Windows系统中通过.whl文件安装Triton的步骤,包括确认系统环境、下载合适的.whl文件、使用pip安装、验证安装、使用Triton以及解决潜在问题,需要的朋友可以参考下
    2025-01-01
  • python pyautogui手动活动(模拟鼠标键盘)自动化库使用

    python pyautogui手动活动(模拟鼠标键盘)自动化库使用

    这篇文章主要为大家介绍了python pyautogui手动活动(模拟鼠标键盘)自动化库使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • Python 包管理器pip入门教程

    Python 包管理器pip入门教程

    这篇文章主要为大家介绍了Python pip包管理器入门教程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • OpenCV霍夫圆变换cv2.HoughCircles()

    OpenCV霍夫圆变换cv2.HoughCircles()

    这篇博客将学习如何使用霍夫圆变换在图像中找到圆圈,OpenCV使用cv2.HoughCircles()实现霍夫圆变换,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • python中wx模块的具体使用方法

    python中wx模块的具体使用方法

    这篇文章主要介绍了python中wx模块的具体使用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-05-05
  • Python 将字符串转换为列表的7种方法汇总

    Python 将字符串转换为列表的7种方法汇总

    这篇文章主要介绍了Python 将字符串转换为列表的7种方法汇总,在本文中,我们将尝试将给定的字符串转换为列表,其中根据用户的选择,遇到空格或任何其他特殊字符,为此,我们在string中使用split()方法,需要的朋友可以参考下
    2023-11-11
  • Djanog admin 显示图片及触发器讲解

    Djanog admin 显示图片及触发器讲解

    这篇文章主要为大家介绍了Djanog admin 显示图片及触发器讲解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • 这可能是最好玩的python GUI入门实例(推荐)

    这可能是最好玩的python GUI入门实例(推荐)

    这篇文章主要介绍了这可能是最好玩的python GUI入门实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • 浅谈Python 字符串格式化输出(format/printf)

    浅谈Python 字符串格式化输出(format/printf)

    下面小编就为大家带来一篇浅谈Python 字符串格式化输出(format/printf)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-07-07
  • python中内置库csv的使用及说明

    python中内置库csv的使用及说明

    这篇文章主要介绍了python中内置库csv的使用及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11

最新评论