Python中实现JWT认证的完整指南(从生成到验证)

 更新时间:2025年06月24日 09:14:31   作者:盛夏绽放  
JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制,本文将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关方法,需要的朋友可以参考下

引言

JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制。本文将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关方法,通过丰富的比喻、表格和流程图帮助你彻底掌握 JWT。

一、JWT 是什么?为什么需要它?

想象你去游乐园,入园时会得到一个手环。这个手环:

  • 包含信息:显示你的门票类型(VIP/普通)
  • 防伪设计:有特殊图案难以伪造
  • 有效期:只在当天有效

JWT 就是这样的数字手环:

游乐园手环JWT 令牌
门票类型用户角色
入园时间签发时间(iat)
闭园时间过期时间(exp)
防伪标记数字签名
手环材质加密算法

传统 session 与 JWT 对比

特性SessionJWT
存储位置服务器客户端
扩展性需要共享session天然无状态
跨域支持需要配置原生支持
移动端友好度需处理cookie直接使用header
安全性依赖cookie安全依赖token存储方式
典型场景传统Web应用API/移动应用/微服务

二、JWT 的结构解析

一个 JWT 看起来像这样:xxxxx.yyyyy.zzzzz

就像三明治分三层:

Header (面包上层)

{
  "alg": "HS256",  // 签名算法(HMAC SHA256)
  "typ": "JWT"     // 类型标识
}

Payload (馅料)

{
  "sub": "user123",  // 主题(用户ID)
  "name": "张三",
  "admin": true,
  "iat": 1516239022  // 签发时间
}

Signature (面包下层+防伪标记)

HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  密钥
)

生成流程

[Header] → base64编码 → xxxxx
[Payload] → base64编码 → yyyyy
[xxxxx.yyyyy + 密钥] → 签名算法 → zzzzz
最终令牌:xxxxx.yyyyy.zzzzz

三、Python 中实现 JWT

1. 安装 PyJWT 包

pip install pyjwt

2. 生成 JWT

import jwt
import datetime
from datetime import timezone

# 建议从环境变量读取
SECRET_KEY = "your_super_secret_key"

def generate_jwt(user_id: str, username: str, role: str) -> str:
    """生成JWT令牌"""
    payload = {
        "sub": user_id,  # 标准字段:主题
        "name": username,
        "role": role,
        "iat": datetime.datetime.now(tz=timezone.utc),  # 签发时间
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(hours=1)  # 过期时间
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

参数详解表

参数类型必填说明示例
payloaddict负载数据{“sub”: “user123”}
keystr签名密钥“secret”
algorithmstr签名算法“HS256”
headersdict额外头部{“kid”: “key1”}

标准声明字段(建议)

字段全称说明示例
subSubject主题(用户ID)“user123”
expExpiration过期时间1735689600
iatIssued At签发时间1735686000
audAudience接收方“mobile-app”
issIssuer签发者“auth-server”

3. 验证 JWT

def verify_jwt(token: str) -> dict:
    """验证JWT令牌"""
    try:
        payload = jwt.decode(
            token, 
            SECRET_KEY, 
            algorithms=["HS256"],
            audience="your-app",  # 验证接收方
            issuer="auth-server"  # 验证签发方
        )
        return payload
    except jwt.PyJWTError as e:
        print(f"Token验证失败: {type(e).__name__}: {e}")
        return None

验证流程示意图

4. 错误处理大全

from fastapi import HTTPException, status

def validate_token(token: str):
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token已过期",
            headers={"WWW-Authenticate": "Bearer"}
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"无效Token: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"}
        )

异常类型表

异常类触发条件HTTP状态码处理建议
ExpiredSignatureErrorToken过期401提示重新登录
InvalidSignatureError签名无效401拒绝访问
InvalidTokenError通用错误401记录日志
MissingRequiredClaimError缺少必要声明400返回错误详情
InvalidIssuerError签发者不匹配403审计日志

四、高级应用场景

1. 双令牌系统(Access + Refresh)

以下是基于你的 JWT 认证流程整理的表格表示:

步骤流程节点条件/判断动作/响应备注
1用户登录提交账号密码系统验证凭证
2验证结果验证成功生成: - access_token(15分钟) - refresh_token(7天)
验证失败返回错误信息HTTP 401
3返回响应返回双token给客户端
4API访问携带access_token验证token有效性
5验证结果access_token有效正常返回请求数据HTTP 200
access_token无效检查是否存在refresh_token
6刷新检查存在有效refresh_token发放新access_tokenHTTP 200 + 新token
无有效refresh_token要求重新登录HTTP 401

详细说明表格:

阶段条件分支系统行为客户端响应HTTP状态码
登录阶段
1.1凭证正确生成双token接收: - access_token - refresh_token200
1.2凭证错误终止流程收到错误提示401
API访问阶段
2.1access_token有效处理请求获取正常数据200
2.2access_token过期检查refresh_token
Token刷新阶段
3.1refresh_token有效发放新access_token获取新token200
3.2refresh_token无效终止流程要求重新登录401

异常处理补充表:

异常情况系统处理客户端表现
access_token格式错误直接拒绝请求收到"无效token"错误
refresh_token过期清除客户端存储跳转登录页面
连续使用过期refresh_token标记为安全事件强制登出所有设备

实现代码:

def generate_token_pair(user_id: str):
    """生成令牌对"""
    access_payload = {
        "sub": user_id,
        "type": "access",
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
    }
    refresh_payload = {
        "sub": user_id,
        "type": "refresh",
        "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(days=7)
    }
    
    access_token = jwt.encode(access_payload, SECRET_KEY, algorithm="HS256")
    refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm="HS256")
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token
    }

def refresh_access_token(refresh_token: str):
    """使用refresh_token获取新access_token"""
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=["HS256"])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=400, detail="无效的refresh token类型")
        
        new_payload = {
            "sub": payload["sub"],
            "type": "access",
            "exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
        }
        return jwt.encode(new_payload, SECRET_KEY, algorithm="HS256")
    
    except jwt.PyJWTError as e:
        raise HTTPException(status_code=401, detail=f"refresh token无效: {str(e)}")

2. 与 FastAPI/Django 集成

FastAPI 示例

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

app = FastAPI()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="无效或过期的token")

@app.get("/protected")
async def protected_route(user: dict = Depends(get_current_user)):
    return {"message": f"你好, {user['sub']}!", "user_data": user}

Django 示例

from django.http import JsonResponse
from functools import wraps

def jwt_required(view_func):
    @wraps(view_func)
    def wrapper(request, *args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return JsonResponse({"error": "未提供token"}, status=401)
        
        token = auth_header.split(' ')[1]
        try:
            request.user = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            return view_func(request, *args, **kwargs)
        except jwt.ExpiredSignatureError:
            return JsonResponse({"error": "token已过期"}, status=401)
        except jwt.InvalidTokenError:
            return JsonResponse({"error": "无效token"}, status=401)
    return wrapper

@jwt_required
def protected_view(request):
    return JsonResponse({"data": "受保护的内容", "user": request.user})

五、安全最佳实践

密钥管理

  • 使用环境变量存储密钥
import os
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "fallback-secret")
  • 定期轮换密钥(密钥版本控制)
keys = {
    "2023": "old-secret",
    "2024": "current-secret"
}

增强安全措施

def generate_secure_token(user, ip):
    """生成带IP绑定的token"""
    payload = {
        "sub": user.id,
        "ip": ip,  # 绑定客户端IP
        "jti": str(uuid.uuid4())  # 唯一标识防止重放
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_secure_token(token, ip):
    payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    if payload.get("ip") != ip:
        raise ValueError("IP地址不匹配")
    return payload

黑名单实现

from redis import Redis

redis = Redis(host='localhost', port=6379)

def revoke_token(jti: str, expire_in: int):
    """将token加入黑名单"""
    redis.setex(f"blacklist:{jti}", expire_in, "revoked")

def is_revoked(jti: str) -> bool:
    """检查是否在黑名单"""
    return bool(redis.exists(f"blacklist:{jti}"))

六、性能优化技巧

选择更快的算法

算法性能比较表(执行时间,越小越好)

算法类型性能指标(单位:ms)备注
HS2561.2对称加密算法
RS2563.8RSA 非对称加密
ES2564.1ECDSA 非对称加密

详细对比表格:

特性对比HS256RS256ES256
算法类型对称加密非对称加密非对称加密
密钥管理共享密钥公钥/私钥公钥/私钥
签名速度⚡️ 1.2ms🐢 3.8ms🐢 4.1ms
验证速度⚡️ 最快🐢 慢🐢 最慢
安全性中等最高
适用场景内部服务公开API金融级应用

减少payload大小

# 不推荐 - payload过大
payload = {**user.__dict__, "iat": ..., "exp": ...}

# 推荐 - 只存储必要信息
payload = {
    "sub": user.id,
    "role": user.role,
    "iat": ...,
    "exp": ...
}

异步签名验证

import asyncio
from jwt import PyJWT

async def async_verify(token: str):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, 
        lambda: jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    )

七、完整示例:FastAPI 实现

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt
import datetime
from datetime import timezone
from typing import Optional

app = FastAPI()
security = HTTPBearer()

# 配置
SECRET_KEY = "your-secret-key-here"  # 生产环境应从环境变量获取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = datetime.timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = datetime.timedelta(days=7)

# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "password": "secret123",
        "role": "user"
    },
    "admin": {
        "username": "admin",
        "password": "admin123",
        "role": "admin"
    }
}

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

class User(BaseModel):
    username: str
    role: Optional[str] = None

def create_tokens(username: str):
    """创建access和refresh令牌对"""
    access_payload = {
        "sub": username,
        "role": fake_users_db[username]["role"],
        "type": "access",
        "exp": datetime.datetime.now(tz=timezone.utc) + ACCESS_TOKEN_EXPIRE
    }
    refresh_payload = {
        "sub": username,
        "type": "refresh",
        "exp": datetime.datetime.now(tz=timezone.utc) + REFRESH_TOKEN_EXPIRE
    }
    
    access_token = jwt.encode(access_payload, SECRET_KEY, algorithm=ALGORITHM)
    refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm=ALGORITHM)
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """依赖项:验证JWT并返回当前用户"""
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "access":
            raise HTTPException(status_code=400, detail="需要access token")
        username = payload.get("sub")
        if username not in fake_users_db:
            raise HTTPException(status_code=404, detail="用户不存在")
        return User(username=username, role=payload.get("role"))
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token已过期",
            headers={"WWW-Authenticate": "Bearer"}
        )
    except jwt.InvalidTokenError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效Token",
            headers={"WWW-Authenticate": "Bearer"}
        )

@app.post("/login")
async def login(username: str, password: str):
    """登录接口"""
    if username not in fake_users_db or fake_users_db[username]["password"] != password:
        raise HTTPException(status_code=400, detail="用户名或密码错误")
    return create_tokens(username)

@app.post("/refresh")
async def refresh_token(refresh_token: str):
    """刷新access token"""
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=400, detail="需要refresh token")
        return create_tokens(payload.get("sub"))
    except jwt.PyJWTError as e:
        raise HTTPException(status_code=401, detail=str(e))

@app.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """获取当前用户信息"""
    return current_user

@app.get("/admin")
async def admin_route(current_user: User = Depends(get_current_user)):
    """管理员专属路由"""
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="无权访问")
    return {"message": "欢迎管理员"}

结语

通过本文,你已经掌握了:

  • JWT 的核心原理和结构
  • Python 中生成和验证 JWT 的完整方法
  • 各种高级应用场景和安全实践
  • 与流行框架的集成方式

记住这些黄金法则:

  • 最小化payload:只存储必要信息
  • 保护密钥:像保护银行密码一样保护你的密钥
  • 合理设置有效期:平衡安全性和用户体验
  • 考虑刷新机制:提升用户体验同时保持安全

现在,你已经准备好为你的 Python 应用实现强大的 JWT 认证系统了!

以上就是Python中JWT认证的完整指南(从生成到验证)的详细内容,更多关于Python JWT认证指南的资料请关注脚本之家其它相关文章!

相关文章

  • Python通过内置函数和自写算法DFS实现排列组合

    Python通过内置函数和自写算法DFS实现排列组合

    这篇文章主要介绍了Python通过内置函数和自写算法DFS实现排列组合,排列组合是数学中的一种常见的计算方法,用于求出从给定的元素中选取若干个元素的所有可能的排列或组合。在Python中,有多种方式可以实现排列组合的计算,需要的朋友可以参考下
    2023-05-05
  • 如何利用Python打开txt格式的文件

    如何利用Python打开txt格式的文件

    在机器学习中,常常需要读取txt文本中的数据,这篇文章主要给大家介绍了关于如何利用Pythont打开txt格式的文件的相关资料,文中通过示例代码介绍的非常详解,需要的朋友可以参考下
    2021-10-10
  • python 基于opencv实现高斯平滑

    python 基于opencv实现高斯平滑

    这篇文章主要介绍了python 基于opencv实现高斯平滑,帮助大家更好的理解和使用python处理图片,感兴趣的朋友可以了解下
    2020-12-12
  • Python之py2exe打包工具详解

    Python之py2exe打包工具详解

    下面小编就为大家带来一篇Python之py2exe打包工具详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • python文件编译为pyc后运行的实现步骤

    python文件编译为pyc后运行的实现步骤

    本文主要介绍了python文件编译为pyc后运行的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-02-02
  • python读取xml文件方法解析

    python读取xml文件方法解析

    这篇文章主要介绍了python读取xml文件方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • 解决pycharm导入本地py文件时,模块下方出现红色波浪线的问题

    解决pycharm导入本地py文件时,模块下方出现红色波浪线的问题

    这篇文章主要介绍了解决pycharm导入本地py文件时,模块下方出现红色波浪线的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-06-06
  • Python使用ElementTree美化XML格式的操作

    Python使用ElementTree美化XML格式的操作

    这篇文章主要介绍了Python使用ElementTree美化XML格式的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03
  • python利用跳板机ssh远程连接redis的方法

    python利用跳板机ssh远程连接redis的方法

    今天小编就为大家分享一篇python利用跳板机ssh远程连接redis的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-02-02
  • Python基于opencv的简单图像轮廓形状识别(全网最简单最少代码)

    Python基于opencv的简单图像轮廓形状识别(全网最简单最少代码)

    这篇文章主要介绍了基于opencv的简单图像轮廓形状识别(全网最简单最少代码),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01

最新评论