可以做動(dòng)態(tài)影集的網(wǎng)站精準(zhǔn)引流客源的方法可靠嗎
目錄
- asyncio簡(jiǎn)介
- 示例
- 什么是 asyncio?
- 適用場(chǎng)景
- API
- asyncio的使用
- 可等待對(duì)象
- 什么是可等待對(duì)象?
- 協(xié)程對(duì)象
- 任務(wù)對(duì)象
- Future對(duì)象
- 協(xié)程
- 什么是協(xié)程?
- 基本使用
- 運(yùn)行協(xié)程
- Task
- 什么是 Task?
- 創(chuàng)建 Task
- 取消 Task
- Task 異常獲取
- Task 回調(diào)
- TaskGroup
- 什么是 TaskGroup?
- 為什么使用 TaskGroup?
- 創(chuàng)建任務(wù)
- 異常處理
- 同步任務(wù)完成
asyncio簡(jiǎn)介
示例
首先,我們來看一個(gè)簡(jiǎn)單的Hello World示例代碼:
import asyncioasync def main():print('Hello ...')await asyncio.sleep(1)print('... World!')asyncio.run(main())"""
輸出:
Hello ...
... World!
"""
這個(gè)例子展示了如何使用 asyncio
庫來編寫并發(fā)代碼。通過 async
和 await
語法,我們可以讓 Python 程序在執(zhí)行IO操作(如計(jì)算、文件讀寫、網(wǎng)絡(luò)請(qǐng)求等)時(shí)實(shí)現(xiàn)在其他任務(wù)間高效切換,從而提升程序性能。
什么是 asyncio?
asyncio
是一個(gè)用于編寫并發(fā)代碼的Python庫,允許我們利用 async
和 await
關(guān)鍵字進(jìn)行異步編程。作為多個(gè)Python異步框架的基礎(chǔ),asyncio
提供了諸如高性能網(wǎng)絡(luò)和Web服務(wù)器、數(shù)據(jù)庫連接庫以及分布式任務(wù)隊(duì)列等功能。
適用場(chǎng)景
asyncio
非常適合用于 IO密集型和高層次結(jié)構(gòu)化網(wǎng)絡(luò)代碼處理。其高效的異步IO處理方式,使其在需要大量網(wǎng)絡(luò)通信和異步操作的場(chǎng)景中表現(xiàn)優(yōu)異。
API
高級(jí)API用于:
- 并發(fā)運(yùn)行Python協(xié)程,完全控制它們的執(zhí)行;
- 執(zhí)行網(wǎng)絡(luò)IO和進(jìn)程間通信(IPC);
- 控制子進(jìn)程;
- 通過隊(duì)列分發(fā)任務(wù);
- 同步并發(fā)代碼。
低級(jí)API用于(庫和框架開發(fā)人):
- 創(chuàng)建和管理事件循環(huán),提供異步API實(shí)現(xiàn)網(wǎng)絡(luò)通信、運(yùn)行子進(jìn)程、處理OS信號(hào)等;
- 使用 transports 實(shí)現(xiàn)高效率協(xié)議;
- 用異步語法橋接基于回調(diào)的庫和代碼。
asyncio的使用
可等待對(duì)象
什么是可等待對(duì)象?
簡(jiǎn)單來說,可等待對(duì)象是可以在await
表達(dá)式中使用的對(duì)象。它們可以暫停異步函數(shù)的執(zhí)行,等待某個(gè)操作完成后再恢復(fù)執(zhí)行。Python中的可等待對(duì)象主要包括三種類型:
- 協(xié)程對(duì)象(coroutine objects)
- 任務(wù)對(duì)象(tasks)
- Future對(duì)象(futures)
協(xié)程對(duì)象
協(xié)程函數(shù)是定義時(shí)使用async def
語句的函數(shù)。當(dāng)調(diào)用協(xié)程函數(shù)時(shí),會(huì)返回一個(gè)協(xié)程對(duì)象。這些對(duì)象必須在事件循環(huán)中運(yùn)行,可以直接被await
。
import asyncioasync def main():await asyncio.sleep(1)print("Hello, world!")# 運(yùn)行協(xié)程
asyncio.run(main())
在上面的示例中,main()
是一個(gè)協(xié)程函數(shù),調(diào)用它返回一個(gè)協(xié)程對(duì)象。asyncio.run(main())
將運(yùn)行事件循環(huán)并執(zhí)行協(xié)程。
任務(wù)對(duì)象
任務(wù)對(duì)象是對(duì)協(xié)程對(duì)象的進(jìn)一步封裝,被用來“并行的”調(diào)度協(xié)程,它們會(huì)安排協(xié)程在事件循環(huán)中執(zhí)行,并可以跟蹤協(xié)程的狀態(tài)和結(jié)果??梢酝ㄟ^asyncio.create_task
函數(shù)創(chuàng)建任務(wù),當(dāng)一個(gè)協(xié)程通過 asyncio.create_task() 等函數(shù)被封裝為一個(gè)任務(wù),該協(xié)程會(huì)被自動(dòng)調(diào)度執(zhí)行。
import asyncioasync def say_hello():await asyncio.sleep(1)print("Hello!")async def main():task = asyncio.create_task(say_hello())await taskasyncio.run(main())
在這個(gè)示例中,我們使用asyncio.create_task
創(chuàng)建了一個(gè)任務(wù)對(duì)象,該對(duì)象隨后被await
,這意味著程序?qū)⒌却蝿?wù)完成。
Future對(duì)象
Future對(duì)象表示一個(gè)將來可能會(huì)有結(jié)果的操作,他們主要是用于低級(jí)別的異步編程。通常情況下,沒有必要在應(yīng)用層級(jí)的代碼中創(chuàng)建 Future 對(duì)象。開發(fā)者更多使用高層次的抽象如任務(wù)對(duì)象,但了解Future對(duì)象仍然很有價(jià)值。
import asyncioasync def set_future_value(fut):await asyncio.sleep(1)fut.set_result("Finished!")async def main():fut = asyncio.Future()await asyncio.create_task(set_future_value(fut))result = await futprint(result)asyncio.run(main())
在這個(gè)示例中,通過asyncio.Future()
創(chuàng)建了一個(gè)Future對(duì)象,并在一個(gè)協(xié)程中使用set_result
方法設(shè)置了其結(jié)果。
協(xié)程
什么是協(xié)程?
協(xié)程(Coroutine)是一種比線程更輕量級(jí)的“并發(fā)”方式。它允許程序在同一個(gè)線程里“并行”地執(zhí)行多個(gè)任務(wù)。這里“并行”并不是指真正的并行執(zhí)行,而是協(xié)程可以在任務(wù)之間快速切換,從而讓這些任務(wù)看起來像是同時(shí)進(jìn)行的。
你可以把協(xié)程想象成一個(gè)大辦公室里的一名員工,這名員工需要完成一些任務(wù),比如接電話、發(fā)郵件、寫報(bào)告。這些任務(wù)可能需要等一段時(shí)間才能完成,比如等電話的對(duì)方回復(fù),等郵件發(fā)送成功,或者等等數(shù)據(jù)。但是在等待的時(shí)間里,這名員工不會(huì)閑著,他會(huì)繼續(xù)去做別的任務(wù)。
- 線程就像是一個(gè)員工每做一個(gè)任務(wù)他就需要一個(gè)獨(dú)立的辦公桌。線程是重量級(jí)的,需要更多資源,啟動(dòng)和管理也更復(fù)雜。
- 協(xié)程就像是一個(gè)員工在同一個(gè)辦公桌上同時(shí)處理多個(gè)任務(wù),快速切換。協(xié)程是輕量級(jí)的,消耗的資源很少,啟動(dòng)和管理也比較簡(jiǎn)單。
基本使用
- 定義協(xié)程函數(shù):使用 async def 關(guān)鍵字定義一個(gè)協(xié)程函數(shù)。
async def my_coroutine():pass
- 運(yùn)行協(xié)程:可以使用 await 關(guān)鍵詞等待另一個(gè)協(xié)程完成,或使用 asyncio.run() 來運(yùn)行最頂層的入口點(diǎn)協(xié)程。
import asyncioasync def my_coroutine():print("這是使用await運(yùn)行的協(xié)程")async def main():print("使用asyncio.run運(yùn)行協(xié)程開始")await my_coroutine()print("使用asyncio.run運(yùn)行協(xié)程結(jié)束")asyncio.run(main())"""
輸出:
使用asyncio.run運(yùn)行協(xié)程開始
這是使用await運(yùn)行的協(xié)程
使用asyncio.run運(yùn)行協(xié)程結(jié)束
"""
注意:簡(jiǎn)單地調(diào)用一個(gè)協(xié)程并不會(huì)使其被調(diào)度執(zhí)行
# python console 中運(yùn)行
async def my_coroutine():print("my coroutine")my_coroutine()
<coroutine object my_coroutine at 0x104519f50>
運(yùn)行協(xié)程
-
使用asyncio.run() 函數(shù)用來運(yùn)行最頂層的入口點(diǎn) “main()” 函數(shù) (見上面的示例)
-
使用await執(zhí)行(以下代碼段會(huì)在等待3秒后打印 "1號(hào)協(xié)程"結(jié)束時(shí)間,然后再次等待5秒后打印 "2號(hào)協(xié)程"完成時(shí)間)
import asyncio import timeasync def my_coroutine(name, delay):await asyncio.sleep(delay) # 模擬I/O操作print(f"{name}完成時(shí)間:{time.time()}")async def main():start_time = time.time()print(f"開始時(shí)間 {start_time}")await my_coroutine("1號(hào)協(xié)程", 3)await my_coroutine("2號(hào)協(xié)程", 5)end_time = time.time()print(f"結(jié)束時(shí)間 {end_time}")print(f"耗時(shí){end_time - start_time:.2f}秒")asyncio.run(main())""" 輸出: 開始時(shí)間 1726210238.44529 1號(hào)協(xié)程完成時(shí)間:1726210241.446931 2號(hào)協(xié)程完成時(shí)間:1726210246.4494321 結(jié)束時(shí)間 1726210246.449468 耗時(shí)8.00秒 """
-
asyncio.create_task() 函數(shù)用來并發(fā)運(yùn)行作為 asyncio 任務(wù)的多個(gè)協(xié)程。(修改以上示例,并發(fā)運(yùn)行兩個(gè)協(xié)程)
import asyncio import timeasync def my_coroutine(name, delay):await asyncio.sleep(delay) # 模擬I/O操作print(f"{name}完成時(shí)間:{time.time()}")async def main():start_time = time.time()print(f"開始時(shí)間 {start_time}")task1 = asyncio.create_task(my_coroutine("1號(hào)協(xié)程", 3))task2 = asyncio.create_task(my_coroutine("2號(hào)協(xié)程", 5))await task1await task2end_time = time.time()print(f"結(jié)束時(shí)間 {end_time}")print(f"耗時(shí){end_time - start_time:.2f}秒")asyncio.run(main())""" 輸出: 開始時(shí)間 1726210659.84198 1號(hào)協(xié)程完成時(shí)間:1726210662.843559 2號(hào)協(xié)程完成時(shí)間:1726210664.842549 結(jié)束時(shí)間 1726210664.84267 耗時(shí)5.00秒 """ # 可以明顯看出總耗時(shí)比之前明顯快了3秒,1和2之間的時(shí)間間隔也變成了2秒
-
asyncio.TaskGroup 類提供了 create_task() 的替代。 使用此 API,之前的例子可以改為
async def main():start_time = time.time()print(f"開始時(shí)間 {start_time}")async with asyncio.TaskGroup() as tg:task1 = tg.create_task(my_coroutine("1號(hào)協(xié)程", 3))task2 = tg.create_task(my_coroutine("2號(hào)協(xié)程", 5))end_time = time.time()print(f"結(jié)束時(shí)間 {end_time}")print(f"耗時(shí){end_time - start_time:.2f}秒")
Task
什么是 Task?
在 asyncio 中,Task 是對(duì)協(xié)程進(jìn)行調(diào)度管理的對(duì)象。Task 實(shí)際上是 asyncio 事件循環(huán)的一個(gè)抽象概念,通過 Task 我們可以控制協(xié)程(coroutine)的執(zhí)行,允許它們并發(fā)運(yùn)行。在底層,Task 使用事件循環(huán)調(diào)度多個(gè)協(xié)程,使得它們似乎是同時(shí)運(yùn)行的。
asyncio.Task
對(duì)象可以被看作是 Future
的一種特化,用于運(yùn)行 Python 協(xié)程。它們被設(shè)計(jì)用來在事件循環(huán)中調(diào)度和運(yùn)行協(xié)程。
Task對(duì)象是非線程安全的,意味著它們主要用于單線程的 asyncio 事件循環(huán)。
創(chuàng)建 Task
-
使用
asyncio.create_task()
方法,
asyncio.create_task()
是創(chuàng)建 Task 的最常見方法,它會(huì)立即調(diào)度協(xié)程的運(yùn)行并返回一個(gè) Task 對(duì)象:import asyncio import timeasync def my_coroutine():print(f"協(xié)程開始時(shí)間 {time.strftime('%X')}")await asyncio.sleep(2)print(f"協(xié)程結(jié)束時(shí)間 {time.strftime('%X')}")async def main():print(f"主協(xié)程開始時(shí)間:{time.strftime('%X')}")# 創(chuàng)建一個(gè) Tasktask = asyncio.create_task(my_coroutine())print(f"任務(wù)創(chuàng)建時(shí)間:{time.strftime('%X')}")# 稍微等待一下,但不會(huì) await Taskawait asyncio.sleep(5)print(f"延時(shí)結(jié)束,開始等待任務(wù):{time.strftime('%X')}")# 現(xiàn)在等待 Task 完成await taskprint(f"主協(xié)程結(jié)束時(shí)間:{time.strftime('%X')}")# 運(yùn)行主協(xié)程 asyncio.run(main())""" 輸出: 主協(xié)程開始時(shí)間:17:45:30 任務(wù)創(chuàng)建時(shí)間:17:45:30 協(xié)程開始時(shí)間 17:45:30 協(xié)程結(jié)束時(shí)間 17:45:32 延時(shí)結(jié)束,開始等待任務(wù):17:45:35 主協(xié)程結(jié)束時(shí)間:17:45:35 """
在上面的示例中,
asyncio.create_task(my_coroutine())
創(chuàng)建了一個(gè) Task,它會(huì)立即開始運(yùn)行my_coroutine
協(xié)程,即使我還并沒有執(zhí)行await。 -
使用
loop.create_task()
方法我們還可以通過獲取事件循環(huán),然后調(diào)用它的
create_task
方法來創(chuàng)建任務(wù):import asyncio import timeasync def my_coroutine():print(f"協(xié)程開始時(shí)間 {time.strftime('%X')}")await asyncio.sleep(2)print(f"協(xié)程結(jié)束時(shí)間 {time.strftime('%X')}")async def main():print(f"主協(xié)程開始時(shí)間:{time.strftime('%X')}")# 創(chuàng)建一個(gè) Taskloop = asyncio.get_running_loop()task = loop.create_task(my_coroutine())print(f"任務(wù)創(chuàng)建時(shí)間:{time.strftime('%X')}")# 稍微等待一下,但不會(huì) await Taskawait asyncio.sleep(5)print(f"延時(shí)結(jié)束,開始等待任務(wù):{time.strftime('%X')}")# 現(xiàn)在等待 Task 完成await taskprint(f"主協(xié)程結(jié)束時(shí)間:{time.strftime('%X')}")# 運(yùn)行主協(xié)程 asyncio.run(main())""" 輸出: 主協(xié)程開始時(shí)間:17:50:03 任務(wù)創(chuàng)建時(shí)間:17:50:03 協(xié)程開始時(shí)間 17:50:03 協(xié)程結(jié)束時(shí)間 17:50:05 延時(shí)結(jié)束,開始等待任務(wù):17:50:08 主協(xié)程結(jié)束時(shí)間:17:50:08 """
在上面的示例中,使用
asyncio.get_running_loop()
獲取當(dāng)前正在運(yùn)行的事件循環(huán),loop.create_task(my_coroutine())
使用事件循環(huán)的create_task
方法創(chuàng)建并調(diào)度一個(gè)協(xié)程任務(wù)。
asyncio.create_task()
與loop.create_task()
的不同之處:asyncio.create_task()
:是一個(gè)便捷方法,直接通過當(dāng)前的默認(rèn)事件循環(huán)創(chuàng)建任務(wù)loop.create_task()
:需要明確提供事件循環(huán),適用于更復(fù)雜或特定需求的場(chǎng)景,比如管理多個(gè)事件循環(huán)。
-
使用
asyncio.ensure_future()
雖然不如前兩種方法常用,但asyncio.ensure_future
也可以用來創(chuàng)建 Task。它可以接受協(xié)程或 Future 對(duì)象,并確保返回一個(gè) Task:import asyncio import timeasync def my_coroutine():print(f"協(xié)程開始時(shí)間 {time.strftime('%X')}")await asyncio.sleep(2)print(f"協(xié)程結(jié)束時(shí)間 {time.strftime('%X')}")async def main():print(f"主協(xié)程開始時(shí)間:{time.strftime('%X')}")# 創(chuàng)建一個(gè) Tasktask = asyncio.ensure_future(my_coroutine())print(f"任務(wù)創(chuàng)建時(shí)間:{time.strftime('%X')}")# 稍微等待一下,但不會(huì) await Taskawait asyncio.sleep(5)print(f"延時(shí)結(jié)束,開始等待任務(wù):{time.strftime('%X')}")# 現(xiàn)在等待 Task 完成await taskprint(f"主協(xié)程結(jié)束時(shí)間:{time.strftime('%X')}")# 運(yùn)行主協(xié)程 asyncio.run(main())""" 輸出: 主協(xié)程開始時(shí)間:18:00:54 任務(wù)創(chuàng)建時(shí)間:18:00:54 協(xié)程開始時(shí)間 18:00:54 協(xié)程結(jié)束時(shí)間 18:00:56 延時(shí)結(jié)束,開始等待任務(wù):18:00:59 主協(xié)程結(jié)束時(shí)間:18:00:59 """
asyncio.ensure_future()
是一個(gè)功能強(qiáng)大的函數(shù),常用于將一個(gè)協(xié)程轉(zhuǎn)換為一個(gè) Future 對(duì)象。
它在處理異步任務(wù)時(shí)提供了更多的靈活性,特別是在需要將協(xié)程包裝為 Future 時(shí)。
asyncio.create_task()
和asyncio.ensure_future()
的不同之處:asyncio.create_task()
:專門用于將協(xié)程轉(zhuǎn)換為 Task,只能處理協(xié)程對(duì)象。asyncio.ensure_future()
:可以處理協(xié)程對(duì)象和 Future 對(duì)象,更加通用,適用于更多場(chǎng)景。
取消 Task
在 asyncio
中,當(dāng)一個(gè) Task
對(duì)象的 cancel()
方法被調(diào)用時(shí),它會(huì)請(qǐng)求取消該任務(wù)。具體步驟如下:
- 標(biāo)記任務(wù)為取消狀態(tài):調(diào)用
cancel()
方法后,任務(wù)會(huì)被標(biāo)記為取消狀態(tài)。 - 拋出
CancelledError
異常:再次調(diào)度這個(gè)任務(wù)時(shí),它會(huì)在等待的位置拋出一個(gè)asyncio.CancelledError
異常。 - 任務(wù)處理異常:協(xié)程內(nèi)部可以捕獲這個(gè)異常,進(jìn)行相應(yīng)的清理操作。
import asyncioasync def cancellable_task():try:print("Task 啟動(dòng)")await asyncio.sleep(10) # 長時(shí)間任務(wù)print("Task 完成")except asyncio.CancelledError:print("Task 被取消")raise # 重新拋出err,以便外部可以檢測(cè)到任務(wù)已被取消async def main():task = asyncio.create_task(cancellable_task())await asyncio.sleep(2) # 等待一段時(shí)間task.cancel() # 請(qǐng)求取消任務(wù)try:await taskexcept asyncio.CancelledError:print("主協(xié)程: Task 被取消")asyncio.run(main())"""
輸出:
Task 啟動(dòng)
Task 被取消
主協(xié)程: Task 被取消
"""
在這個(gè)例子中,main()
協(xié)程啟動(dòng)了一個(gè)長時(shí)間運(yùn)行的任務(wù) cancellable_task()
并在2秒后請(qǐng)求取消它。
Task 異常獲取
在 asyncio
中,Task
對(duì)象繼承了 Future
對(duì)象的許多方法和屬性,其中包括 exception()
方法。exception()
用于獲取任務(wù)在執(zhí)行過程中拋出的異常。如果任務(wù)完成且沒有異常發(fā)生,exception()
返回 None
。如果任務(wù)還未完成,調(diào)用 exception()
將會(huì)引發(fā) asyncio.InvalidStateError
異常。因此,通常我們需要在任務(wù)完成之后調(diào)用 exception()
方法。
import asyncioasync def my_task():await asyncio.sleep(1)raise ValueError("任務(wù)執(zhí)行出錯(cuò)")async def main():task = asyncio.create_task(my_task())try:await taskexcept Exception as e:print(f"主協(xié)程中捕獲異常: {e}")# 現(xiàn)在任務(wù)已經(jīng)完成,可以檢查異常if task.exception():print(f"任務(wù)結(jié)束通過exception方法檢查異常: {task.exception()}")asyncio.run(main())"""
輸出:
主協(xié)程中捕獲異常: 任務(wù)執(zhí)行出錯(cuò)
任務(wù)結(jié)束通過exception方法檢查異常: 任務(wù)執(zhí)行出錯(cuò)
"""
在這個(gè)示例中,my_task()
會(huì)拋出一個(gè) ValueError
異常。我們?cè)谥鲄f(xié)程 main()
中捕獲該異常,同時(shí)也通過 exception()
方法再次獲取并打印異常。
Task 回調(diào)
add_done_callback()
方法是 asyncio
提供的一個(gè)強(qiáng)大的工具,允許我們?cè)谌蝿?wù)完成后執(zhí)行特定的回調(diào)函數(shù)?;卣{(diào)函數(shù)幫助我們更有效地管理任務(wù)的生命周期,處理結(jié)果和異常,并執(zhí)行一些后續(xù)操作。
import asyncioasync def my_task():await asyncio.sleep(1) # 模擬一些異步操作return "一鍵三連"def task_done_callback(future):print(f"回調(diào)-任務(wù)已完成,結(jié)果: {future.result()}")async def main():task = asyncio.create_task(my_task())task.add_done_callback(task_done_callback)await task # 等待任務(wù)完成asyncio.run(main())"""
輸出:
回調(diào)-任務(wù)已完成,結(jié)果: 一鍵三連
"""
在這個(gè)例子中,task_done_callback
回調(diào)函數(shù)會(huì)在 my_task
任務(wù)完成后被調(diào)用,并打印任務(wù)的結(jié)果。
import asyncioasync def failing_task():await asyncio.sleep(1)raise ValueError("任務(wù)出現(xiàn)了奇怪的錯(cuò)誤")def task_done_callback(future):if future.exception():print(f"回調(diào)-任務(wù)失敗,異常: {future.exception()}")else:print(f"回調(diào)-任務(wù)完成,結(jié)果: {future.result()}")async def main():task = asyncio.create_task(failing_task())task.add_done_callback(task_done_callback)try:await taskexcept Exception as e:print(f"主協(xié)程捕獲異常: {e}")asyncio.run(main())"""
輸出:
回調(diào)-任務(wù)失敗,異常: 任務(wù)出現(xiàn)了奇怪的錯(cuò)誤
主協(xié)程捕獲異常: 任務(wù)出現(xiàn)了奇怪的錯(cuò)誤
"""
在這個(gè)示例中,當(dāng) failing_task
拋出異常時(shí),task_done_callback
會(huì)檢測(cè)并打印異常,而主協(xié)程也會(huì)捕獲并處理該異常。
TaskGroup
什么是 TaskGroup?
TaskGroup
是 Python 3.11 中新增的 asyncio
組件。它提供了一種更簡(jiǎn)潔、更安全的方式來管理多個(gè)并發(fā)任務(wù)。TaskGroup
是一個(gè)上下文管理器,當(dāng)與 async with
語句一起使用時(shí),它允許我們?cè)谝粋€(gè)塊內(nèi)啟動(dòng)多個(gè)任務(wù),并確保這些任務(wù)在上下文管理器退出時(shí)正確清理。
為什么使用 TaskGroup?
- 更簡(jiǎn)潔的語法:在沒有
TaskGroup
之前,管理多個(gè)任務(wù)通常需要手動(dòng)創(chuàng)建每個(gè)任務(wù)并在最后通過await
語句等待所有任務(wù)完成。TaskGroup
簡(jiǎn)化了這一過程。 - 更好的錯(cuò)誤處理:由于
TaskGroup
是一個(gè)上下文管理器,它更容易管理任務(wù)中的異常情況。 - 更清晰的結(jié)構(gòu):代碼的可讀性和結(jié)構(gòu)性得到了顯著提升。
創(chuàng)建任務(wù)
在 TaskGroup
中創(chuàng)建任務(wù)使用 create_task
方法。每個(gè)任務(wù)會(huì)立即調(diào)度,并在 TaskGroup
的管理范圍內(nèi)運(yùn)行。
import asyncio
import timeasync def task(n):print(f"任務(wù) {n} 啟動(dòng) {time.strftime('%X')}")await asyncio.sleep(2)print(f"任務(wù) {n} 結(jié)束 {time.strftime('%X')}")async def main():async with asyncio.TaskGroup() as tg:tg.create_task(task(1))tg.create_task(task(2))tg.create_task(task(3))asyncio.run(main())"""
輸出:
任務(wù) 1 啟動(dòng) 17:08:18
任務(wù) 2 啟動(dòng) 17:08:18
任務(wù) 3 啟動(dòng) 17:08:18
任務(wù) 1 結(jié)束 17:08:20
任務(wù) 2 結(jié)束 17:08:20
任務(wù) 3 結(jié)束 17:08:20
"""
在這個(gè)示例中,我們通過 async with
語句創(chuàng)建了一個(gè) TaskGroup
,并使用 create_task
方法啟動(dòng)了三個(gè)并行運(yùn)行的任務(wù) task(n)
,這三個(gè)任務(wù)立即調(diào)度,并在 TaskGroup
的管理范圍內(nèi)運(yùn)行。當(dāng)所有任務(wù)完成時(shí),TaskGroup
會(huì)自動(dòng)進(jìn)行清理。
異常處理
當(dāng) TaskGroup
中的任務(wù)引發(fā)異常時(shí),異常會(huì)在退出 async with
塊時(shí)處理。如果多個(gè)任務(wù)引發(fā)異常,TaskGroup
會(huì)聚合這些異常,并引發(fā)一個(gè) ExceptionGroup
異常。
import asyncioasync def error_task():raise RuntimeError("一些奇怪的錯(cuò)誤")async def main():try:async with asyncio.TaskGroup() as tg:tg.create_task(error_task())tg.create_task(error_task())tg.create_task(error_task())except ExceptionGroup as e:print("任務(wù)組捕獲異常: ", e)asyncio.run(main())"""
輸出:
任務(wù)組捕獲異常: unhandled errors in a TaskGroup (3 sub-exceptions)
"""
在使用 asyncio.TaskGroup
時(shí),如果多個(gè)任務(wù)引發(fā)異常,異常會(huì)被聚合成一個(gè) ExceptionGroup
異常,并在 TaskGroup
上下文管理器退出時(shí)被捕獲和處理。然而,默認(rèn)情況下,ExceptionGroup
只提供較為簡(jiǎn)略的信息。要看到具體的子異常信息,我們需要更詳細(xì)地打印 ExceptionGroup
對(duì)象。
import asyncioasync def error_task():raise RuntimeError("一些奇怪的錯(cuò)誤")async def main():try:async with asyncio.TaskGroup() as tg:tg.create_task(error_task())tg.create_task(error_task())tg.create_task(error_task())except ExceptionGroup as e:print("任務(wù)組捕獲異常:")for sub_exception in e.exceptions:print(f"子異常: {sub_exception}")asyncio.run(main())"""
輸出:
任務(wù)組捕獲異常:
子異常: 一些奇怪的錯(cuò)誤
子異常: 一些奇怪的錯(cuò)誤
子異常: 一些奇怪的錯(cuò)誤
"""
在上面的示例中,我們?cè)诓东@ ExceptionGroup
異常后,迭代其 exceptions
屬性,逐個(gè)打印出子異常的信息。這樣可以更全面了解 ExceptionGroup
中包含的所有異常。
同步任務(wù)完成
TaskGroup
保證所有任務(wù)在同一個(gè)上下文管理器范圍內(nèi)完成。如果某個(gè)任務(wù)需要較長時(shí)間完成,其他任務(wù)會(huì)等待它。
import asyncioasync def long_task():await asyncio.sleep(5)print("長任務(wù)完成")async def short_task():await asyncio.sleep(1)print("短任務(wù)完成")async def main():async with asyncio.TaskGroup() as tg:tg.create_task(long_task())tg.create_task(short_task())print("所有任務(wù)完成")asyncio.run(main())"""
輸出:
短任務(wù)完成
長任務(wù)完成
所有任務(wù)完成
"""
上面的示例展示了如何使用 asyncio.TaskGroup
同時(shí)管理多個(gè)異步任務(wù),其中短任務(wù)先完成并輸出結(jié)果,長任務(wù)隨后完成,最終確保所有任務(wù)結(jié)束后輸出 “所有任務(wù)完成”。