域名 網(wǎng)站名稱手機(jī)系統(tǒng)流暢神器
LangChain自動化工作流實戰(zhàn)教程:從任務(wù)編排到智能決策
一、LangChain工作流引擎核心概念
1.1 工作流自動化設(shè)計模式
1.2 核心組件介紹
from langchain_core.runnables import (RunnableLambda,RunnableParallel,RunnableBranch
)
from langchain.agents import AgentExecutor, Tool
from langchain.memory import ConversationBufferWindowMemory
二、環(huán)境配置與基礎(chǔ)搭建
2.1 快速安裝
pip install langchain langchain-openai langchain-experimental
pip install duckduckgo-search # 用于網(wǎng)絡(luò)搜索工具
2.2 最小化工作流示例
from langchain_core.runnables import RunnablePassthrough# 定義處理節(jié)點
def step1(input):return {"output1": input["input"] + " processed"}def step2(input):return {"final": input["output1"].upper()}# 構(gòu)建工作流
workflow = (RunnablePassthrough.assign(output1=step1)| step2
)# 執(zhí)行工作流
workflow.invoke({"input": "test"})
三、復(fù)雜工作流編排實戰(zhàn)
3.1 條件分支工作流
from langchain_core.runnables import RunnableBranch# 定義分支條件
def route_condition(data):if data["type"] == "A":return "path_a"return "path_b"# 定義各分支處理
def path_a(data):return {"result": "Handled by A"}def path_b(data):return {"result": "Handled by B"}# 構(gòu)建分支工作流
branch = RunnableBranch((lambda x: route_condition(x) == "path_a", path_a),path_b
)# 執(zhí)行分支工作流
branch.invoke({"type": "B"})
3.2 并行執(zhí)行工作流
# 定義并行任務(wù)
def fetch_news(data):return {"news": "最新AI動態(tài)..."}def fetch_weather(data):return {"weather": "晴,25℃"}# 構(gòu)建并行流
parallel_workflow = RunnableParallel(news=fetch_news,weather=fetch_weather
)# 執(zhí)行并行流
parallel_workflow.invoke({})
四、集成外部工具與Agent
4.1 工具集成示例
from langchain.agents import Tool
from langchain.tools import DuckDuckGoSearchResults# 創(chuàng)建工具集
search = DuckDuckGoSearchResults()
tools = [Tool(name="web_search",func=search.run,description="用于查詢實時信息"),Tool(name="calculator",func=lambda x: str(eval(x)),description="用于數(shù)學(xué)計算")
]# 創(chuàng)建代理工作流
from langchain.agents import create_react_agentagent = create_react_agent(llm=ChatOpenAI(model="gpt-3.5-turbo"),tools=tools,prompt=prompt
)
4.2 自動化審批流程案例
def approval_workflow(doc):# 文檔分類classifier = RunnableLambda(classify_document)# 并行執(zhí)行檢查checks = RunnableParallel(legal_review=legal_check,finance_review=finance_check)# 最終決策decision = RunnableLambda(make_decision)return ({"document": doc}| classifier| checks| decision)# 執(zhí)行審批流
result = approval_workflow.invoke(contract_doc)
五、狀態(tài)管理與錯誤處理
5.1 工作流狀態(tài)跟蹤
from langchain_core.runnables import RunnableConfigdef log_step(data, config):print(f"Step {config.get('step')} executed")return dataworkflow_with_log = (RunnableLambda(log_step).with_config({"step": 1})| RunnableLambda(process_data).with_config({"step": 2})
)
5.2 異常處理機(jī)制
from langchain_core.runnables import RunnableLambdadef safe_operation(data):try:return risky_operation(data)except Exception as e:return {"error": str(e)}workflow = RunnableLambda(safe_operation)
六、生產(chǎn)級部署方案
6.1 工作流服務(wù)化部署
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutorapp = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)@app.post("/run-workflow")
async def run_workflow(input_data: dict):future = executor.submit(workflow.invoke, input_data)return {"status": "started", "task_id": future.task_id}
6.2 性能優(yōu)化技巧
# 異步執(zhí)行工作流
async def async_workflow(data):return await workflow.ainvoke(data)# 緩存中間結(jié)果
from langchain.cache import InMemoryCache
langchain.llm_cache = InMemoryCache()
七、典型應(yīng)用場景案例
7.1 智能客服工單系統(tǒng)
7.2 自動化數(shù)據(jù)分析流程
def analysis_workflow():return (load_data| clean_data| RunnableParallel(stats=calculate_statistics,trends=identify_trends)| generate_report)
最佳實踐與常見問題
8.1 調(diào)試建議
# 可視化工作流
print(workflow.get_graph().draw_mermaid())# 分步調(diào)試
for step in workflow.stream(input_data):print("Intermediate:", step)
8.2 性能優(yōu)化矩陣
優(yōu)化方向 | 實施方法 | 預(yù)期提升 |
---|---|---|
并行化 | 使用RunnableParallel | 30-50% |
緩存 | 實現(xiàn)LLM結(jié)果緩存 | 40-70% |
批處理 | 使用batch_invoke方法 | 3-5x |
異步執(zhí)行 | 使用ainvoke替代invoke | 2-3x |
完整項目示例:
git clone https://github.com/example/langchain-automation-demo
cd langchain-automation-demo
python finance_approval_workflow.py
提示:本教程基于LangChain 0.1.x版本,實際開發(fā)時請參考官方文檔獲取最新API變更。對于企業(yè)級應(yīng)用,建議結(jié)合Airflow或Prefect等調(diào)度系統(tǒng)實現(xiàn)復(fù)雜工作流管理。