杭州移動(dòng)公司網(wǎng)站各種手藝培訓(xùn)班
協(xié)作相關(guān)
src/magentic_ui/
├── teams/
│ ├── orchestrator/
│ │ ├── _group_chat.py # GroupChat 類定義,管理參與者的狀態(tài)
│ │ ├── _orchestrator.py # Orchestrator 類定義,維護(hù)對(duì)話的狀態(tài)
│ │ └── orchestrator_config.py # 相關(guān)的配置
│ └── roundrobin_orchestrator.py # 輪詢狀態(tài)的管理
├── agents/
│ ├── file_surfer/
│ │ └── _file_surfer.py
│ ├── web_surfer/
│ │ └── _web_surfer.py
│ └── users/
│ ├── _dummy_user_proxy.py
│ └── _metadata_user_proxy.py
├── backend/
│ ├── teammanager/
│ └── _teammanager.py
└── task_team.py
核心文件的核心函數(shù)
-
_group_chat.py:
-
__init__:智能體團(tuán)隊(duì)的創(chuàng)建
-
-
_orchestrator.py:
-
__init__:協(xié)調(diào)器的初始化
-
_orchestrate_step_planning:進(jìn)行規(guī)劃
-
_orchestrate_step_execution:分配合適的智能體進(jìn)行執(zhí)行
-
handle_agent_response:處理智能體的響應(yīng)(例如:某個(gè)Agent:我已完成任務(wù),xxx)
-
handle_start:將所有的消息轉(zhuǎn)發(fā)給智能體,消息流轉(zhuǎn)的入口點(diǎn)
-
-
_file_surfer.py:
-
on_messages:消息的轉(zhuǎn)化
-
on_messages_stream:自己的智能體消息的響應(yīng)
-
-
_web_surfer.py:
-
on_messages:消息的轉(zhuǎn)化
-
on_messages_stream:自己的智能體消息的響應(yīng)
-
_handle_action:處理具體的網(wǎng)頁操作,返回操作結(jié)果消息
-
_handle_tool_call:處理工具調(diào)用,返回工具調(diào)用結(jié)果消息
-
-
_dummy_user_proxy.py:
-
on_messages:消息的轉(zhuǎn)化
-
on_messages_stream:自己的智能體消息的響應(yīng)
-
-
_metadata_user_proxy.py:
-
on_messages:消息的轉(zhuǎn)化
-
on_messages_stream:自己的智能體消息的響應(yīng)
-
-
_teammanager.py:
-
run_stream:團(tuán)隊(duì)的運(yùn)行相關(guān),頂級(jí)
-
-
task_team.py:
-
get_task_team:創(chuàng)建智能體的實(shí)例
-
關(guān)系的一些區(qū)分處理:
handle_start 和 _orchestrate_step_execution之間的關(guān)系:
-
消息的流轉(zhuǎn)過程
-
handle_start 是入口點(diǎn),負(fù)責(zé)接收原始消息并轉(zhuǎn)發(fā)給所有智能體.
-
消息添加到message_histroy中保存
-
調(diào)用_orchestrate_step 開始協(xié)調(diào)過程,是進(jìn)行計(jì)劃還是分發(fā)給智能體執(zhí)行。
-
_orchestrate_step_execution 處理的是經(jīng)過的具體消息,也就是planning后的消息
-
-
執(zhí)行流程
-
handle_start 啟動(dòng)整個(gè)對(duì)話流程
-
_orchestrate_step 根據(jù)當(dāng)前狀態(tài)決定是進(jìn)入規(guī)劃模式還是執(zhí)行模式
-
orchestrate_step_execution 負(fù)責(zé)具體的執(zhí)行階段
-
-
通過self._state進(jìn)行信息的共享
智能體之間的協(xié)作:
初始化階段
-
智能體的創(chuàng)建(src\magentic_ui\task_team.py)下的get_task_team
async def get_task_team(...):# 1. 創(chuàng)建各個(gè)智能體實(shí)例file_surfer = FileSurfer(name="file_surfer",model_client=model_client_file_surfer,work_dir=paths.internal_run_dir,bind_dir=paths.external_run_dir,model_context_token_limit=magentic_ui_config.model_context_token_limit,approval_guard=approval_guard,)# 2.創(chuàng)建并注冊(cè)智能體team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)# 3. 初始化團(tuán)隊(duì)await team.lazy_init()return team
-
團(tuán)隊(duì)的創(chuàng)建(src\magentic_ui\teams\orchestrator\_group_chat.py)下的GroupChat,需要有參與的智能體,模型的客戶端,協(xié)調(diào)器
class GroupChat(BaseGroupChat, Component[GroupChatConfig]):def __init__(self,participants: List[ChatAgent],model_client: ChatCompletionClient,orchestrator_config: OrchestratorConfig,...):# 1. 調(diào)用父類初始化super().__init__(participants,group_chat_manager_name="Orchestrator",group_chat_manager_class=Orchestrator,...)# 2. 初始化內(nèi)部變量self._orchestrator_config = orchestrator_configself._model_client = model_clientself._message_factory = MessageFactory()self._memory_provider = memory_provider
- ?協(xié)調(diào)器的初始化(src\magentic_ui\teams\orchestrator\_orchestrator.py)下的Orchestrato
class Orchestrator(BaseGroupChatManager):def __init__(self,name: str,group_topic_type: str,output_topic_type: str,participant_topic_types: List[str],participant_names: List[str],participant_descriptions: List[str],...):# 初始化基礎(chǔ)屬性self._model_client = model_clientself._config = configself._user_agent_topic = "user_proxy"self._web_agent_topic = "web_surfer"
消息相關(guān)
團(tuán)隊(duì)相關(guān)
teammanager.py VS task_team.py VS _orchestrator.py
-
teammanager.py是管理團(tuán)隊(duì)的生命周期
-
task_team.py只是團(tuán)隊(duì)的創(chuàng)建,不管生命周期
-
_orchestrator.py僅是團(tuán)隊(duì)的內(nèi)部協(xié)調(diào),進(jìn)行通信
用戶相關(guān)
用戶代理是如何加入到團(tuán)隊(duì)中?
# src\magentic_ui\task_team.py
# 用戶代理的創(chuàng)建
user_proxy: DummyUserProxy | MetadataUserProxy | UserProxyAgentif magentic_ui_config.user_proxy_type == "dummy":user_proxy = DummyUserProxy(name="user_proxy")elif magentic_ui_config.user_proxy_type == "metadata":assert (magentic_ui_config.task is not None), "Task must be provided for metadata user proxy"assert (magentic_ui_config.hints is not None), "Hints must be provided for metadata user proxy"assert (magentic_ui_config.answer is not None), "Answer must be provided for metadata user proxy"user_proxy = MetadataUserProxy(name="user_proxy",description="Metadata User Proxy Agent",task=magentic_ui_config.task,helpful_task_hints=magentic_ui_config.hints,task_answer=magentic_ui_config.answer,model_client=model_client_orch,)else:user_proxy_input_func = make_agentchat_input_func(input_func)user_proxy = UserProxyAgent(description=USER_PROXY_DESCRIPTION,name="user_proxy",input_func=user_proxy_input_func,)# 輪詢代理if websurfer_loop_team:# simplified team of only the web surferteam = RoundRobinGroupChat(participants=[web_surfer, user_proxy],max_turns=10000,)await team.lazy_init()return team# 由LLM智能選擇team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)await team.lazy_init()return team
_metadata_user_proxy.py為例,它是創(chuàng)建復(fù)雜的用戶代理,另一個(gè)是創(chuàng)建簡(jiǎn)單的用戶代理
async def on_messages_stream(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:"""處理輸入消息并生成響應(yīng)流這是核心方法,處理所有消息邏輯"""# 1. 嚴(yán)格模式下的提示重寫if (self.how_helpful == "strict"and self.helpful_task_hintsand self.helpful_task_hints != "Helpful hints are not available for this task."and self.rewritten_helpful_task_hints is None):self.rewritten_helpful_task_hints = await self._rewrite_helpful_hints(cancellation_token)# 2. 消息處理chat_messages = thread_to_context(list(messages),agent_name=self.name,is_multimodal=self._model_client.model_info["vision"],)self._chat_history.extend(chat_messages)# 3. 階段判斷if ("type" in messages[-1].metadataand messages[-1].metadata["type"] == "plan_message"):self.have_encountered_plan_message = Trueself.in_planning_phase = Trueelse:if self.have_encountered_plan_message:self.in_planning_phase = Falseelse:self.in_planning_phase = True# 4. 模型上下文準(zhǔn)備await self._model_context.clear()system_message = SystemMessage(content=self._get_system_message())await self._model_context.add_message(system_message)# 5. 添加聊天歷史for msg in self._chat_history:await self._model_context.add_message(msg)# 6. 獲取token限制的歷史記錄token_limited_history = await self._model_context.get_messages()# 7. 規(guī)劃階段處理if self.in_planning_phase:if self.simulated_user_type in ["co-planning", "co-planning-and-execution"]:if (self.max_co_planning_rounds is Noneor self.current_co_planning_round < self.max_co_planning_rounds):# 生成規(guī)劃響應(yīng)result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "llm","helpful_task_hints": self.rewritten_helpful_task_hintsif self.rewritten_helpful_task_hintselse self.helpful_task_hints,},),inner_messages=[],)self.current_co_planning_round += 1else:# 達(dá)到最大規(guī)劃輪次yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)else:# 非協(xié)作規(guī)劃模式yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)# 8. 執(zhí)行階段處理else:if self.simulated_user_type in ["co-execution", "co-planning-and-execution"]:if (self.max_co_execution_rounds is Noneor self.current_co_execution_round < self.max_co_execution_rounds):# 生成執(zhí)行響應(yīng)result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"user_execution_reply": "llm"},),inner_messages=[],)self.current_co_execution_round += 1else:# 達(dá)到最大執(zhí)行輪次yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,),)else:# 非協(xié)作執(zhí)行模式yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,metadata={"user_execution_reply": "idk"},),)
感覺像用戶代理這塊,不需要進(jìn)行LLM的調(diào)用,我認(rèn)為的消息流程是:用戶輸入-->用戶代理-->Orchestrator-->智能體-->LLM-->(智能體-->LLM)可能多個(gè)-->Orchesatrator-->用戶代理-->用戶界面的顯示。由于在智能體那塊以及調(diào)用了LLM,有了確切的解決,我認(rèn)為正確是:
-
用戶代理只負(fù)責(zé)消息的傳遞和顯示
-
Orchestrator負(fù)責(zé)任務(wù)的規(guī)劃和分配(交給智能體)
-
智能體負(fù)責(zé)調(diào)用LLM和處理任務(wù)
-
響應(yīng)通過用戶代理直接顯示給用戶
原始消息從哪里來的,怎么傳到handle_start?
@rpcasync def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
# @rpc 是遠(yuǎn)程過程調(diào)用,運(yùn)行函數(shù)被遠(yuǎn)程調(diào)用,就像調(diào)用本地函數(shù)一樣
# message 來自于外部的調(diào)用.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py
文件中的 run_stream 方法,就是負(fù)責(zé)將初始值(初始消息)通過 GroupChatStart 消息傳遞
給群聊管理器(GroupChatManager)的地方。具體流程如下:'''
對(duì)第一點(diǎn)的self.team的解釋,實(shí)際上self.team就是代指GroupChat
路徑:src\magentic_ui\teams\orchestrator\_group_chat.py下的run_stream誰去調(diào)用?
不會(huì)直接 new 一個(gè) GroupChat 然后直接 run_stream,而是通過 TeamManager 統(tǒng)一管理
GroupChat.run_stream 的實(shí)際調(diào)用,主要是通過 TeamManager 的 self.team.run_stream
實(shí)現(xiàn)的,這就是 “間接調(diào)用”
(本文件的)14,15,16行在src\magentic_ui\backend\teammanager\teammanager.py下的run_stream 調(diào)用
的313到315行代碼中async for message in self.team.run_stream(task=task, cancellation_token=cancellation_token):
其中的self.team.run_stream也就代指了GroupChat.run_stream
src\magentic_ui\backend\teammanager\teammanager.py 的 run_stream 由
src\magentic_ui\backend\web\managers\connection.py 下start_stream方法的run_stream調(diào)用
'''
1.調(diào)用 self.team.run_stream(task=...),其中 task 可以是字符串、消息對(duì)象、
消息列表,或者為 None。2.run_stream 方法會(huì)把 task 轉(zhuǎn)換成 messages,然后封裝到 GroupChatStart(
messages=messages) 這個(gè)消息對(duì)象里。(路徑:.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py)if task is None:# 如果 task 是 None,則 messages 也是 Nonepasselif isinstance(task, str):# 如果 task 是字符串,就會(huì)被包裝成一個(gè) TextMessage,放到 messages 列表里messages = [TextMessage(content=task, source="user")]elif isinstance(task, BaseChatMessage):# 如果 task 是單個(gè)消息對(duì)象,也會(huì)被放到 messages 列表里。messages = [task]elif isinstance(task, list):# 如果 task 是消息對(duì)象列表,則直接賦值給 messages。...# 也就是把 messages 作為參數(shù),封裝進(jìn) GroupChatStart 消息對(duì)象里 499行await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)3.通過 self._runtime.send_message(...),把 GroupChatStart
消息發(fā)送給群聊管理器(GroupChatManager)。
# .venv/Lib/site-packages/autogen_agentchat/teams/_group_chat/_base_group_chat.py 中的run_stream方法await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)
# recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
# 明確指定了接收者就是群聊管理器, _group_chat_manager_topic_type 就是Orchestrator這個(gè)群聊管理器實(shí)例4.群聊管理器收到 GroupChatStart 后,會(huì)調(diào)用 handle_start 方法,正式啟動(dòng)群聊流程。
# 對(duì)于 GroupChatStart 消息,自動(dòng)調(diào)用handle_start方法,message參數(shù)就是剛收到的 GroupChatStart消息
self.team
是一個(gè)團(tuán)隊(duì)對(duì)象,類型是GroupChat
,在業(yè)務(wù)層(比如 WebSocket、CLI、API)不會(huì)直接操作 GroupChat
,而是通過 TeamManager
這個(gè)中間層來操作團(tuán)隊(duì)。現(xiàn)在到了GroupChat.run_stream
了,它內(nèi)部繼承了.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py
下的run_stream
,這個(gè)就是上面代碼框中描述的第二點(diǎn)?,F(xiàn)在把消息封裝到GroupChatStart
-
WebSocket 路由/管理器調(diào)用 WebSocketManager.start_stream
-
WebSocketManager.start_stream 調(diào)用 TeamManager.run_stream
-
TeamManager.run_stream 內(nèi)部調(diào)用 self.team.run_stream
-
self.team 實(shí)際上就是 GroupChat 或其子類的實(shí)例
-
最終執(zhí)行到 GroupChat.run_stream