網(wǎng)站改版的宣傳詞湖南網(wǎng)絡(luò)優(yōu)化服務(wù)
系列文章索引
LangChain教程 - 系列文章
在現(xiàn)代自然語(yǔ)言處理(NLP)中,基于文檔內(nèi)容的問(wèn)答系統(tǒng)變得愈發(fā)重要,尤其是當(dāng)我們需要從大量文檔中提取信息時(shí)。通過(guò)結(jié)合文檔檢索和生成模型(如RAG,Retrieval-Augmented Generation),我們可以構(gòu)建強(qiáng)大的問(wèn)答系統(tǒng)。本博客將詳細(xì)介紹如何使用FastAPI和LangChain框架,創(chuàng)建一個(gè)基于PDF文檔的RAG問(wèn)答API。
一、背景
在許多實(shí)際應(yīng)用中,用戶可能需要基于大量的PDF文件進(jìn)行快速的問(wèn)答查詢。LangChain作為一個(gè)強(qiáng)大的框架,支持將各種數(shù)據(jù)源與生成模型集成,而FastAPI則是一個(gè)輕量級(jí)的Web框架,適用于構(gòu)建高性能的API。在本案例中,我們將使用FastAPI作為API服務(wù)端,LangChain來(lái)處理文檔加載、文本切分、向量存儲(chǔ)和問(wèn)答生成任務(wù)。
二、技術(shù)棧
- FastAPI:用于構(gòu)建Web服務(wù)。
- LangChain:提供構(gòu)建問(wèn)答系統(tǒng)的工具,涉及文檔加載、文本切分、向量存儲(chǔ)、RAG鏈構(gòu)建等功能。
- Ollama Embeddings:用于將文本轉(zhuǎn)換為向量。
- Chroma:用于存儲(chǔ)和檢索文本向量的數(shù)據(jù)庫(kù)。
- Starlette:FastAPI的底層庫(kù),用于支持流式響應(yīng)。
三、實(shí)現(xiàn)步驟
1. 環(huán)境配置
首先,我們需要安裝必需的庫(kù)。你可以通過(guò)以下命令來(lái)安裝:
pip install fastapi langchain langchain-chroma langchain-ollama langchain-community starlette uvicorn nest_asyncio
安裝完畢后,我們可以開(kāi)始構(gòu)建我們的API。
2. 加載PDF并處理文本
我們從一個(gè)PDF文件加載文檔,并將其切分成適合處理的小塊。這樣可以更高效地將文本轉(zhuǎn)化為向量,并存儲(chǔ)到數(shù)據(jù)庫(kù)中。
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter# 設(shè)置PDF文件路徑
pdf_path = "../../files/pdf/en/Transformer.pdf"# 加載PDF文檔并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()# 使用遞歸文本切分器來(lái)切分文檔
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)
在這段代碼中,PyPDFLoader
被用來(lái)加載PDF文件,而RecursiveCharacterTextSplitter
則將文檔切分為多個(gè)小塊,確保每個(gè)塊之間有200個(gè)字符的重疊,以便保持上下文的連貫性。
3. 存儲(chǔ)向量到數(shù)據(jù)庫(kù)
接下來(lái),我們使用Chroma
來(lái)存儲(chǔ)文檔的向量表示。我們利用OllamaEmbeddings
模型將文本塊轉(zhuǎn)化為向量,并將它們存儲(chǔ)在Chroma數(shù)據(jù)庫(kù)中。
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings# 存儲(chǔ)分割后的文檔到向量數(shù)據(jù)庫(kù)
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))
這里,我們通過(guò)OllamaEmbeddings
將文檔切分塊轉(zhuǎn)換為嵌入向量,并使用Chroma
將這些向量存儲(chǔ)到數(shù)據(jù)庫(kù)中。這樣,我們就可以通過(guò)相似度檢索來(lái)快速找到與用戶查詢相關(guān)的文檔。
4. 構(gòu)建檢索器
為了支持從數(shù)據(jù)庫(kù)中檢索相關(guān)文檔,我們將構(gòu)建一個(gè)基于相似度搜索的檢索器。
# 構(gòu)建檢索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
這段代碼通過(guò)as_retriever
方法創(chuàng)建了一個(gè)檢索器,能夠基于向量的相似度從數(shù)據(jù)庫(kù)中返回與查詢最相關(guān)的文檔。
5. 定義RAG鏈
我們使用RAG(檢索增強(qiáng)生成)技術(shù),將檢索到的文檔與生成模型(如ChatOllama
)結(jié)合,生成最終的答案。hub.pull("rlm/rag-prompt")
方法提供了一個(gè)預(yù)定義的RAG提示模板。
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama# 定義RAG提示模板
prompt = hub.pull("rlm/rag-prompt")# 格式化檢索到的文檔
def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)# 定義RAG鏈
rag_chain = ({"context": retriever | format_docs, "question": RunnablePassthrough()}| prompt| ChatOllama(model="deepseek-r1:7b")| StrOutputParser()
)
在這里,我們將檢索到的文檔內(nèi)容格式化為字符串,并將其與用戶的查詢一起傳遞到生成模型中,以生成最終的答案。
6. 生成答案和流式響應(yīng)
我們定義了兩個(gè)方法:一個(gè)是生成完整答案,另一個(gè)是生成流式響應(yīng)。
import json# 生成答案函數(shù)
async def generate_answer(question: str):response = await rag_chain.ainvoke(question)return response# 生成流式響應(yīng)
async def generate_streaming_response(question: str):async for chunk in rag_chain.astream(question): # 使用astream逐塊獲取響應(yīng)yield json.dumps({"answer chunk": chunk}) + "\n" # 按流式返回每一塊內(nèi)容
在這部分代碼中,generate_answer
方法會(huì)返回完整的答案,而generate_streaming_response
方法則返回流式響應(yīng),每次返回一個(gè)內(nèi)容塊。
7. 創(chuàng)建FastAPI應(yīng)用
最后,我們使用FastAPI創(chuàng)建一個(gè)Web應(yīng)用,提供一個(gè)POST接口來(lái)接收用戶查詢,并返回答案。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from starlette.responses import StreamingResponse# 創(chuàng)建FastAPI應(yīng)用
app = FastAPI()# 定義輸入模型
class QueryModel(BaseModel):question: strstream: bool = False # 默認(rèn)不流式返回# 創(chuàng)建POST路由處理查詢
@app.post("/query/")
async def query_question(query: QueryModel):try:if query.stream:# 如果stream為True,使用流式響應(yīng)return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")else:# 否則直接返回完整答案answer = await generate_answer(query.question) # 使用await獲取完整的答案return {"answer": answer}except Exception as e:raise HTTPException(status_code=500, detail=str(e))
此API提供了一個(gè)接口,用戶可以通過(guò)發(fā)送帶有問(wèn)題的POST請(qǐng)求來(lái)獲取答案。如果請(qǐng)求中指定stream
為True
,系統(tǒng)將返回流式的答案。
8. 啟動(dòng)(jupyter)
notebooks下載地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/jupyter/chat_pdf_api.ipynb
import nest_asyncio
import uvicorn
nest_asyncio.apply()
uvicorn.run(app, host="127.0.0.1", port=8000)
9. 提問(wèn)
curl -X POST http://127.0.0.1:8000/query/ \-H "Content-Type: application/json" \-d '{"question": "Why is masking necessary in the decoder’s self-attention mechanism?","stream": true}'
10. 清理向量數(shù)據(jù)庫(kù)
vectorstore.delete_collection()
四、完整代碼實(shí)例
代碼下載地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/chat_pdf_api.py
import json
from contextlib import asynccontextmanagerfrom fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain import hub
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from starlette.responses import StreamingResponse# 設(shè)置 PDF 文件路徑
pdf_path = "../files/pdf/en/Transformer.pdf"# 加載 PDF 文檔并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)# 存儲(chǔ)分割后的文檔到向量數(shù)據(jù)庫(kù)
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))# 構(gòu)建檢索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})# 定義 RAG 提示模板
prompt = hub.pull("rlm/rag-prompt")# 格式化檢索到的文檔
def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)# 定義 RAG 鏈
rag_chain = ({"context": retriever | format_docs, "question": RunnablePassthrough()}| prompt| ChatOllama(model="deepseek-r1:7b")| StrOutputParser()
)print("RAG ready")# 生成答案函數(shù)
async def generate_answer(question: str):response = await rag_chain.ainvoke(question)return response# 生成流式響應(yīng)
async def generate_streaming_response(question: str):async for chunk in rag_chain.astream(question): # 使用 astream 逐塊獲取響應(yīng)yield json.dumps({"answer chunk": chunk}) + "\n" # 按流式返回每一塊內(nèi)容# 8. 清理向量數(shù)據(jù)庫(kù)
def clear_vectorstore():vectorstore.delete_collection()@asynccontextmanager
async def lifespan(app: FastAPI):# 在應(yīng)用啟動(dòng)時(shí)執(zhí)行的代碼yield# 在應(yīng)用關(guān)閉時(shí)執(zhí)行的代碼clear_vectorstore()print("Vectorstore cleaned up successfully!")# 創(chuàng)建 FastAPI 應(yīng)用
app = FastAPI(lifespan=lifespan)# 定義輸入模型
class QueryModel(BaseModel):question: strstream: bool = False # 默認(rèn)不流式返回# 創(chuàng)建 POST 路由處理查詢
@app.post("/query/")
async def query_question(query: QueryModel):try:if query.stream:# 如果 `stream` 為 True,使用流式響應(yīng)return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")else:# 否則直接返回完整答案answer = await generate_answer(query.question) # 使用 await 獲取完整的答案return {"answer": answer}except Exception as e:raise HTTPException(status_code=500, detail=str(e))# 啟動(dòng) FastAPI 應(yīng)用(適用于開(kāi)發(fā)環(huán)境)
# uvicorn chat_pdf_api:app --reload
五、總結(jié)
通過(guò)本教程,我們展示了如何使用FastAPI和LangChain框架,結(jié)合檢索增強(qiáng)生成(RAG)技術(shù),構(gòu)建一個(gè)基于PDF文檔的問(wèn)答系統(tǒng)。系統(tǒng)支持兩種查詢方式:普通的完整答案返回和流式答案返回。借助LangChain提供的強(qiáng)大工具集,我們能夠輕松地實(shí)現(xiàn)文檔加載、文本切分、向量存儲(chǔ)與檢索等功能。FastAPI則讓我們能夠高效地將這些功能封裝為一個(gè)Web API,供用戶使用。
在實(shí)際應(yīng)用中,這種基于文檔的問(wèn)答系統(tǒng)可以廣泛應(yīng)用于客戶支持、知識(shí)庫(kù)管理、教育培訓(xùn)等領(lǐng)域,為用戶提供智能化的答案生成服務(wù)。