Python实现敏感词检测的多种方案
一、引言
在互联网内容审核、社交平台监管、评论系统过滤等场景中,敏感词检测是一项必不可少的功能。Python凭借其丰富的生态和简洁的语法,提供了多种实现敏感词检测的方案。本文将详细介绍几种主流的实现方式,并分析各自的优缺点及适用场景。
二、基础方案:关键词匹配
2.1 直接遍历匹配
最简单的实现方式是使用列表存储敏感词,然后遍历检测:
class SimpleSensitiveWordFilter:
def __init__(self):
self.sensitive_words = []
def add_words(self, words):
self.sensitive_words.extend(words)
def contains_sensitive_word(self, text):
for word in self.sensitive_words:
if word in text:
return True
return False
def replace_sensitive_words(self, text, replace_char='*'):
result = text
for word in self.sensitive_words:
result = result.replace(word, replace_char * len(word))
return result优点:实现简单,易于理解
缺点:效率低,时间复杂度O(n*m),无法处理重叠匹配
2.2 正则表达式匹配
使用正则表达式可以更灵活地匹配敏感词:
import re
class RegexSensitiveWordFilter:
def __init__(self):
self.pattern = None
def add_words(self, words):
# 使用正则的 | 连接所有敏感词
pattern_str = '|'.join(re.escape(word) for word in words)
self.pattern = re.compile(pattern_str, re.IGNORECASE)
def contains_sensitive_word(self, text):
return bool(self.pattern.search(text))
def find_all_sensitive_words(self, text):
return self.pattern.findall(text)
def replace_sensitive_words(self, text, replace_char='*'):
def replace_func(match):
return replace_char * len(match.group())
return self.pattern.sub(replace_func, text)优点:支持复杂匹配规则,代码简洁
缺点:敏感词数量大时性能下降,正则表达式编译开销较大
三、进阶方案:前缀树(Trie树)
3.1 原理介绍
Trie树(前缀树)是一种专门用于字符串快速匹配的树形数据结构。它的核心思想是利用字符串的公共前缀来减少不必要的比较,实现O(n)的时间复杂度。
3.2 实现代码
class TrieNode:
__slots__ = ('children', 'is_end')
def __init__(self):
self.children = {}
self.is_end = False
class TrieSensitiveWordFilter:
def __init__(self):
self.root = TrieNode()
def add_word(self, word):
"""添加单个敏感词"""
node = self.root
for char in word:
if char not in node.children:
node.children[char] = TrieNode()
node = node.children[char]
node.is_end = True
def add_words(self, words):
"""批量添加敏感词"""
for word in words:
self.add_word(word)
def contains_sensitive_word(self, text):
"""检测是否包含敏感词"""
for i in range(len(text)):
node = self.root
j = i
while j < len(text) and text[j] in node.children:
node = node.children[text[j]]
if node.is_end:
return True
j += 1
return False
def find_all_sensitive_words(self, text):
"""找出所有敏感词及位置"""
results = []
for i in range(len(text)):
node = self.root
j = i
while j < len(text) and text[j] in node.children:
node = node.children[text[j]]
if node.is_end:
results.append({
'word': text[i:j+1],
'start': i,
'end': j
})
j += 1
return results
def replace_sensitive_words(self, text, replace_char='*'):
"""替换敏感词"""
sensitive_positions = self.find_all_sensitive_words(text)
if not sensitive_positions:
return text
result = list(text)
for pos in sensitive_positions:
for k in range(pos['start'], pos['end'] + 1):
result[k] = replace_char
return ''.join(result)3.3 优化版:AC自动机
AC自动机(Aho-Corasick automaton)在Trie树的基础上增加了失败指针,实现多模式串的高效匹配:
from collections import deque
class AhoCorasickNode:
__slots__ = ('children', 'fail', 'output')
def __init__(self):
self.children = {}
self.fail = None
self.output = [] # 存储以此节点结尾的敏感词
class AhoCorasickFilter:
def __init__(self):
self.root = AhoCorasickNode()
self._built = False
def add_word(self, word):
"""添加敏感词"""
node = self.root
for char in word:
if char not in node.children:
node.children[char] = AhoCorasickNode()
node = node.children[char]
node.output.append(word)
self._built = False
def add_words(self, words):
for word in words:
self.add_word(word)
def _build_fail_pointers(self):
"""构建失败指针(BFS)"""
queue = deque()
# 初始化第一层节点的失败指针
for char, child in self.root.children.items():
child.fail = self.root
queue.append(child)
while queue:
current = queue.popleft()
for char, child in current.children.items():
queue.append(child)
# 寻找失败指针
fail_node = current.fail
while fail_node is not None and char not in fail_node.children:
fail_node = fail_node.fail
if fail_node is None:
child.fail = self.root
else:
child.fail = fail_node.children[char]
# 合并输出
child.output.extend(child.fail.output)
def search(self, text):
"""搜索文本中的所有敏感词"""
if not self._built:
self._build_fail_pointers()
self._built = True
result = []
node = self.root
for i, char in enumerate(text):
# 沿着失败指针查找匹配
while node is not self.root and char not in node.children:
node = node.fail
if char in node.children:
node = node.children[char]
else:
node = self.root
# 收集匹配结果
for word in node.output:
result.append({
'word': word,
'position': i - len(word) + 1
})
return result
def contains_sensitive_word(self, text):
return len(self.search(text)) > 0
def replace_sensitive_words(self, text, replace_char='*'):
matches = self.search(text)
if not matches:
return text
result = list(text)
for match in matches:
start = match['position']
end = start + len(match['word'])
for i in range(start, end):
result[i] = replace_char
return ''.join(result)四、第三方库方案
4.1 使用better_profanity
from better_profanity import profanity
# 初始化
profanity.load_censor_words()
# 检测
text = "You are a fool"
if profanity.contains_profanity(text):
print("包含敏感词")
# 替换
censored_text = profanity.censor(text)
print(censored_text) # You are a ****
# 自定义敏感词库
custom_badwords = ['badword1', 'badword2']
profanity.load_censor_words(custom_badwords)4.2 使用ahocorasick库
import ahocorasick
class PyAhocorasickFilter:
def __init__(self):
self.automaton = ahocorasick.Automaton()
self._built = False
def add_words(self, words):
for idx, word in enumerate(words):
self.automaton.add_word(word, (idx, word))
self._built = False
def build(self):
self.automaton.make_automaton()
self._built = True
def search(self, text):
if not self._built:
self.build()
result = []
for end_index, (idx, word) in self.automaton.iter(text):
start_index = end_index - len(word) + 1
result.append({
'word': word,
'start': start_index,
'end': end_index
})
return result4.3 其他相关库
| 库名 | 特点 | 适用场景 |
|---|---|---|
profanity-check | 基于机器学习 | 需要语义理解 |
alt-profanity-check | 轻量级 | 简单场景 |
ngram-profanity | 支持模糊匹配 | 变形词检测 |
五、高级特性实现
5.1 支持跳过干扰字符
class AdvancedFilter:
def __init__(self, skip_chars=None):
self.skip_chars = skip_chars or {' ', '-', '_', '.'}
self.trie_filter = TrieSensitiveWordFilter()
def _normalize(self, text):
"""标准化文本,去除干扰字符"""
return ''.join(c for c in text if c not in self.skip_chars)
def contains_sensitive_word(self, text):
normalized = self._normalize(text)
return self.trie_filter.contains_sensitive_word(normalized)5.2 支持拼音/谐音检测
# 使用 pypinyin 库处理拼音
from pypinyin import lazy_pinyin
class PinyinSensitiveFilter:
def __init__(self):
self.pinyin_map = {}
def add_word(self, word):
# 同时添加中文和拼音形式的敏感词
self.trie_filter.add_word(word)
pinyin = ''.join(lazy_pinyin(word))
self.trie_filter.add_word(pinyin)5.3 支持英文大小写/变体处理
class CaseInsensitiveFilter:
def __init__(self):
self.trie_filter = TrieSensitiveWordFilter()
def add_word(self, word):
# 添加小写版本
self.trie_filter.add_word(word.lower())
def contains_sensitive_word(self, text):
return self.trie_filter.contains_sensitive_word(text.lower())六、性能对比
| 方案 | 时间复杂度 | 空间复杂度 | 1000词响应时间 | 适用场景 |
|---|---|---|---|---|
| 直接遍历 | O(n*m) | O(m) | ~50ms | 小词库 |
| 正则表达式 | O(n*m) | O(m) | ~30ms | 中等词库 |
| Trie树 | O(n) | O(m*k) | ~5ms | 大词库 |
| AC自动机 | O(n) | O(m*k) | ~3ms | 超大词库 |
| better_profanity | O(n) | O(m) | ~2ms | 通用场景 |
注:n为文本长度,m为敏感词数量,k为平均词长
七、完整实战示例
class SensitiveWordDetectionSystem:
"""完整的敏感词检测系统"""
def __init__(self, word_file=None, skip_chars=None):
self.filter = AhoCorasickFilter()
self.skip_chars = skip_chars or {' ', '.', '-', '_', '*', '#'}
if word_file:
self.load_words_from_file(word_file)
def load_words_from_file(self, file_path):
"""从文件加载敏感词库"""
with open(file_path, 'r', encoding='utf-8') as f:
words = [line.strip() for line in f if line.strip()]
self.filter.add_words(words)
def preprocess_text(self, text):
"""预处理文本(移除干扰字符)"""
# 可选:移除干扰字符
# text = ''.join(c for c in text if c not in self.skip_chars)
# 可选:转小写
text = text.lower()
return text
def detect(self, text):
"""检测文本"""
processed = self.preprocess_text(text)
return self.filter.search(processed)
def audit(self, text, strict=True):
"""内容审核"""
matches = self.detect(text)
result = {
'is_safe': len(matches) == 0,
'sensitive_words': [m['word'] for m in matches],
'count': len(matches),
'suggested_action': 'block' if strict and len(matches) > 0 else 'review'
}
return result
def censor(self, text, replace_char='*'):
"""敏感词脱敏"""
processed = self.preprocess_text(text)
matches = self.filter.search(processed)
if not matches:
return text
# 需要映射回原始文本的位置
result = list(text)
# 简化版:直接替换(实际应用需要处理位置映射)
for match in matches:
# 这里简化处理
pass
return self.filter.replace_sensitive_words(processed, replace_char)
# 使用示例
if __name__ == "__main__":
# 初始化系统
system = SensitiveWordDetectionSystem()
system.filter.add_words(['敏感词', '不良信息', '违规内容'])
# 测试文本
test_texts = [
"这是一条正常的消息",
"这里包含敏感词,需要处理",
"用户发送了不良信息内容"
]
for text in test_texts:
result = system.audit(text)
print(f"文本: {text}")
print(f"审核结果: {result}")
print("-" * 50)八、敏感词库管理建议
8.1 词库分类
sensitive_words/ ├── political.txt # 政治敏感 ├── porn.txt # 色情低俗 ├── violence.txt # 暴力恐怖 ├── abuse.txt # 人身攻击 └── spam.txt # 垃圾广告
8.2 词库维护策略
- 定期更新:建立词库更新机制
- 分级管理:按敏感程度分级处理
- 白名单机制:支持误杀词的快速恢复
- 版本控制:记录词库变更历史
九、总结与建议
9.1 方案选择指南
- 小型项目(<100词):使用正则或直接遍历即可
- 中型项目(100-10000词):推荐Trie树方案
- 大型项目(>10000词):必须使用AC自动机
- 快速开发:使用better_profanity等成熟库
9.2 注意事项
- 性能优化:使用缓存、异步处理、批量检测
- 误杀控制:建立白名单和人工审核机制
- 编码问题:统一使用UTF-8编码
- 变形词:需要考虑拼音、谐音、数字替代等
- 多语言:中英文混合检测需要特殊处理
9.3 扩展方向
- 结合NLP语义理解,减少误杀
- 支持图片中的文字检测(OCR)
- 实时监控和告警系统
- 用户行为分析和分级
通过合理选择和优化敏感词检测方案,可以有效保障平台内容安全,同时保持良好的用户体验。
以上就是Python实现敏感词检测的多种方案的详细内容,更多关于Python敏感词检测方案的资料请关注脚本之家其它相关文章!


最新评论