使用FastAPI和Redis实现实时通知方式(SSE)

 更新时间:2026年01月21日 16:45:42   作者:写bug如流水  
本文介绍了如何使用FastAPI和Redis实现Server-SentEvents(SSE)来推送实时通知,Server-SentEvents是一种单向数据流技术,适用于实时数据推送场景,FastAPI和Redis的结合可以高效地实现这一功能,并通过心跳机制管理连接

在当今快速发展的Web应用程序中,实时通知已成为用户体验的重要组成部分。

无论是社交媒体更新、消息通知,还是系统状态提醒,实时数据推送可以极大地提升用户互动性。

本文将详细介绍如何使用FastAPI和Redis实现Server-Sent Events (SSE) 来推送实时通知。

什么是Server-Sent Events (SSE)?

Server-Sent Events(SSE)是一种通过HTTP连接从服务器向客户端发送实时更新的技术。

与WebSocket相比,SSE的实现更加简单,适用于单向数据流场景。

服务器可以持续向客户端推送数据,而无需客户端不断发起请求。

为什么选择FastAPI和Redis?

  • FastAPI:作为一个现代的Web框架,FastAPI以其高性能和易用性而闻名。它支持异步编程,使得构建高并发应用变得更加简单。
  • Redis:作为一个高性能的键值存储数据库,Redis适合用作缓存和消息代理。在我们的场景中,Redis可以存储心跳信息,以决定何时停止推送通知。

环境准备

在开始之前,确保你的开发环境中安装了fastapiuvicornredis库。

可以通过以下命令进行安装:

pip install fastapi uvicorn redis

实现步骤

1. 创建FastAPI应用

首先,我们需要设置一个基本的FastAPI应用。下面是实现SSE的基本代码:

from fastapi import FastAPI
from starlette.responses import EventSourceResponse
import asyncio
import redis

app = FastAPI()
redis_client = redis.Redis()

async def event_stream(key: str):
    while True:
        if redis_client.ttl(key) == -1:
            yield f"data: 心跳已过期\n\n"
            break
        await asyncio.sleep(1)  # 模拟等待
        yield f"data: 这里是实时通知\n\n"

@app.get("/notifications/{key}")
async def notifications(key: str):
    return EventSourceResponse(event_stream(key))

@app.post("/heartbeat/{key}/{seconds}")
async def set_heartbeat(key: str, seconds: int):
    redis_client.setex(key, seconds, "active")
    return {"message": "Heartbeat set"}

2. 代码解析

  • event_stream 函数:这是一个异步生成器。它将不断检查Redis中指定键的TTL(生存时间)。如果TTL过期,生成器将停止发送消息。
  • /notifications/{key}:这个端点用于建立SSE连接,允许客户端接收实时通知。
  • /heartbeat/{key}/{seconds}:这个端点用于设置Redis中的心跳时间,确保在指定时间内保持连接。

3. 前端实现

在前端,我们可以使用JavaScript来发送心跳请求并接收来自服务器的通知。以下是一个简单的示例:

const key = 'your_key';
const heartbeatDuration = 10; // 设置心跳时长为10秒

// 发送心跳请求
fetch(`/heartbeat/${key}/${heartbeatDuration}`, { method: 'POST' });

// 接收SSE通知
const eventSource = new EventSource(`/notifications/${key}`);
eventSource.onmessage = function(event) {
    console.log(event.data); // 在控制台显示通知
};

4. 测试与调试

在开发过程中,你可以使用uvicorn来运行你的FastAPI应用:

uvicorn your_file_name:app --reload

打开浏览器,确保前端代码能成功连接到后端,并能正确接收通知。

如果遇到问题,检查浏览器的开发者工具,查看网络请求和控制台输出,以便进行调试。

应用场景

这种SSE实现可以广泛应用于以下场景:

  • 社交网络:实时推送新消息或评论。
  • 在线游戏:通知玩家状态变化。
  • 监控系统:实时显示系统性能或状态更新。

总结

通过以上步骤,我们实现了一个使用FastAPI和Redis的实时通知系统。该系统能够根据心跳状态,持续推送通知,直到心跳到期为止。

你可以根据具体需求进一步扩展这个示例,例如添加用户身份验证、处理不同类型的通知等。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringMVC找不到Controller路径的解决方案

    SpringMVC找不到Controller路径的解决方案

    这篇文章主要介绍了SpringMVC找不到Controller路径的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • Java中对AtomicInteger和int值在多线程下递增操作的测试

    Java中对AtomicInteger和int值在多线程下递增操作的测试

    这篇文章主要介绍了Java中对AtomicInteger和int值在多线程下递增操作的测试,本文得出AtomicInteger操作 与 int操作的效率大致相差在50-80倍上下的结论,需要的朋友可以参考下
    2014-09-09
  • Java泛型定义与用法入门示例

    Java泛型定义与用法入门示例

    这篇文章主要介绍了Java泛型定义与用法,结合实例形式分析了java泛型的功能、定义、应用场景及相关使用注意事项,需要的朋友可以参考下
    2019-08-08
  • 探讨Java验证码制作(上篇)

    探讨Java验证码制作(上篇)

    很多朋友对验证码并不陌生,无论是申请账号还是某些情况下登录时都会要求输入验证码。接下来通过本文给大家介绍java验证码制作的方法,感兴趣的朋友一起学习吧
    2016-05-05
  • SpringBoot集成WebSocket实现前后端消息互传的方法

    SpringBoot集成WebSocket实现前后端消息互传的方法

    这篇文章主要介绍了SpringBoot集成WebSocket实现前后端消息互传的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • 一篇文章带你了解初始Spring

    一篇文章带你了解初始Spring

    这篇文章主要给大家介绍了一个简单的Spring容器初始化流程的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-08-08
  • BufferedReader中read()方法和readLine()方法的使用

    BufferedReader中read()方法和readLine()方法的使用

    这篇文章主要介绍了BufferedReader中read()方法和readLine()方法的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Java List 用法实例详解

    Java List 用法实例详解

    这篇文章主要介绍了Java List 用法实例详解的相关资料,需要的朋友可以参考下
    2017-09-09
  • java持久层框架mybatis防止sql注入的方法

    java持久层框架mybatis防止sql注入的方法

    下面小编就为大家带来一篇java持久层框架mybatis防止sql注入的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • Java中左移和右移问题图文详解

    Java中左移和右移问题图文详解

    左移和右移并不常用,在一些特殊情况下才会使用,比如加解密时,会大量用到,这篇文章主要给大家介绍了关于Java中左移和右移问题的相关资料,需要的朋友可以参考下
    2021-11-11

最新评论