python 多进程队列数据处理详解

 更新时间:2019年12月23日 10:46:42   作者:gmHappy  
今天小编就为大家分享一篇python 多进程队列数据处理详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num
 
MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue() 
 
 
# 连接MQTT服务器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient.loop_start()
 
 
# 消息处理函数
def on_message_come(lient, userdata, msg):
  # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
  
  q.put(msg.payload.decode("utf-8")) # 放入队列
  print("产生消息", msg.payload.decode("utf-8"))
  # 消息处理开启多进程
  # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
  # p.start()
 
 
def consumer(q, pid):
  print("开启消费序列进程", pid)
  while True:
    msg = q.get()
    # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
    # p.start()
    talk("/camera/person/num/result", msg, pid) 
 
 
# subscribe 消息订阅
def on_subscribe():
  mqttClient.subscribe("test123", 1) # 主题为"test"
  mqttClient.on_message = on_message_come # 消息到来处理函数
 
 
# publish 消息发布
def on_publish(topic, msg, qos):
  mqttClient.publish(topic, msg, qos);
 
 
# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
  cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
  t_max, t_mean, t_min = cameraPsersonNum.personNum()
  # time.sleep(20)
  print("消费消息", pid, msg) 
  mqttClient2 = mqtt.Client()
  mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient2.loop_start()
  mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
  mqttClient2.disconnect()
 
 
def main():
  
  on_mqtt_connect()
  on_subscribe()
  for i in range(1, 3):
    c1 = Process(target=consumer, args=(q, i))
    c1.start()
  while True:
    pass
 
 
if __name__ == '__main__':
  main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Python打印详细报错日志logging问题

    Python打印详细报错日志logging问题

    这篇文章主要介绍了Python打印详细报错日志logging问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • Python中base64编码与解码详解

    Python中base64编码与解码详解

    本文主要介绍了Python2和Python3中使用base64加密方式的区别,Python3中字符为unicode编码,而b64encode函数的参数为byte类型,所以需要先进行转码
    2024-11-11
  • 简单示例解析python爬虫IP的使用(小白篇)

    简单示例解析python爬虫IP的使用(小白篇)

    这篇文章主要为大家通过简单示例解析python爬虫IP的使用介绍,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • Python中Json使用示例详解

    Python中Json使用示例详解

    这篇文章主要介绍了Python中Json使用,主要介绍一下python 中 json的使用 如何把dict转成json 、object 转成json 、以及json转成对象,需要的朋友可以参考下
    2022-07-07
  • 浅谈python下tiff图像的读取和保存方法

    浅谈python下tiff图像的读取和保存方法

    今天小编就为大家分享一篇浅谈python下tiff图像的读取和保存方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • PyCharm+Qt Designer+PyUIC安装配置教程详解

    PyCharm+Qt Designer+PyUIC安装配置教程详解

    这篇文章主要介绍了PyCharm+Qt Designer+PyUIC安装配置教程详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-06-06
  • Python绘制交通流折线图详情

    Python绘制交通流折线图详情

    这篇文章主要介绍了Python绘制交通流折线图详情,文章基于python的相关资料展开折线图绘制的实现流程,感兴趣的小伙伴可以参考一下
    2022-06-06
  • python 镜像环境搭建总结

    python 镜像环境搭建总结

    这篇文章主要介绍了python 镜像环境搭建总结,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • Python 如何实现变量交换

    Python 如何实现变量交换

    这篇文章主要介绍了Python 如何实现变量交换,Python 程序员肯定知道 a,b = b,a,这句话用来交换两个变量。相较于其它语言需要引入一个 temp 来临时存储变量的做法,Python 的这种写法无疑非常优雅,下面我们来看看具体的实现过程吧
    2022-01-01
  • python中lxml.etree 和 ElementTree 的区别解析

    python中lxml.etree 和 ElementTree 的区别解析

    lxml.etree 提供了更多的功能,例如 XPath、XSLT、Relax NG、 和 XML 模式支持,etree 对 Python unicode 字符串的想法与 ElementTree 不同,本文给大家介绍python中lxml.etree 和 ElementTree 的区别,感兴趣的朋友一起看看吧
    2024-01-01

最新评论