Python实现从网络摄像头拉流的方法分享

 更新时间:2023年01月28日 14:35:54   作者:AI浩  
这篇文章主要为大家详细介绍了Python实现从网络摄像头拉流的几种方法,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以了解一下

摘要

本文介绍几种从摄像头拉流的方法。

1、直接使用OpenCV

直接使用opencv的cv2.VideoCapture直接读取rtsp视频流,但是这样做的缺点是延迟严重、出现掉帧、花屏现象等,原因在于opencv自己有一个缓存,每次会顺序从自己的缓存中读取,而不是直接读取最新帧。

代码如下:

import cv2
import datetime
def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

user_name, user_pwd = "admin", "1234"
ca_ip="192.168.1.100"
channel=2
cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" \
                           % (user_name, user_pwd, ca_ip, channel))
if cap.isOpened():
    print("Opened")
while cap.isOpened():
        ret, frame = cap.read()
        cv2.imwrite("opencv_"+time_str() + ".jpg", frame)

2、使用ffmpeg

FFmpeg是一套强大的视频、音频处理程序,也是很多视频处理软件的基础 。但是FFmpeg的命令行使用起来有一定的学习成本。而ffmpeg-python就是解决FFmpeg学习成本的问题,让开发者使用python就可以调用FFmpeg的功能,既减少了学习成本,也增加了代码的可读性。

github地址:https://github.com/kkroening/ffmpeg-python

2.1、安装方法 

2.1.1、安装ffmpeg-python 

ffmpeg-python可以通过典型的 pip 安装获取最新版本(注意:是ffmpeg-python,不要写成了python-ffmpeg):

pip install ffmpeg-python

或者可以从本地克隆和安装源:

git clone git@github.com:kkroening/ffmpeg-python.git
pip install -e ./ffmpeg-python

2.1.2、安装FFmpeg 

使用该库,需要自行安装FFmpeg,如果电脑已经安装了,可以忽略本步骤。这里推荐直接使用conda进行安装,可以省下很多麻烦,其他的安装方式自行百度。

conda install ffmpeg

2.2、代码实现

使用ffmpeg读取rtsp流并转换成numpy array,并使用cv2.imwrite保存。

import ffmpeg
import numpy as np
import cv2
import datetime

def main(source):
    args = {
        "rtsp_transport": "tcp",
        "fflags": "nobuffer",
        "flags": "low_delay"
    }    # 添加参数
    probe = ffmpeg.probe(source)
    cap_info = next(x for x in probe['streams'] if x['codec_type'] == 'video')
    print("fps: {}".format(cap_info['r_frame_rate']))
    width = cap_info['width']           # 获取视频流的宽度
    height = cap_info['height']         # 获取视频流的高度
    up, down = str(cap_info['r_frame_rate']).split('/')
    fps = eval(up) / eval(down)
    print("fps: {}".format(fps))    # 读取可能会出错错误
    process1 = (
        ffmpeg
        .input(source, **args)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .overwrite_output()
        .run_async(pipe_stdout=True)
    )
    while True:
        in_bytes = process1.stdout.read(width * height * 3)     # 读取图片
        if not in_bytes:
            break
        # 转成ndarray
        in_frame = (
            np
            .frombuffer(in_bytes, np.uint8)
            .reshape([height, width, 3])
        )
        frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR)  # 转成BGR
        # cv2.imshow(time_str(), frame)
        cv2.imwrite(time_str()+".jpg", frame)
        # if cv2.waitKey(1) == ord('q'):
        #     break
    process1.kill()             # 关闭

def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

if __name__ == "__main__":
    # rtsp流需要换成自己的
    user_name, user_pwd = "admin", "1234"
    ca_ip = "192.168.1.168"
    channel = 2
    alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" \
                           % (user_name, user_pwd, ca_ip, channel)

    main(alhua_rtsp)

3、多线程的方式读取图片

采用多线程的方式,新开一个线程,利用变量、队列等方式保存最新帧,使得每次都读取最新帧,而不是opencv自己缓存中的顺序帧,不会延迟,不会花屏了,代码如下:

import cv2
import threading
import sys
import  datetime
def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

class RTSCapture(cv2.VideoCapture):
    _cur_frame = None
    _reading = False
    schemes = ["rtsp://","rtmp://"]
    @staticmethod
    def create(url, *schemes):
        rtscap = RTSCapture(url)
        rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True)
        rtscap.schemes.extend(schemes)
        if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
            rtscap._reading = True
        elif isinstance(url, int):
            pass
        return rtscap

    def isStarted(self):
        ok = self.isOpened()
        if ok and self._reading:
            ok = self.frame_receiver.is_alive()
        return ok

    def recv_frame(self):
        while self._reading and self.isOpened():
            ok, frame = self.read()
            if not ok: break
            self._cur_frame = frame
        self._reading = False

    def read2(self):
        frame = self._cur_frame
        self._cur_frame = None
        return frame is not None, frame

    def start_read(self):
        self.frame_receiver.start()
        self.read_latest_frame = self.read2 if self._reading else self.read

    def stop_read(self):
        self._reading = False
        if self.frame_receiver.is_alive(): self.frame_receiver.join()


if __name__ == '__main__':
    user_name, user_pwd = "admin", "1234"
    ca_ip = "192.168.1.100"
    channel = 2
    alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" \
                           % (user_name, user_pwd, ca_ip, channel)

    rtscap = RTSCapture.create(alhua_rtsp)
    rtscap.start_read()

    while rtscap.isStarted():
        ok, frame = rtscap.read_latest_frame()
        # if cv2.waitKey(100) & 0xFF == ord('q'):
        #     break
        if not ok:
            continue


        # inhere
        # cv2.imshow(time_str(), frame)
        cv2.imwrite(time_str() + ".jpg", frame)


    rtscap.stop_read()
    rtscap.release()
    cv2.destroyAllWindows()

运行结果:

4、多进程的方式拉流

使用Python3自带的多进程模块,创建一个队列,进程A从通过rtsp协议从视频流中读取出每一帧,并放入队列中,进程B从队列中将图片取出,处理后进行显示。进程A如果发现队列里有两张图片(证明进程B的读取速度跟不上进程A),那么进程A主动将队列里面的旧图片删掉,换上新图片。通过多线程的方法:

代码如下:

import cv2
import multiprocessing as mp
import time
import datetime


def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

def image_put(q, user, pwd, ip, channel=1):
    cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" % (user, pwd, ip, channel))
    if cap.isOpened():
        print('HIKVISION')
    else:
        cap = cv2.VideoCapture("rtsp://%s:%s@%s/cam/realmonitor?channel=%d&subtype=0" % (user, pwd, ip, channel))
        print('DaHua')

    while True:
        q.put(cap.read()[1])
        q.get() if q.qsize() > 1 else time.sleep(0.01)


def image_get(q, window_name):
    # cv2.namedWindow(window_name, flags=cv2.WINDOW_FREERATIO)
    while True:
        frame = q.get()
        # cv2.imshow(window_name, frame)
        # cv2.waitKey(1)
        cv2.imwrite("opencv_"+time_str() + ".jpg", frame)
        cv2.waitKey(1)

def run_single_camera():
    user_name, user_pwd, camera_ip = "admin", "admin123456", "192.168.35.121"

    mp.set_start_method(method='spawn')  # init
    queue = mp.Queue(maxsize=2)
    processes = [mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)),
                 mp.Process(target=image_get, args=(queue, camera_ip))]

    [process.start() for process in processes]
    [process.join() for process in processes]

def run_multi_camera():
    # user_name, user_pwd = "admin", "password"
    user_name, user_pwd = "admin", "1234"
    camera_ip_l = [
        "192.168.1.XX3",  # ipv4
        "192.168.1.XX2",
        "192.168.1.XX1",
    ]

    mp.set_start_method(method='spawn')  # init
    queues = [mp.Queue(maxsize=90) for _ in camera_ip_l]

    processes = []
    for queue, camera_ip in zip(queues, camera_ip_l):
        processes.append(mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)))
        processes.append(mp.Process(target=image_get, args=(queue, camera_ip)))

    for process in processes:
        process.daemon = True
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    # run_single_camera()
    run_multi_camera()
    pass

到此这篇关于Python实现从网络摄像头拉流的方法分享的文章就介绍到这了,更多相关Python网络摄像头拉流内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Tensorflow2.10使用BERT从文本中抽取答案实现详解

    Tensorflow2.10使用BERT从文本中抽取答案实现详解

    这篇文章主要为大家介绍了Tensorflow2.10使用BERT从文本中抽取答案实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • Python编程中NotImplementedError的使用方法

    Python编程中NotImplementedError的使用方法

    下面小编就为大家分享一篇Python编程中NotImplementedError的使用方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-04-04
  • 数据清洗--DataFrame中的空值处理方法

    数据清洗--DataFrame中的空值处理方法

    今天小编就为大家分享一篇数据清洗--DataFrame中的空值处理方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • 一文读懂python Scrapy爬虫框架

    一文读懂python Scrapy爬虫框架

    这篇文章主要介绍了一文读懂python Scrapy爬虫框架的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • python uuid模块使用实例

    python uuid模块使用实例

    这篇文章主要介绍了python uuid模块使用实例,本文给出简单使用示例,讲解uuid1、uuid3、 uuid4、 uuid5这几个方法,需要的朋友可以参考下
    2015-04-04
  • 新手如何快速入门Python(菜鸟必看篇)

    新手如何快速入门Python(菜鸟必看篇)

    下面小编就为大家带来一篇新手如何快速入门Python(菜鸟必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • 详解python中absl包的使用

    详解python中absl包的使用

    "absl" 是 Google 开发的一个 Python 软件包,用于提供一些常见的 Python 编程功能和工具,以改善代码的可读性、可维护性和性能,下面我们就来看看absl包的具体使用吧
    2023-11-11
  • Python实现字典依据value排序

    Python实现字典依据value排序

    新华字典大家都使用过吧,那么使用python语言是如何实现字典排序的呢?下面跟着本教程一起学习Python实现字典依据value排序,需要的朋友参考下吧
    2016-02-02
  • 详解如何使用numpy提高Python数据分析效率

    详解如何使用numpy提高Python数据分析效率

    NumPy是Python语言的一个第三方库,其支持大量高维度数组与矩阵运算。本文主要为大家介绍了如何使用numpy提高python数据分析效率,需要的可以参考一下
    2023-04-04
  • Python中ImportError错误的详细解决方法

    Python中ImportError错误的详细解决方法

    最近辛辛苦苦安装完了python,最后再运行的时候会出现错误,所以这篇文章主要给大家介绍了关于Python中ImportError错误的详细解决方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07

最新评论