Python基于WebSocket实现直播弹幕数据采集

 更新时间:2026年04月30日 09:23:12   作者:分析全都队  
本文分享了基于WebSocket的实时弹幕采集程序设计,使用WebSocket接收实时弹幕,结合asyncio实现异步处理,将弹幕缓存并保存为日志和JSON文件,程序设计注重实时性、稳定性和可扩展性,适用于直播弹幕采集和其他实时数据应用,需要的朋友可以参考下

前言

在进行直播数据分析、舆情研究或用户互动行为研究时,弹幕数据是一类非常重要的实时文本数据来源。相比评论数据,弹幕具有两个明显特点:

一是实时性强,几乎与直播内容同步出现;

二是互动密度高,能够反映观众情绪变化与热点话题。因此,如果能够稳定地采集直播弹幕数据,就可以进一步开展诸如情感分析、关键词统计、热点时刻识别等研究。

在实际开发过程中,我尝试设计了一套 基于 WebSocket 的实时弹幕采集程序。整体思路并不复杂:通过 WebSocket 服务接收弹幕消息,对消息进行解析与格式化处理,然后通过队列进行缓存,并最终保存到日志文件和 JSON 文件中。这样不仅可以实现实时监控,还能够方便后续进行数据分析与建模。

本文结合具体代码,分享这一套弹幕采集程序的实现思路与关键技术。

一、环境准备与依赖安装

在开始之前,需要安装 WebSocket 相关依赖库。

pip install websockets

代码中还使用了以下 Python 内置库:

  • asyncio(异步编程)
  • json(数据解析)
  • logging(日志系统)
  • os(文件操作)
  • datetime(时间处理)

导入模块如下:

import asyncio
import json
import websockets
from collections import deque
import logging
import os
from datetime import datetime

这里特别需要注意的是 asyncio 与 websockets 的组合,它能够实现 异步消息处理,非常适合实时弹幕这种高频数据流场景。

二、日志系统与数据缓存设计

在实际采集弹幕时,如果只是在终端输出数据,很容易丢失重要信息。因此我在代码中设计了 日志系统 + 数据缓存队列

首先配置日志系统:

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

这样可以让每一条弹幕都带有时间信息,方便后续查看与调试。

接下来创建一个 弹幕队列

danmu_queue = deque(maxlen=100)

这里使用 deque 的原因是:插入速度快、支持固定长度缓存、适合实时数据流。

三、弹幕数据存储结构设计

为了便于后续分析,我将弹幕数据保存为两种格式:

1️⃣ 文本日志
2️⃣ JSON数据

配置代码如下:

DATA_DIR = 'danmu_data'
LOG_FILE = os.path.join(DATA_DIR, 'danmu.log')
JSON_FILE = os.path.join(DATA_DIR, 'danmu.json')

os.makedirs(DATA_DIR, exist_ok=True)

这样程序运行时会自动创建一个 danmu_data 文件夹,用来存储采集的数据.

四、弹幕消息解析函数

核心逻辑是 process_message() 函数,它负责解析 WebSocket 接收到的消息。

async def process_message(message_data):

首先解析 JSON 数据:

data = json.loads(message_data)

接着判断消息类型:

if data.get('type') != 'danmu':
    return None

这里的设计非常重要,因为 WebSocket 可能会发送多种类型的消息,而我们只需要 弹幕类型的数据

然后提取关键字段:

content = message.get('content', '')
sender = message.get('nickname', 'unknown')
time = message.get('time', '')
user_token = message.get('userToken', '')
live_id = message.get('liveId', '')

最终构造一条格式化弹幕:

formatted_message = f"[{time}] {sender} [{user_token}]\n{content}"

例如:

[20:35:12] 用户12345 [token]
这主播太搞笑了

随后将弹幕保存到字典中:

danmu = {
    'user': sender,
    'content': content,
    'time': time,
    'live_id': live_id,
    'user_token': user_token,
    'formatted': formatted_message
}

并放入队列:

danmu_queue.append(danmu)

最后调用函数保存到文件。

五、WebSocket服务器实现

为了接收弹幕数据,需要搭建一个 WebSocket 服务。

核心函数:

async def websocket_handler(websocket, path):

当有客户端连接时:

client_id = id(websocket)
logging.info(f"新的客户端连接 (ID: {client_id})")

然后持续接收消息:

async for message in websocket:
    success = await process_message(message)

如果消息处理失败:

logging.warning(f"消息处理失败: {message}")

当客户端断开连接时:

except websockets.exceptions.ConnectionClosed:
    logging.info(f"客户端断开连接")

这一部分逻辑实现了 实时弹幕监听机制

六、服务器启动逻辑

服务器入口函数:

async def main():

启动 WebSocket 服务:

server = await websockets.serve(
    websocket_handler,
    "127.0.0.1",
    8765,
    ping_interval=None
)

参数解释:

关闭 ping 的原因是某些客户端在频繁 ping 时可能会出现连接异常。

七、弹幕数据保存机制

为了方便数据分析,代码设计了一个保存函数:

def save_danmu_to_file(danmu):

首先写入文本日志:

with open(LOG_FILE, 'a', encoding='utf-8') as f:
    f.write(danmu['formatted'] + '\n\n')

再写入 JSON 数据:

json.dump(danmu_with_timestamp, f, ensure_ascii=False)

并增加保存时间:

danmu_with_timestamp['save_time'] = datetime.now().isoformat()

最终每条弹幕都会以 JSON 形式存储。

例如:

{
 "user":"张三",
 "content":"主播太厉害了",
 "time":"20:35:12",
 "live_id":"123456",
 "user_token":"abcde",
 "save_time":"2026-03-09T20:35:12"
}

这样既可以用于日志查看,也可以直接用于数据分析。

八、程序启动与关闭

主程序入口:

if __name__ == "__main__":

程序启动时会记录日志:

==================================================
程序启动于: 2026-03-09 20:30:01
==================================================

程序结束时也会记录退出时间。

这一步设计的好处是:可以知道采集时长,也可以追踪异常退出。

九、弹幕采集程序的设计心得

在设计这个弹幕采集程序时,我有几个比较深的体会:

第一,WebSocket非常适合实时数据采集。相比传统HTTP轮询方式,WebSocket能够持续保持连接,从而实时接收弹幕消息,大幅降低延迟。

第二,异步编程是处理高频消息流的关键。使用 asyncio 可以避免阻塞,提高程序整体效率。

第三,数据存储需要兼顾实时与分析需求。因此同时保存为日志文件和 JSON 文件,这样既能实时查看,也方便后续做数据分析。

第四,缓存队列设计可以提高系统稳定性。通过 deque 保存最近弹幕,可以避免高频消息导致系统压力过大。

十、总结

通过这套程序,我们实现了一套完整的 直播弹幕实时采集系统,具备以下特点:

(1)WebSocket实时采集

(2)asyncio异步处理

(3)弹幕队列缓存

(4)自动日志记录

(5)JSON结构化数据保存

(6)程序运行状态记录

这套程序不仅可以用于直播弹幕采集,还可以扩展到实时舆情监测、弹幕情感分析、用户互动研究、直播热点识别。

后续如果需要,还可以进一步加入数据库存储(MySQL / MongoDB)、弹幕情感分析、实时数据可视化、弹幕关键词统计。

从整体实现来看,这套弹幕采集程序虽然代码量不算很大,但在设计上我更关注的是稳定性和可扩展性。通过 WebSocket 实现实时数据接收,再结合 asyncio 异步处理,可以比较轻松地应对高频弹幕消息;同时利用 deque 做缓存队列、日志系统做运行记录、JSON 做结构化数据存储,使得整个数据流从“接收 → 解析 → 缓存 → 落盘”形成一个完整闭环。这样的设计不仅方便后续做弹幕文本分析、情感分析或关键词统计,也为进一步接入数据库、实时可视化或数据流处理打下了基础。

以上就是Python基于WebSocket实现直播弹幕数据采集的详细内容,更多关于Python WebSocket弹幕数据采集的资料请关注脚本之家其它相关文章!

相关文章

  • Selenium 配置启动项参数的方法

    Selenium 配置启动项参数的方法

    这篇文章主要介绍了Selenium 配置启动项参数的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • python 对任意数据和曲线进行拟合并求出函数表达式的三种解决方案

    python 对任意数据和曲线进行拟合并求出函数表达式的三种解决方案

    这篇文章主要介绍了python 对任意数据和曲线进行拟合并求出函数表达式的三种解决方案,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • pytorch配置双显卡方式,使用双显卡跑代码

    pytorch配置双显卡方式,使用双显卡跑代码

    这篇文章主要介绍了pytorch配置双显卡方式,使用双显卡跑代码,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-06-06
  • python orm 框架中sqlalchemy用法实例详解

    python orm 框架中sqlalchemy用法实例详解

    这篇文章主要介绍了python orm 框架中sqlalchemy用法,结合实例形式详细分析了Python orm 框架基本概念、原理及sqlalchemy相关使用技巧,需要的朋友可以参考下
    2020-02-02
  • Python中实现 xls 文件转 xlsx的4种方法(示例详解)

    Python中实现 xls 文件转 xlsx的4种方法(示例详解)

    在 Python 中,可以采用 pandas、pyexcel、win32com 和 xls2xlsx 这四个模块,实现 xls 转 xlsx 格式,本文以 Excel 示例文件test_Excel.xls 为例结合示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧
    2024-06-06
  • Python连接mysql方法及常用参数

    Python连接mysql方法及常用参数

    这篇文章主要介绍了Python连接mysql方法及常用参数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-09-09
  • Python使用Selenium实现自动化网页批量录入

    Python使用Selenium实现自动化网页批量录入

    本文介绍一个面向上海区批量入账的网页自动化小工具的核心脚本,在本地页面上自动点击新增一行就会批量填充五列数据,下面小编就来讲讲具体的实现方法吧
    2025-11-11
  • python安装第三方包的三种方法图文详解

    python安装第三方包的三种方法图文详解

    安装Python第三方包有多种方法,下面这篇文章主要给大家介绍了关于python安装第三方包的三种方法,文中通过图文的非常详细,需要的朋友可以参考下
    2024-03-03
  • Python设计模式优雅构建代码全面教程示例

    Python设计模式优雅构建代码全面教程示例

    Python作为一门多范式的编程语言,提供了丰富的设计模式应用场景,在本文中,我们将详细介绍 Python 中的各种设计模式,包括创建型、结构型和行为型模式
    2023-11-11
  • Python使用ftplib实现简易FTP客户端的方法

    Python使用ftplib实现简易FTP客户端的方法

    这篇文章主要介绍了Python使用ftplib实现简易FTP客户端的方法,实例分析了ftplib模块相关设置与使用技巧,需要的朋友可以参考下
    2015-06-06

最新评论