解决python3 pika之连接断开的问题

 更新时间:2018年12月18日 09:51:07   作者:moxiaomomo  
今天小编就为大家分享一篇解决python3 pika之连接断开的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

问题描述

在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。

问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。

源码示例

#!/usr/bin
#coding: utf-8

import pika
import time


USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

def callback(ch, method, properties, body):
 print(body)
 time.sleep(600)
 ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)

 chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 chan.basic_consume(callback, queue=TEST_QUEUE)
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

运行一段时间后, 就会报错:

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

问题排查

猜测:pika客户端没有及时发送心跳,连接被server断开

一开始修改了heartbeat_interval参数值, 示例如下:

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 # ....

修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;

于是又加了个心跳线程, 示例如下:

#!/usr/bin
#coding: utf-8

import pika
import time
import logging
import threading

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

class Heartbeat(threading.Thread):
 def __init__(self, connection):
  super(Heartbeat, self).__init__()
  self.lock = threading.Lock()
  self.connection = connection
  self.quitflag = False
  self.stopflag = True
  self.setDaemon(True)

 def run(self):
  while not self.quitflag:
   time.sleep(10)
   self.lock.acquire()
   if self.stopflag :
    self.lock.release()
    continue
   try:
    self.connection.process_data_events()
   except Exception as ex:
    logging.warn("Error format: %s"%(str(ex)))
    self.lock.release()
    return
   self.lock.release()

 def startHeartbeat(self):
  self.lock.acquire()
  if self.quitflag==True:
   self.lock.release()
   return
  self.stopflag=False
  self.lock.release()

def callback(ch, method, properties, body):
 logging.info("recv_body:%s" % body)
 time.sleep(600)
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)
 chan.basic_consume(callback,
      queue=TEST_QUEUE)

 heartbeat = Heartbeat(s_conn)
 heartbeat.start()   #开启心跳线程
 heartbeat.startHeartbeat()
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。

去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_interval: How often to send heartbeats.
         Min between this value and server's proposal
         will be used. Use 0 to deactivate heartbeats
         and None to accept server's proposal.

按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。

如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。

如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。

以上这篇解决python3 pika之连接断开的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Python列表之间的数字与字符转化实例

    Python列表之间的数字与字符转化实例

    在python列表操作中,面对需要把列表中的字符串转为礼拜的操作,无需强转,通过简单的几步就可以实现,下面这篇文章主要给大家介绍了关于Python列表之间的数字与字符转化的相关资料,需要的朋友可以参考下
    2023-02-02
  • python 使用openpyxl读取excel数据

    python 使用openpyxl读取excel数据

    这篇文章主要介绍了python 使用openpyxl读取excel数据的方法,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下
    2021-02-02
  • python+matplotlib绘制旋转椭圆实例代码

    python+matplotlib绘制旋转椭圆实例代码

    这篇文章主要介绍了python+matplotlib绘制旋转椭圆实例代码,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • python正则表达式re.sub各个参数的超详细讲解

    python正则表达式re.sub各个参数的超详细讲解

    Python 的 re 模块提供了re.sub用于替换字符串中的匹配项,下面这篇文章主要给大家介绍了关于python正则表达式re.sub各个参数的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • Python matplotlib画图实例之绘制拥有彩条的图表

    Python matplotlib画图实例之绘制拥有彩条的图表

    这篇文章主要介绍了Python matplotlib画图实例之绘制拥有彩条的图表,具有一定借鉴价值,需要的朋友可以参考下
    2017-12-12
  • python中的多进程的创建与启动方式

    python中的多进程的创建与启动方式

    这篇文章主要介绍了python中的多进程的创建与启动,python中的并发有三种形式,多进程、多线程、协程,执⾏并发任务的⽬的是为了提⾼程序运⾏的效率,本文通过实例代码详细讲解需要的朋友可以参考下
    2022-12-12
  • Python json解析库jsonpath原理及使用示例

    Python json解析库jsonpath原理及使用示例

    这篇文章主要介绍了Python json解析库jsonpath原理及使用示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-11-11
  • django框架防止XSS注入的方法分析

    django框架防止XSS注入的方法分析

    这篇文章主要介绍了django框架防止XSS注入的方法,结合实例形式分析了XSS攻击的原理及Django框架防止XSS攻击的相关操作技巧,需要的朋友可以参考下
    2019-06-06
  • Python SMTP发送电子邮件的示例

    Python SMTP发送电子邮件的示例

    这篇文章主要介绍了Python SMTP发送电子邮件的示例,帮助大家更好的理解和使用python,感兴趣的朋友可以了解下
    2020-09-09
  • numpy多级排序lexsort函数的使用

    numpy多级排序lexsort函数的使用

    本文主要介绍了numpy多级排序lexsort函数的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-03-03

最新评论