基于FastAPI与LangChain开发Excel智能数据分析API详解

 更新时间:2025年10月14日 08:45:57   作者:东方佑  
本文将详细介绍如何使用FastAPI和LangChain构建一个支持流式响应的Excel智能数据分析API,实现对结构化数据的自然语言查询与对话式分析,需要的可以了解下

一、项目概述与核心功能

在现代数据驱动决策的环境中,让非技术用户能够通过自然语言与数据进行交互变得越来越重要。本项目开发了一个基于RESTful API的智能数据分析服务,具有以下核心功能:

  • 自然语言查询: 用户可以使用日常语言提问关于Excel数据的问题
  • 多会话管理: 支持多用户同时使用,各自维护独立的对话历史
  • 流式响应: 提供真正的流式输出体验,大幅提升用户体验
  • Excel数据支持: 支持读取和解析多工作表的Excel文件
  • 智能代理: 基于LangChain构建的Pandas数据分析代理

二、技术架构与工具选择

本项目采用了以下技术栈,每项技术都承担着特定角色:

技术组件版本要求职责说明
FastAPI>=0.68.0高性能Web框架,提供API服务
LangChain>=0.12.0智能代理和链式处理核心
Pandas>=1.3.0DataFrame数据处理和分析
Uvicorn>=0.15.0ASGI服务器,用于运行FastAPI
Tongyi Qwen-大语言模型,提供自然语言理解能力

技术选型理由

  • FastAPI:凭借其异步支持、自动文档生成和高性能特性,成为Python API开发的首选
  • LangChain:提供了与大语言模型集成的标准化方式,简化了代理创建和管理
  • Pandas:是Python数据分析的事实标准库,完美处理Excel文件
  • 流式响应:通过Server-Sent Events(SSE)实现真正流式输出,提升用户体验

三、核心实现细节

3.1 FastAPI应用初始化

首先创建FastAPI应用实例,并设置元数据信息:

from fastapi import FastAPI

app = FastAPI(
    title="Excel Data Analysis API",
    description="基于Pandas DataFrame的数据分析API",
    version="1.0.0"
)

FastAPI基于Python类型提示自动生成API文档,支持OpenAPI标准,开发者可以通过/docs/redoc端点访问交互式文档。

3.2 数据加载与代理初始化

项目启动时自动加载Excel数据并初始化LangChain代理:

@app.on_event("startup")
async def startup_event():
    """应用启动时初始化模型和代理"""
    try:
        initialize_agent()
        print("模型和代理初始化成功")
    except Exception as e:
        print(f"初始化失败: {e}")
        raise

数据加载过程支持多工作表Excel文件,自动合并所有工作表到一个DataFrame中:

def initialize_agent():
    global llm, pandas_agent
    
    # 读取Excel文件中的所有工作表
    excel_file = pd.ExcelFile(path)
    all_sheets = {}
    for sheet_name in excel_file.sheet_names:
        all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
    
    # 合并所有工作表
    df = pd.concat(all_sheets.values(), ignore_index=True)

这种方法确保了无论Excel文件的结构如何,都能完整地加载所有数据。

3.3 LangChain代理创建

使用LangChain的create_pandas_dataframe_agent创建专门用于Pandas数据处理的AI代理:

pandas_agent = create_pandas_dataframe_agent(
    llm,  # 通义千问模型实例
    df,   # 合并后的DataFrame
    verbose=True,  # 启用详细日志
    agent_type="zero-shot-react-description",  # 代理类型
    allow_dangerous_code=True,  # 允许执行代码
    max_iterations=5  # 最大迭代次数
)

这种代理结合了大语言模型的语言理解能力和Pandas的数据处理能力,能够理解自然语言查询并将其转换为DataFrame操作。

3.4 流式响应实现

为实现真正的流式输出,创建了自定义回调处理器:

class StreamingCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器,用于实现真正的流式输出"""

    def __init__(self):
        self.tokens = []
        self.finished = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """当LLM生成新token时调用"""
        self.tokens.append(token)

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """当LLM生成结束时调用"""
        self.finished = True

流式响应接口使用Server-Sent Events(SSE)技术:

@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):
    # ... 省略其他代码 ...
    
    if request.stream:
        return StreamingResponse(
            true_stream_response(full_input, session_history, request.session_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            }
        )

这种方式相比传统响应,能够实时地将生成的token发送给客户端,大幅减少用户感知的延迟。

3.5 会话管理机制

为实现多用户支持,实现了基于session_id的会话管理:

conversation_history = {}

# 在聊天接口中维护会话历史
session_history = conversation_history.get(request.session_id, [])
session_history.append(request.message)
conversation_history[request.session_id] = session_history

还提供了清除历史记录的端点:

@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
    """清除指定会话的历史记录"""
    if session_id in conversation_history:
        del conversation_history[session_id]
    return {"message": f"会话 {session_id} 的历史记录已清除"}

这种设计允许不同用户或不同对话线程保持独立的上下文历史。

四、API端点设计

本项目实现了以下RESTful端点:

端点方法描述参数
/chatPOST主聊天接口,支持流式和非流式响应session_id, message, history_length, stream
/clear_history/{session_id}DELETE清除指定会话的历史记录session_id
/healthGET健康检查端点
/stream_testGET流式接口测试端点

主要请求和响应模型

class ChatRequest(BaseModel):
    session_id: str  # 会话ID,用于区分不同用户的对话历史
    message: str  # 用户消息
    history_length: Optional[int] = 5  # 历史消息长度,默认为5
    stream: Optional[bool] = False  # 是否使用流式响应

class ChatResponse(BaseModel):
    session_id: str
    response: str
    success: bool
    error: Optional[str] = None

五、部署与性能优化

5.1 生产环境部署

使用Uvicorn作为ASGI服务器部署应用:

uvicorn main:app --host 0.0.0.0 --port 9113 --workers 4 --timeout-keep-alive 300

参数说明

  • --workers 4:启动4个工作进程,充分利用多核CPU
  • --timeout-keep-alive 300:保持连接超时时间设置为300秒
  • --host 0.0.0.0:监听所有网络接口

5.2 性能优化建议

  • 缓存机制:引入Redis缓存频繁查询的结果
  • 数据库持久化:使用Redis或数据库持久化会话历史,替代内存存储
  • 负载均衡:在多服务器部署时使用Nginx进行负载均衡
  • 异步处理:确保所有I/O操作都使用异步方式,避免阻塞事件循环

六、应用场景与扩展方向

6.1 典型应用场景

  • 企业数据分析:让业务人员通过自然语言查询销售数据、业绩指标等
  • 学术研究:研究人员快速探索实验数据,发现潜在模式
  • 财务报表分析:自动解析财务数据,回答关于收入、支出和利润的问题
  • 客户支持:分析客户反馈数据,识别常见问题和趋势

6.2 扩展方向

  • 多数据源支持:扩展支持数据库、CSV、JSON等更多数据格式
  • 可视化集成:将分析结果以图表形式返回,增强数据表现力
  • 权限控制:增加用户认证和授权,控制数据访问权限
  • 预定义查询模板:提供常用查询模板,降低用户学习成本
  • 批量处理:支持批量查询和异步处理大量分析任务

七、总结与展望

本项目展示了如何将FastAPI、LangChain和大语言模型结合,构建一个功能强大的智能数据分析API。关键优势包括:

  • 自然语言交互:降低了数据查询的技术门槛
  • 流式响应:大幅提升了用户体验,减少等待时间
  • 多会话支持:适合多用户并发使用场景
  • 易于扩展:模块化设计方便后续功能扩展

未来发展方向

  • 支持更多数据源和格式
  • 增加数据可视化能力
  • 实现更复杂的对话状态管理
  • 提供查询建议和自动补全功能

通过这种技术组合,我们能够将先进的大语言模型能力转化为实用的企业级应用,真正实现"用自然语言与数据对话"的愿景。

完整代码

# main.py
import pandas as pd
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_experimental.agents import create_pandas_dataframe_agent
import os
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from typing import List, Optional
from fastapi.responses import StreamingResponse
import asyncio
import json
import time
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# 初始化 FastAPI 应用
app = FastAPI(title="Excel Data Analysis API", description="基于 Pandas DataFrame 的数据分析 API")

# 全局变量存储模型和代理
llm = None
pandas_agent = None
conversation_history = {}


class StreamingCallbackHandler(BaseCallbackHandler):
    """自定义回调处理器,用于实现真正的流式输出"""

    def __init__(self):
        self.tokens = []
        self.finished = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """当LLM生成新token时调用"""
        self.tokens.append(token)

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        """当LLM生成结束时调用"""
        self.finished = True

    def get_tokens(self):
        """获取已生成的tokens"""
        return self.tokens


class ChatRequest(BaseModel):
    session_id: str  # 会话ID,用于区分不同用户的对话历史
    message: str  # 用户消息
    history_length: Optional[int] = 5  # 历史消息长度,默认为5
    stream: Optional[bool] = False  # 是否使用流式响应


class ChatResponse(BaseModel):
    session_id: str
    response: str
    success: bool
    error: Optional[str] = None


def initialize_agent():
    """初始化模型和数据代理"""
    global llm, pandas_agent

    path = r'./data/1.xlsx'

    # 检查文件是否存在
    if not os.path.exists(path):
        raise FileNotFoundError(f"文件未找到:{path}")

    # 读取 Excel 中的所有工作表
    excel_file = pd.ExcelFile(path)
    all_sheets = {}
    for sheet_name in excel_file.sheet_names:
        all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)

    # 合并所有工作表到一个DataFrame中
    df = pd.concat(all_sheets.values(), ignore_index=True)

    os.environ["DASHSCOPE_API_KEY"] = 'sk-50254f6d4df1ab3c9baf30093c4e'

    llm = ChatTongyi(
        model="qwen-max-latest",
        temperature=0.4,
        streaming=True  # 启用流式输出
    )

    # 创建excel_agent
    pandas_agent = create_pandas_dataframe_agent(
        llm,
        df,
        verbose=True,
        agent_type="zero-shot-react-description",
        allow_dangerous_code=True,
        max_iterations=5
    )


@app.on_event("startup")
async def startup_event():
    """应用启动时初始化模型和代理"""
    try:
        initialize_agent()
        print("模型和代理初始化成功")
    except Exception as e:
        print(f"初始化失败: {e}")
        raise


@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):
    """与数据进行对话(支持流式和非流式响应)"""
    global pandas_agent, conversation_history

    if not pandas_agent:
        raise HTTPException(status_code=500, detail="模型未初始化")

    try:
        # 获取或创建会话历史
        session_history = conversation_history.get(request.session_id, [])

        # 构建历史文本
        history_text = "\n".join(session_history[-request.history_length:])

        # 构造带上下文的输入
        if history_text:
            full_input = f"聊天历史:{history_text},当前问题:{request.message},请根据历史和当前问题,不要截断输出,展示全部内容,不要总结,严格按照查询内容输出,不要多余输出,如果无法确定指代对象,请询问用户澄清,并且不要编造。".strip()
        else:
            full_input = request.message

        # 如果请求流式响应
        if request.stream:
            return StreamingResponse(
                true_stream_response(full_input, session_history, request.session_id),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )

        # 非流式响应
        response = pandas_agent.invoke({"input": full_input})
        output = response['output'] if isinstance(response, dict) else str(response)

        # 更新会话历史
        session_history.append(request.message)
        conversation_history[request.session_id] = session_history

        return ChatResponse(
            session_id=request.session_id,
            response=output,
            success=True
        )

    except Exception as e:
        if request.stream:
            # 对于流式请求,返回流式错误响应
            async def error_stream():
                yield f"data: {json.dumps({'error': str(e)})}\n\n"

            return StreamingResponse(error_stream(), media_type="text/event-stream")
        else:
            return ChatResponse(
                session_id=request.session_id,
                response="",
                success=False,
                error=str(e)
            )


async def true_stream_response(input_text: str, session_history: list, session_id: str):
    """真正的流式响应生成器"""
    global pandas_agent, conversation_history

    try:
        # 使用回调处理器捕获流式输出
        callback_handler = StreamingCallbackHandler()

        # 在后台运行代理处理
        async def run_agent():
            try:
                # 调用代理,传入回调处理器
                response = await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: pandas_agent.invoke(
                        {"input": input_text},
                        {"callbacks": [callback_handler]}
                    )
                )

                # 确保结束标记被设置
                callback_handler.finished = True

            except Exception as e:
                print(f"代理执行错误: {e}")
                callback_handler.finished = True

        # 启动代理任务
        agent_task = asyncio.create_task(run_agent())

        # 流式输出循环
        last_token_count = 0
        start_time = time.time()
        max_wait_time = 60  # 最大等待时间60秒

        while not callback_handler.finished and (time.time() - start_time) < max_wait_time:
            current_tokens = callback_handler.get_tokens()

            # 如果有新token,发送给客户端
            if len(current_tokens) > last_token_count:
                for i in range(last_token_count, len(current_tokens)):
                    token_data = {
                        "token": current_tokens[i],
                        "type": "token"
                    }
                    yield f"data: {json.dumps(token_data)}\n\n"

                last_token_count = len(current_tokens)

            # 短暂等待后继续检查
            await asyncio.sleep(0.1)

        # 如果超时,发送错误信息
        if (time.time() - start_time) >= max_wait_time:
            error_data = {"error": "请求超时", "type": "error"}
            yield f"data: {json.dumps(error_data)}\n\n"
        else:
            # 发送完成标记
            done_data = {"done": True, "type": "done"}
            yield f"data: {json.dumps(done_data)}\n\n"

            # 更新会话历史
            current_message = input_text.split("当前问题:")[1].split(",")[
                0] if "当前问题:" in input_text else input_text
            session_history.append(current_message)
            conversation_history[session_id] = session_history

        # 等待代理任务完成(如果还未完成)
        if not agent_task.done():
            agent_task.cancel()

    except Exception as e:
        error_data = {"error": f"流式处理错误: {str(e)}", "type": "error"}
        yield f"data: {json.dumps(error_data)}\n\n"


@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
    """清除指定会话的历史记录"""
    if session_id in conversation_history:
        del conversation_history[session_id]
    return {"message": f"会话 {session_id} 的历史记录已清除"}


@app.get("/health")
async def health_check():
    """健康检查接口"""
    return {
        "status": "healthy",
        "model_loaded": pandas_agent is not None,
        "active_sessions": len(conversation_history)
    }


@app.get("/stream_test")
async def stream_test():
    """测试流式接口"""

    async def generate_test_data():
        for i in range(10):
            yield f"data: 测试消息 {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(generate_test_data(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=9113)  

以上就是基于FastAPI与LangChain开发Excel智能数据分析API详解的详细内容,更多关于FastAPI开发Excel数据分析API的资料请关注脚本之家其它相关文章!

相关文章

  • Python利用3D引擎做一个太阳系行星模拟器

    Python利用3D引擎做一个太阳系行星模拟器

    Python有一个不错的3D引擎——Ursina。本文就来利用Ursina这一3D引擎做一个太阳系行星模拟器,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-01-01
  • 利用Python自动生成PPT的示例详解

    利用Python自动生成PPT的示例详解

    在日常工作中,PPT制作是常见的工作。这篇文章主要为大家详细介绍了如何利用Python自动生成PPT,文中的示例代码讲解详细,感兴趣的可以了解一下
    2022-07-07
  • Python数据持久化shelve模块用法分析

    Python数据持久化shelve模块用法分析

    这篇文章主要介绍了Python数据持久化shelve模块用法,结合实例形式较为详细的总结分析了shelve模块的功能、原理及简单使用方法,需要的朋友可以参考下
    2018-06-06
  • Python将8位的图片转为24位的图片实现方法

    Python将8位的图片转为24位的图片实现方法

    这篇文章主要介绍了Python将8位的图片转为24位的图片的实现代码,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-10-10
  • 关于使用Python的time库制作进度条程序

    关于使用Python的time库制作进度条程序

    这篇文章主要介绍了关于使用Python的time库制作进度条程序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • Python机器学习算法之决策树算法的实现与优缺点

    Python机器学习算法之决策树算法的实现与优缺点

    决策树(Decision Tree)是一种基本的分类与回归方法,这篇文章主要给大家介绍了关于Python机器学习算法之决策树算法实现与优缺点的相关资料,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • Python爬虫实战项目掌握酷狗音乐的加密过程

    Python爬虫实战项目掌握酷狗音乐的加密过程

    在常见的几个音乐网站里,酷狗可以说是最好爬取的啦,什么弯都没有,所以最适合小白入门爬虫,本篇针对爬虫零基础的小白,所以每一步骤我都截图并详细解释了,其实我自己看着都啰嗦,归根到底就是两个步骤的请求,还请大佬绕路勿喷
    2021-09-09
  • Python wxPython库Core组件BoxSizer用法示例

    Python wxPython库Core组件BoxSizer用法示例

    这篇文章主要介绍了Python wxPython库Core组件BoxSizer用法,结合实例形式分析了wxPython BoxSizer布局管理相关使用方法及操作注意事项,需要的朋友可以参考下
    2018-09-09
  • python中利用xml.dom模块解析xml的方法教程

    python中利用xml.dom模块解析xml的方法教程

    这篇文章主要给大家介绍了关于python中利用xml.dom模块解析xml的方法教程,文中通过示例代码介绍的非常详细,相信对大家具有一定的参考学习价值,需要的朋友们下面来一起看看吧。
    2017-05-05
  • 使用Python进行Excel文件xls/xlsx/xsv格式互相转换

    使用Python进行Excel文件xls/xlsx/xsv格式互相转换

    本文介绍了如何使用Python进行Excel文件格式的互相转换,包括xls到xlsx、xlsx到xls、xls到csv、xlsx到csv、csv到xls以及csv到xlsx的转换方法,转换过程中需要注意文件路径的修改以及文件名冲突的处理,需要的朋友可以参考下
    2025-11-11

最新评论