可信網(wǎng)站標(biāo)志上海網(wǎng)站快速排名提升
上下文(Context)狀態(tài)
到目前為止,在我們的示例中,我們一直通過(guò)自定義事件的屬性在各個(gè)步驟之間傳遞數(shù)據(jù)。這是一種強(qiáng)大的數(shù)據(jù)傳遞方式,但它也存在一些局限性。例如,如果你想在那些并非直接相連的步驟之間傳遞數(shù)據(jù),你就需要通過(guò)中間的所有步驟來(lái)傳遞這些數(shù)據(jù)。這會(huì)使你的代碼變得更難閱讀和維護(hù)。
為避免這一問(wèn)題,我們?cè)诠ぷ髁髦械拿總€(gè)步驟都可以使用一個(gè) Context(上下文) 對(duì)象。要使用它,只需在你的步驟函數(shù)中聲明一個(gè)類型為 Context 的參數(shù)即可。以下是具體的做法:
我們需要一個(gè)新的導(dǎo)入內(nèi)容,即 Context 類型:
from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context,
)
現(xiàn)在,我們定義一個(gè) start 事件,它會(huì)檢查是否有數(shù)據(jù)已經(jīng)被加載到上下文中。如果沒(méi)有,它將返回一個(gè) QueryAccountEvent,觸發(fā)用于加載數(shù)據(jù)的 step_query_account步驟,并循環(huán)回到 start。
然后在 step_query_account中,我們可以直接從上下文中訪問(wèn)數(shù)據(jù),而無(wú)需顯式傳遞。在生成式 AI 應(yīng)用中,這種方法對(duì)于加載索引和其他大型數(shù)據(jù)操作非常有用。
import asynciofrom llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context,
)
from llama_index.utils.workflow import draw_all_possible_flowsclass QueryAccountEvent(Event):payload: strclass SaveAccountEvent(Event):payload: strclass AccountWorkflow(Workflow):@stepasync def start(self, ctx: Context, ev: StartEvent)-> QueryAccountEvent | SaveAccountEvent:# 從上下文中獲取數(shù)據(jù),獲取賬戶信息db_account = await ctx.get("account", default=None)# 如果賬戶不存在,則從數(shù)據(jù)庫(kù)中查詢賬戶信息if db_account is None:return QueryAccountEvent(payload=f"{ev.payload}--->未查詢到賬戶信息,執(zhí)行查詢節(jié)點(diǎn): step_query_account")return SaveAccountEvent(payload=f"{ev.payload}--->從ctx中獲取到了張三的賬戶,跳轉(zhuǎn)到節(jié)點(diǎn): step_save_account")@stepasync def step_query_account(self, ctx: Context, ev: QueryAccountEvent)-> StartEvent:"""查詢張三賬戶信息"""await ctx.set("account", {"name": "張三", "balance": 0.0})return StartEvent(payload=f"{ev.payload}--->查詢張三賬戶信息,并保證到ctx中,然后跳轉(zhuǎn)到開(kāi)始節(jié)點(diǎn): start")@stepasync def step_save_account(self, ctx: Context, ev: SaveAccountEvent)-> StopEvent:account = await ctx.get("account")account['balance'] += 1000000000await ctx.set("account", account)return StopEvent(result=f"{ev.payload}--->工作流完成,賬戶信息: {account}")# 運(yùn)行工作流
async def run_workflow():w = AccountWorkflow(timeout=10, verbose=False)result = await w.run(payload="給張三的賬戶打一個(gè)小目標(biāo)")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(AccountWorkflow, filename="multi_step_workflow.html") # type: ignore
運(yùn)行結(jié)果:
給張三的賬戶打一個(gè)小目標(biāo)--->未查詢到賬戶信息,執(zhí)行查詢節(jié)點(diǎn): step_query_account--->查詢張三賬戶信息,并保證到ctx中,然后跳轉(zhuǎn)到開(kāi)始節(jié)點(diǎn): start--->從ctx中獲取到了張三的賬戶,跳轉(zhuǎn)到節(jié)點(diǎn): step_save_account--->工作流完成,賬戶信息: {'name': '張三', 'balance': 1000000000.0}
接下來(lái),我們將學(xué)習(xí)如何從正在進(jìn)行的工作流中流式傳輸事件。
流式傳輸事件
工作流可能會(huì)很復(fù)雜——它們被設(shè)計(jì)用來(lái)處理復(fù)雜的、有分支的、并發(fā)的邏輯——這意味著它們可能需要一定時(shí)間才能完全執(zhí)行。為了給用戶提供良好的體驗(yàn),你可能希望在事件發(fā)生時(shí)通過(guò)流式傳輸事件來(lái)提供進(jìn)度反饋。工作流在上下文(Context)對(duì)象上對(duì)此提供了內(nèi)置支持。
為了完成這個(gè)任務(wù),我們先引入所有需要的依賴項(xiàng):
from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context,
)
import asyncio
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import draw_all_possible_flows
讓我們?yōu)橐粋€(gè)簡(jiǎn)單的三步工作流設(shè)置一些事件,并額外添加一個(gè)事件用于在執(zhí)行過(guò)程中流式傳輸我們的進(jìn)度:
class FirstEvent(Event):first_output: strclass SecondEvent(Event):second_output: strresponse: strclass ProgressEvent(Event):msg: str
然后定義一個(gè)會(huì)發(fā)送事件的工作流類:
class MyWorkflow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> FirstEvent:ctx.write_event_to_stream(ProgressEvent(msg='第一步正在進(jìn)行中'))return FirstEvent(first_output="第一步完成")@stepasync def step_two(self, ctx: Context, ev: FirstEvent)-> SecondEvent:llms = MyLLMsClients.deepseek_client()generator = await llms.astream_complete("寫兩句關(guān)于春天的詩(shī)句")response = ""async for message in generator:response += message.delta# 允許工作流流式傳輸此段響應(yīng)。ctx.write_event_to_stream(ProgressEvent(msg=str(response)))return SecondEvent(second_output="第二步完成,附上完整回復(fù)",response=str(response),)@stepasync def step_three(self, ctx: Context, ev: SecondEvent)-> StopEvent:ctx.write_event_to_stream(ProgressEvent(msg="第三步正在進(jìn)行中"))return StopEvent(result="工作流完成")
在 step_one 和 step_three 中,我們將單個(gè)事件寫入事件流。在 step_two 中,我們使用 stream_complete 生成一個(gè)可迭代的生成器來(lái)獲取大語(yǔ)言模型(LLM)的響應(yīng),然后在 LLM 返回給我們的每一塊數(shù)據(jù)(大致每個(gè)詞一塊)時(shí)生成一個(gè)對(duì)應(yīng)的事件,最后再將完整的結(jié)果返回給 step_three。
要實(shí)際獲取這些輸出,我們需要異步運(yùn)行工作流并監(jiān)聽(tīng)事件,如下所示:
async def run_workflow():w = MyWorkflow(timeout=60, verbose=True)handler = w.run(first_input="開(kāi)始工作流")async for ev in handler.stream_events():if isinstance(ev, ProgressEvent):print(ev.msg)final_result = await handlerprint("最終結(jié)果:", final_result)if __name__ == '__main__':asyncio.run(run_workflow())
run 會(huì)在后臺(tái)運(yùn)行工作流,而 stream_events 將提供寫入事件流中的每一個(gè)事件。當(dāng)流返回一個(gè) StopEvent 時(shí),事件流結(jié)束,之后你可以像往常一樣獲取工作流的最終結(jié)果。
完整代碼
from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context,
)
import asyncio
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import draw_all_possible_flowsfrom my_llms.MyLLMsClients import MyLLMsClientsclass FirstEvent(Event):first_output: strclass SecondEvent(Event):second_output: strresponse: strclass ProgressEvent(Event):msg: strclass MyWorkflow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> FirstEvent:ctx.write_event_to_stream(ProgressEvent(msg='第一步正在進(jìn)行中'))return FirstEvent(first_output="第一步完成")@stepasync def step_two(self, ctx: Context, ev: FirstEvent)-> SecondEvent:llms = MyLLMsClients.deepseek_client()generator = await llms.astream_complete("寫兩句關(guān)于春天的詩(shī)句")response = ""async for message in generator:response += message.delta# 允許工作流流式傳輸此段響應(yīng)。ctx.write_event_to_stream(ProgressEvent(msg=str(response)))return SecondEvent(second_output="第二步完成,附上完整回復(fù)",response=str(response),)@stepasync def step_three(self, ctx: Context, ev: SecondEvent)-> StopEvent:ctx.write_event_to_stream(ProgressEvent(msg="第三步正在進(jìn)行中"))return StopEvent(result="工作流完成")async def run_workflow():w = MyWorkflow(timeout=60, verbose=True)handler = w.run(first_input="開(kāi)始工作流")async for ev in handler.stream_events():if isinstance(ev, ProgressEvent):print(ev.msg)final_result = await handlerprint("最終結(jié)果:", final_result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")
運(yùn)行結(jié)果:
Running step step_one
Step step_one produced event FirstEvent
Running step step_two
第一步正在進(jìn)行中
Step step_two produced event SecondEvent
1. **"小桃灼灼柳鬖鬖,春色滿江南。"**(黃庭堅(jiān)《訴衷情》)
2. **"等閑識(shí)得東風(fēng)面,萬(wàn)紫千紅總是春。"**(朱熹《春日》) 兩聯(lián)皆以簡(jiǎn)練筆墨勾勒春景,一繪桃柳江南,一寫東風(fēng)爛漫,各具生機(jī)。
Running step step_three
Step step_three produced event StopEvent
第三步正在進(jìn)行中
最終結(jié)果: 工作流完成
流程圖