Flask框架里面sse的使用示例

 更新时间:2025年05月20日 09:24:22   作者:zwjapple  
本文主要介绍了Flask框架里面sse的使用示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

可以在一个 Flask 应用中注册多个 Blueprint,每个 Blueprint 可以对应一个 SSE 接口。例如:

from flask import Flask, Response, Blueprint
import time

app = Flask(__name__)

bp1 = Blueprint('sse1', __name__)
bp2 = Blueprint('sse2', __name__)

@bp1.route('/sse1')
def sse1():
    def generate():
        while True:
            yield 'data: {}\n\n'.format(time.time())
            time.sleep(1)
    return Response(generate(), mimetype='text/event-stream')

@bp2.route('/sse2')
def sse2():
    def generate():
        while True:
            yield 'data: {}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
            time.sleep(1)
    return Response(generate(), mimetype='text/event-stream')

app.register_blueprint(bp1)
app.register_blueprint(bp2)

if __name__ == '__main__':
    app.run(debug=True)

在这个例子中,我们创建了两个 Blueprint,分别对应 `/sse1` 和 `/sse2` 接口。每个 Blueprint 中的函数都返回一个 SSE 数据流,使用 `Response` 类型的响应对象封装。最后在 Flask 应用中注册这两个 Blueprint,即可同时开启两个 SSE 接口。

from flask import Flask, Response, request

app = Flask(__name__)

# 存储所有的订阅者
subscribers = {}

# 订阅接口,每个浏览器对应一个topic
@app.route('/subscribe/<topic>')
def subscribe(topic):
    def stream():
        # 将当前请求的客户端添加到订阅者列表中
        subscribers.setdefault(topic, []).append(stream)
        # 无限循环,等待新的消息
        while True:
            # 等待新的消息
            message = request.args.get('message')
            if message:
                # 向所有订阅者推送新的消息
                for subscriber in subscribers.get(topic, []):
                    subscriber.put(message)
            # 休眠一段时间,减少服务器压力
            time.sleep(1)

    # 设置响应头,告诉客户端这是一个SSE流
    response = Response(stream(), mimetype='text/event-stream')
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    return response

# 推送接口,向指定topic的所有订阅者推送消息
@app.route('/publish/<topic>')
def publish(topic):
    # 获取要推送的消息
    message = request.args.get('message')
    if message:
        # 向所有订阅者推送新的消息
        for subscriber in subscribers.get(topic, []):
            subscriber.put(message)
    return 'OK'

在上面的代码中,我们定义了两个接口,`/subscribe/<topic>`用于订阅指定的topic,`/publish/<topic>`用于向指定topic的所有订阅者推送消息。
在订阅接口中,我们将当前请求的客户端添加到订阅者列表中,并通过一个无限循环等待新的消息。当有新的消息到达时,我们会向所有订阅者推送这条消息。
在推送接口中,我们获取要推送的消息,并向指定topic的所有订阅者推送这条消息。
使用这个代码,你可以轻松地实现一个简单的SSE推送服务,每个浏览器对应一个topic。

import uuid
from flask import Flask, jsonify, request, Response, g
from flask_cors import CORS
from flask_sse import sse
import time
import json

app = Flask(__name__)
cros = CORS(app)
app.config['REDIS_URL'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
# SSE 推送函数


# SSE 推送路由


@app.route('/register', methods=["GET"])
def register():
    # 获取客户端标识符
    client_id = str(uuid.uuid4())

    # 返回 SSE 响应
    return jsonify({"client_id": client_id})

# SSE 推送路由


@app.route('/sse', methods=['POST'])
def stream():
    # 获取客户端标识符
    data = request.get_json()
    client_id = data['clientId']
    print("client_id", client_id)

    def aa():
        # 循环发送 SSE 数据
        for i in range(10):
            data = 'Hello, %s!' % client_id+str(i)
            sse.publish(data, channel=client_id, type='message')
            time.sleep(1)
        sse.publish("end", channel=client_id, type='message')
     # 返回 SSE 响应
    response = Response(aa(), mimetype='text/event-stream')
    response.headers.add('Cache-Control', 'no-cache')
    response.headers.add('Connection', 'keep-alive')
    response.headers.add('X-Accel-Buffering', 'no')
    return response


if __name__ == '__main__':
    app.run(debug=True, port=5000)

在上面的代码中,我们使用了 Flask-SSE 扩展来管理 SSE 通道。我们在 `/register` 路由中为每个客户端创建了唯一的标识符,并将其存储在请求环境中。在 `/sse` 路由中,我们使用 `sse.publish` 方法发送 SSE 数据。

到此这篇关于Flask框架里面sse的使用示例的文章就介绍到这了,更多相关Flask使用sse内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python环境的安装以及PyCharm编辑器配置教程详解

    Python环境的安装以及PyCharm编辑器配置教程详解

    优质的教程可以让我们少走很多弯路,这一点毋庸置疑。这篇文章主要为大家介绍了纯净Python环境的安装以及PyCharm编辑器的配置,需要的可以参考一下
    2023-04-04
  • Python图像特效之模糊玻璃效果

    Python图像特效之模糊玻璃效果

    这篇文章主要为大家详细介绍了Python图像特效之模糊玻璃效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • Python自动化短视频生成脚本实现热门视频流水线生产

    Python自动化短视频生成脚本实现热门视频流水线生产

    有粉丝和说,最近在网上看到一些视频营销号一天能发布几百条短视频, 感觉是批量生成的,能不能用Python做个自动化短视频生成脚本呢?今天就带大家一起实现热门视频批量流水线生产
    2021-09-09
  • 使用python实现数组、链表、队列、栈的方法

    使用python实现数组、链表、队列、栈的方法

    数据结构是指相互之间存在着一种或多种关系的数据元素的集合和该集合中数据元素之间的关系组成。这篇文章主要介绍了使用python实现数组、链表、队列、栈的相关知识,需要的朋友可以参考下
    2019-12-12
  • python实现向微信用户发送每日一句 python实现微信聊天机器人

    python实现向微信用户发送每日一句 python实现微信聊天机器人

    这篇文章主要为大家详细介绍了python实现向微信用户发送每日一句,python调实现微信聊天机器人,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-03-03
  • Python之Matlibplot画图功能演示过程

    Python之Matlibplot画图功能演示过程

    这篇文章主要介绍了Python之Matlibplot画图功能演示过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • 从基础到高级详解Python迭代器手动访问完全指南

    从基础到高级详解Python迭代器手动访问完全指南

    在Python高级编程中,手动控制迭代器是处理复杂数据流的关键技术,本文将深入解析Python手动迭代技术体系,并拓展大数据处理、流式计算、自定义数据结构等工程级应用场景
    2025-09-09
  • Python Rich增加终端显示视觉效果

    Python Rich增加终端显示视觉效果

    Python开发中,命令行界面(CLI)经常被用于交互和数据展示,虽然命令行界面通常被视为简单、枯燥的文本显示区域,通过Python的Rich库,为命令行界面带来更多生机和视觉吸引力,本文带大家探索Rich功能强大的Python库,增强终端文本渲染,使输出更具有吸引力和可读性
    2024-01-01
  • Django 自动生成api接口文档教程

    Django 自动生成api接口文档教程

    今天小编就为大家分享一篇Django 自动生成api接口文档教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-11-11
  • selenium 多窗口切换的实现(windows)

    selenium 多窗口切换的实现(windows)

    这篇文章主要介绍了selenium 多窗口切换的实现(windows),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01

最新评论