中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

關(guān)于網(wǎng)站建設(shè)公司大全上海app定制開發(fā)公司

關(guān)于網(wǎng)站建設(shè)公司大全,上海app定制開發(fā)公司,網(wǎng)站別人幫做的要注意什么手續(xù),免費個人網(wǎng)站建站能上傳視頻嗎1 概要 通過引入結(jié)構(gòu)化并發(fā)編程的API,簡化并發(fā)編程。結(jié)構(gòu)化并發(fā)將在不同線程中運行的相關(guān)任務(wù)組視為單個工作單元,從而簡化錯誤處理和取消操作,提高可靠性,并增強可觀察性。這是一個預(yù)覽版的API。 2 歷史 結(jié)構(gòu)化并發(fā)是由JEP 42…

1 概要

通過引入結(jié)構(gòu)化并發(fā)編程的API,簡化并發(fā)編程。結(jié)構(gòu)化并發(fā)將在不同線程中運行的相關(guān)任務(wù)組視為單個工作單元,從而簡化錯誤處理和取消操作,提高可靠性,并增強可觀察性。這是一個預(yù)覽版的API。

2 歷史

結(jié)構(gòu)化并發(fā)是由JEP 428提出的,并在JDK 19中作為孵化API發(fā)布。它在JDK 20中被JEP 437重新孵化,通過對作用域值(JEP 429)進行輕微更新。

我們在這里提議將結(jié)構(gòu)化并發(fā)作為JUC包中的預(yù)覽API。唯一重要變化是StructuredTaskScope::fork(...)方法返回一個[子任務(wù)],而不是一個Future,如下面所討論的。

3 目標

推廣一種并發(fā)編程風(fēng)格,可以消除由于取消和關(guān)閉而產(chǎn)生的常見風(fēng)險,如線程泄漏和取消延遲。

提高并發(fā)代碼的可觀察性。

4 非目標

不替換JUC包中的任何并發(fā)構(gòu)造,如ExecutorService和Future。

不定義Java平臺的最終結(jié)構(gòu)化并發(fā)API。其他結(jié)構(gòu)化并發(fā)構(gòu)造可以由第三方庫定義,或在未來的JDK版本中定義。

不定義在線程之間共享數(shù)據(jù)流的方法(即通道)。會在未來提出這樣做。

不用新的線程取消機制替換現(xiàn)有的線程中斷機制。會在未來提出這樣做。

5 動機

開發(fā)人員通過將任務(wù)分解為多個子任務(wù)來管理復(fù)雜性。在普通的單線程代碼中,子任務(wù)按順序執(zhí)行。然而,如果子任務(wù)彼此足夠獨立,并且存在足夠的硬件資源,那么通過在不同線程中并發(fā)執(zhí)行子任務(wù),可以使整個任務(wù)運行得更快(即具有較低的延遲)。例如,將多個I/O操作的結(jié)果組合成一個任務(wù),如果每個I/O操作都在自己的線程中并發(fā)執(zhí)行,那么任務(wù)將運行得更快。虛擬線程(JEP 444)使得為每個此類I/O操作分配一個線程成為一種具有成本效益的方法,但是管理可能會產(chǎn)生大量線程仍然是一個挑戰(zhàn)。

6 ExecutorService 非結(jié)構(gòu)化并發(fā)

java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,它幫助開發(fā)人員以并發(fā)方式執(zhí)行子任務(wù)。

如下 handle() 的方法,它表示服務(wù)器應(yīng)用程序中的一個任務(wù)。它通過將兩個子任務(wù)提交給 ExecutorService 來處理傳入的請求。

ExecutorService 立即返回每個子任務(wù)的 Future,并根據(jù) Executor 的調(diào)度策略同時執(zhí)行這些子任務(wù)。handle() 方法通過阻塞調(diào)用它們的 Futureget() 方法來等待子任務(wù)的結(jié)果,因此該任務(wù)被稱為加入了其子任務(wù)。

Response handle() throws ExecutionException, InterruptedException {Future<String> user = esvc.submit(() -> findUser());Future<Integer> order = esvc.submit(() -> fetchOrder());String theUser = user.get();   // 加入 findUserint theOrder = order.get();    // 加入 fetchOrderreturn new Response(theUser, theOrder);
}

由于子任務(wù)并發(fā)執(zhí)行,每個子任務(wù)都可獨立地成功或失敗。在這個上下文中,"失敗" 意味著拋出異常。通常,像 handle() 這樣的任務(wù)應(yīng)該在任何一個子任務(wù)失敗時失敗。當出現(xiàn)失敗時,理解線程的生命周期會變得非常復(fù)雜:

  • findUser() 拋異常,那么調(diào)用 user.get()handle() 也會拋出異常,但是 fetchOrder() 會繼續(xù)在自己的線程中運行。這是線程泄漏,最好情況下浪費資源,最壞情況下 fetchOrder() 的線程可能會干擾其他任務(wù)。

  • 如執(zhí)行 handle() 的線程被中斷,這個中斷不會傳播到子任務(wù)。findUser()fetchOrder() 的線程都會泄漏,即使在 handle() 失敗后仍然繼續(xù)運行。

  • 如果 findUser() 執(zhí)行時間很長,但是在此期間 fetchOrder() 失敗,那么 handle() 將不必要地等待 findUser(),因為它會在 user.get() 上阻塞,而不是取消它。只有在 findUser() 完成并且 user.get() 返回后,order.get() 才會拋出異常,導(dǎo)致 handle() 失敗。

每種case下,問題在于我們的程序在邏輯上被結(jié)構(gòu)化為任務(wù)-子任務(wù)關(guān)系,但這些關(guān)系只存在于開發(fā)人員的頭腦中。這不僅增加錯誤可能性,還會使診斷和排除此類錯誤變得更加困難。例如,線程轉(zhuǎn)儲等可觀察性工具會在不相關(guān)的線程調(diào)用棧中顯示 handle()、findUser()fetchOrder(),而沒有任務(wù)-子任務(wù)關(guān)系的提示。

可嘗試在錯誤發(fā)生時顯式取消其他子任務(wù),例如通過在失敗的任務(wù)的 catch 塊中使用 try-finally 包裝任務(wù),并調(diào)用其他任務(wù)的 Futurecancel(boolean) 方法。我們還需要在 try-with-resources 語句中使用 ExecutorService,就像

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {IntStream.range(0, 10_000).forEach(i -> {executor.submit(() -> {Thread.sleep(Duration.ofSeconds(1));return i;});});
}  // executor.close() is called implicitly, and waits

因為 Future 沒有提供等待被取消的任務(wù)的方法。但所有這些都很難做到,并且往往會使代碼的邏輯意圖變得更加難以理解。跟蹤任務(wù)之間的關(guān)系,并手動添加所需的任務(wù)間取消邊緣,是對開發(fā)人員的一種很大要求。

無限制的并發(fā)模式

這種需要手動協(xié)調(diào)生命周期的需求是因為 ExecutorServiceFuture 允許無限制的并發(fā)模式。在涉及的所有線程中,沒有限制或順序:

  • 一個線程可以創(chuàng)建一個 ExecutorService
  • 另一個線程可以向其提交工作
  • 執(zhí)行工作的線程與第一個或第二個線程沒有任何關(guān)系

線程提交工作之后,一個完全不同的線程可以等待執(zhí)行的結(jié)果。具有對 Future 的引用的任何代碼都可以加入它(即通過調(diào)用 get() 等待其結(jié)果),甚至可以在與獲取 Future 的線程不同的線程中執(zhí)行代碼。實際上,由一個任務(wù)啟動的子任務(wù)不必返回到提交它的任務(wù)。它可以返回給許多任務(wù)中的任何一個,甚至可能是沒有返回給任何任務(wù)。

因為 ExecutorServiceFuture 允許這種無結(jié)構(gòu)的使用,它們既不強制執(zhí)行也不跟蹤任務(wù)和子任務(wù)之間的關(guān)系,盡管這些關(guān)系是常見且有用的。因此,即使子任務(wù)在同一個任務(wù)中被提交和加入,一個子任務(wù)的失敗也不能自動導(dǎo)致另一個子任務(wù)的取消。在上述的 handle() 方法中,fetchOrder() 的失敗不能自動導(dǎo)致 findUser() 的取消。fetchOrder()FuturefindUser()Future 沒有關(guān)系,也與最終通過其 get() 方法加入它的線程無關(guān)。與其要求開發(fā)人員手動管理這種取消,我們希望能夠可靠地自動化這一過程。

任務(wù)結(jié)構(gòu)應(yīng)反映代碼結(jié)構(gòu)

ExecutorService 下的自由線程組合相反,單線程代碼的執(zhí)行總是強制執(zhí)行任務(wù)和子任務(wù)的層次結(jié)構(gòu)。方法的代碼塊 {...} 對應(yīng)一個任務(wù),代碼塊內(nèi)部調(diào)用的方法對應(yīng)子任務(wù)。調(diào)用的方法必須返回給調(diào)用它的方法,或者拋出異常給調(diào)用它的方法。它不能生存于調(diào)用它的方法之外,也不能返回或拋出異常給其他方法。因此,所有子任務(wù)在任務(wù)之前完成,每個子任務(wù)都是其父任務(wù)的子任務(wù),每個子任務(wù)的生命周期相對于其他子任務(wù)和任務(wù)來說,都由代碼塊結(jié)構(gòu)的語法規(guī)則來管理。

如單線程版本的 handle() 中,任務(wù)-子任務(wù)關(guān)系在語法結(jié)構(gòu)明顯:

Response handle() throws IOException {String theUser = findUser();int theOrder = fetchOrder();return new Response(theUser, theOrder);
}

我們不會在 findUser() 子任務(wù)完成之前啟動 fetchOrder() 子任務(wù),無論 findUser() 是成功還是失敗。如果 findUser() 失敗,我們根本不會啟動 fetchOrder(),而且 handle() 任務(wù)會隱式地失敗。一個子任務(wù)只能返回給其父任務(wù),這是很重要的:這意味著父任務(wù)可以將一個子任務(wù)的失敗隱式地視為觸發(fā)來取消其他未完成的子任務(wù),然后自己失敗。

單線程代碼中,任務(wù)-子任務(wù)層次關(guān)系在運行時的調(diào)用棧中得到體現(xiàn)。因此,我們獲得了相應(yīng)的父子關(guān)系,這些關(guān)系管理著錯誤傳播。觀察單個線程時,層次關(guān)系顯而易見:findUser()(及后來的 fetchOrder())似乎是在 handle() 下執(zhí)行的。這使得回答問題 "handle() 正在處理什么?" 很容易。

如任務(wù)和子任務(wù)之間的父子關(guān)系在代碼的語法結(jié)構(gòu)中明顯,并且在運行時得到了體現(xiàn),那并發(fā)編程將更加容易、可靠且易于觀察,就像單線程代碼一樣。語法結(jié)構(gòu)將定義子任務(wù)的生命周期,并使得能夠在運行時創(chuàng)建一個類似于單線程調(diào)用棧的線程層次結(jié)構(gòu)的表示。這種表示將實現(xiàn)錯誤傳播、取消以及對并發(fā)程序的有意義的觀察。

7 結(jié)構(gòu)化并發(fā)

結(jié)構(gòu)化并發(fā)是一種并發(fā)編程方法,它保持了任務(wù)和子任務(wù)之間的自然關(guān)系,從而實現(xiàn)了更具可讀性、可維護性和可靠性的并發(fā)代碼。"結(jié)構(gòu)化并發(fā)" 這個術(shù)語由 Martin Sústrik 提出,并由 Nathaniel J. Smith 推廣。從其他編程語言中的概念,如 Erlang 中的層次監(jiān)控者,可以了解到結(jié)構(gòu)化并發(fā)中錯誤處理的設(shè)計思想。

結(jié)構(gòu)化并發(fā)源于一個簡單的原則:

如果一個任務(wù)分解為并發(fā)的子任務(wù),那么所有這些子任務(wù)都會返回到同一個地方,即任務(wù)的代碼塊。

在結(jié)構(gòu)化并發(fā)中,子任務(wù)代表任務(wù)工作。任務(wù)等待子任務(wù)的結(jié)果并監(jiān)視它們的失敗情況。與單線程代碼中的結(jié)構(gòu)化編程技術(shù)類似,結(jié)構(gòu)化并發(fā)在多線程中的威力來自于兩個思想:

  • 為代碼塊中的執(zhí)行流程定義明確的進入和退出點
  • 在嚴格的操作生命周期嵌套中,以反映它們在代碼中的語法嵌套方式

由于代碼塊的進入和退出點被明確定義,因此并發(fā)子任務(wù)的生命周期被限定在其父任務(wù)的語法塊中。因為同級子任務(wù)的生命周期嵌套在其父任務(wù)的生命周期之內(nèi),因此可以將它們作為一個單元進行推理和管理。由于父任務(wù)的生命周期,依次嵌套在其父任務(wù)的生命周期之內(nèi),運行時可以將任務(wù)層次結(jié)構(gòu)實現(xiàn)為樹狀結(jié)構(gòu),類似于單線程調(diào)用棧的并發(fā)對應(yīng)物。這允許代碼為任務(wù)子樹應(yīng)用策略,如截止時間,并允許可觀察性工具將子任務(wù)呈現(xiàn)為父任務(wù)的下屬。

結(jié)構(gòu)化并發(fā)非常適合虛擬線程,這是由JDK實現(xiàn)的輕量級線程。許多虛擬線程可以共享同一個操作系統(tǒng)線程,從而可以支持非常大量的虛擬線程。除此外,虛擬線程足夠廉價,可以表示任何涉及I/O等并發(fā)行為。這意味著服務(wù)器應(yīng)用程序可以使用結(jié)構(gòu)化并發(fā)來同時處理成千上萬甚至百萬個傳入請求:它可以為處理每個請求的任務(wù)分配一個新的虛擬線程,當一個任務(wù)通過提交子任務(wù)進行并發(fā)執(zhí)行時,它可以為每個子任務(wù)分配一個新的虛擬線程。在幕后,任務(wù)-子任務(wù)關(guān)系通過為每個虛擬線程提供一個對其唯一父任務(wù)的引用來實現(xiàn)為樹狀結(jié)構(gòu),類似于調(diào)用棧中的幀引用其唯一的調(diào)用者。

總之,虛擬線程提供了大量的線程。結(jié)構(gòu)化并發(fā)可以正確且強大地協(xié)調(diào)它們,并使可觀察性工具能夠按照開發(fā)人員的理解顯示線程。在JDK中擁有結(jié)構(gòu)化并發(fā)的API將使構(gòu)建可維護、可靠且可觀察的服務(wù)器應(yīng)用程序變得更加容易。

8 描述

結(jié)構(gòu)化并發(fā) API 的主要類是 java.util.concurrent 包中的 StructuredTaskScope。該類允許開發(fā)人員將一個任務(wù)結(jié)構(gòu)化為一組并發(fā)的子任務(wù),并將它們作為一個單元進行協(xié)調(diào)。子任務(wù)通過分別分叉它們并將它們作為一個單元加入,可能作為一個單元取消,來在它們自己的線程中執(zhí)行。子任務(wù)的成功結(jié)果或異常由父任務(wù)匯總并處理。StructuredTaskScope 將子任務(wù)的生命周期限制在一個清晰的詞法作用域內(nèi),在這個作用域中,任務(wù)與其子任務(wù)的所有交互(分叉、加入、取消、處理錯誤和組合結(jié)果)都發(fā)生。

前面提到的 handle() 示例,使用 StructuredTaskScope 編寫:

Response handle() throws ExecutionException, InterruptedException {try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {Supplier<String> user = scope.fork(() -> findUser());Supplier<Integer> order = scope.fork(() -> fetchOrder());scope.join()             // 加入兩個子任務(wù).throwIfFailed();   // ... 并傳播錯誤// 兩個子任務(wù)都成功完成,因此組合它們的結(jié)果return new Response(user.get(), order.get());}
}

與原始示例相比,理解涉及的線程的生命周期在這里變得更加容易:在所有情況下,它們的生命周期都限制在一個詞法作用域內(nèi),即 try-with-resources 語句的代碼塊內(nèi)。此外,使用 StructuredTaskScope 可以確保一些有價值的屬性:

  1. 錯誤處理與短路 — 如果 findUser()fetchOrder() 子任務(wù)中的任何一個失敗,另一個如果尚未完成則會被取消。(這由 ShutdownOnFailure 實現(xiàn)的關(guān)閉策略來管理;還有其他策略可能)。
  2. 取消傳播 — 如果在運行 handle() 的線程在調(diào)用 join() 之前或之中被中斷,則線程在退出作用域時會自動取消兩個子任務(wù)。
  3. 清晰性 — 上述代碼具有清晰的結(jié)構(gòu):設(shè)置子任務(wù),等待它們完成或被取消,然后決定是成功(并處理已經(jīng)完成的子任務(wù)的結(jié)果)還是失敗(子任務(wù)已經(jīng)完成,因此沒有更多需要清理的)。
  4. 可觀察性 — 如下所述,線程轉(zhuǎn)儲清楚地顯示了任務(wù)層次結(jié)構(gòu),其中運行 findUser()fetchOrder() 的線程被顯示為作用域的子任務(wù)。

9 突破預(yù)覽版限制

StructuredTaskScope 是預(yù)覽版 API,默認禁用。要使用 StructuredTaskScope API,需啟用預(yù)覽 API:

  1. 使用 javac --release 21 --enable-preview Main.java 編譯程序,然后使用 java --enable-preview Main 運行它;或
  2. 當使用源代碼啟動器時,使用 java --source 21 --enable-preview Main.java 運行程序
  3. IDEA 運行時,勾選即可:

10 使用 StructuredTaskScope

10.1 API

public class StructuredTaskScope<T> implements AutoCloseable {public <U extends T> Subtask<U> fork(Callable<? extends U> task);public void shutdown();public StructuredTaskScope<T> join() throws InterruptedException;public StructuredTaskScope<T> joinUntil(Instant deadline)throws InterruptedException, TimeoutException;public void close();protected void handleComplete(Subtask<? extends T> handle);protected final void ensureOwnerAndJoined();}

10.2 工作流程

  1. 創(chuàng)建一個作用域。創(chuàng)建作用域的線程是其所有者。
  2. 使用 fork(Callable) 方法在作用域中分叉子任務(wù)。
  3. 在任何時間,任何子任務(wù),或者作用域的所有者,都可以調(diào)用作用域的 shutdown() 方法來取消未完成的子任務(wù)并阻止分叉新的子任務(wù)。
  4. 作用域的所有者將作用域(即所有子任務(wù))作為一個單元加入。所有者可以調(diào)用作用域的 join() 方法,等待所有子任務(wù)已完成(無論成功與否)或通過 shutdown() 被取消?;蛘?#xff0c;它可以調(diào)用作用域的 joinUntil(java.time.Instant) 方法,等待直到截止時間。
  5. 加入后,處理子任務(wù)中的任何錯誤并處理其結(jié)果。
  6. 關(guān)閉作用域,通常通過隱式使用 try-with-resources 實現(xiàn)。這會關(guān)閉作用域(如果尚未關(guān)閉),并等待被取消但尚未完成的任何子任務(wù)完成。

每次調(diào)用 fork(...) 都會啟動一個新線程來執(zhí)行一個子任務(wù),默認情況下是虛擬線程。一個子任務(wù)可以創(chuàng)建它自己的嵌套的 StructuredTaskScope 來分叉它自己的子任務(wù),從而創(chuàng)建一個層次結(jié)構(gòu)。該層次結(jié)構(gòu)反映在代碼的塊結(jié)構(gòu)中,限制了子任務(wù)的生命周期:在作用域關(guān)閉后,所有子任務(wù)的線程都保證已終止,當塊退出時不會留下任何線程。

在作用域中的任何子任務(wù),嵌套作用域中的任何子子任務(wù),以及作用域的所有者,都可以隨時調(diào)用作用域的 shutdown() 方法,表示任務(wù)已完成,即使其他子任務(wù)仍在執(zhí)行。shutdown() 方法會中斷仍在執(zhí)行子任務(wù)的線程,并導(dǎo)致 join()joinUntil(Instant) 方法返回。因此,所有子任務(wù)都應(yīng)該被編寫為響應(yīng)中斷。在調(diào)用 shutdown() 后分叉的新子任務(wù)將處于 UNAVAILABLE 狀態(tài),不會被運行。實際上,shutdown() 是順序代碼中 break 語句的并發(fā)模擬。

在作用域內(nèi)部調(diào)用 join()joinUntil(Instant) 是強制性的。如果作用域的代碼塊在加入之前退出,則作用域?qū)⒌却凶尤蝿?wù)終止,然后拋出異常。

作用域的所有者線程可能在加入之前或加入期間被中斷。例如,它可能是封閉作用域的子任務(wù)。如果發(fā)生這種情況,則 join()joinUntil(Instant) 將拋出異常,因為繼續(xù)執(zhí)行沒有意義。然后,try-with-resources 語句將關(guān)閉作用域,取消所有子任務(wù)并等待它們終止。這的效果是自動將任務(wù)的取消傳播到其子任務(wù)。如果 joinUntil(Instant) 方法的截止時間在子任務(wù)終止或調(diào)用 shutdown() 之前到期,則它將拋出異常,再次,try-with-resources 語句將關(guān)閉作用域。

join() 成功完成時,每個子任務(wù)已經(jīng)成功完成、失敗或因作用域被關(guān)閉而被取消。

一旦加入,作用域的所有者會處理失敗的子任務(wù)并處理成功完成的子任務(wù)的結(jié)果;這通常是通過關(guān)閉策略來完成的(見下文)。成功完成的任務(wù)的結(jié)果可以使用 Subtask.get() 方法獲得。get() 方法永遠不會阻塞;如果錯誤地在加入之前或子任務(wù)尚未成功完成時調(diào)用它,則會拋出 IllegalStateException。

在作用域中分叉任務(wù)的子任務(wù)時,會繼承 ScopedValue 綁定(JEP 446)。如果作用域的所有者從綁定的 ScopedValue 中讀取值,則每個子任務(wù)將讀取相同的值。

如果作用域的所有者本身是現(xiàn)有作用域的子任務(wù),即作為分叉子任務(wù)創(chuàng)建的,則該作用域成為新作用域的父作用域。因此,作用域和子任務(wù)形成一個樹狀結(jié)構(gòu)。

在運行時,StructuredTaskScope 強制執(zhí)行結(jié)構(gòu)和順序并發(fā)操作。因此,它不實現(xiàn) ExecutorServiceExecutor 接口,因為這些接口的實例通常以非結(jié)構(gòu)化方式使用(見下文)。然而,將使用 ExecutorService 的代碼遷移到使用 StructuredTaskScope 并從結(jié)構(gòu)上受益是直接的。

實際上,大多數(shù)使用 StructuredTaskScope 的情況下,可能不會直接使用 StructuredTaskScope 類,而是使用下一節(jié)描述的兩個實現(xiàn)了關(guān)閉策略的子類之一。在其他情況下,用戶可能會編寫自己的子類來實現(xiàn)自定義的關(guān)閉策略。

11 關(guān)閉策略

在處理并發(fā)子任務(wù)時,通常會使用短路模式來避免不必要的工作。有時,例如,如果其中一個子任務(wù)失敗,就會取消所有子任務(wù)(即同時調(diào)用所有任務(wù)),或者在其中一個子任務(wù)成功時取消所有子任務(wù)(即同時調(diào)用任何任務(wù))。StructuredTaskScope 的兩個子類,ShutdownOnFailureShutdownOnSuccess,支持這些模式,并提供在第一個子任務(wù)失敗或成功時關(guān)閉作用域的策略。

關(guān)閉策略還提供了集中處理異常以及可能的成功結(jié)果的方法。這符合結(jié)構(gòu)化并發(fā)的精神,即整個作用域被視為一個單元。

11.1 案例

上面的 handle() 示例也使用了這策略,它在并發(fā)運行一組任務(wù)并在其中任何一個任務(wù)失敗時失敗:

<T> List<T> runAll(List<Callable<T>> tasks) throws InterruptedException, ExecutionException {try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();scope.join().throwIfFailed();  // 任何子任務(wù)失敗,拋異常// 在這里,所有任務(wù)都已成功完成,因此組合結(jié)果return suppliers.stream().map(Supplier::get).toList();}
}

在第一個成功的子任務(wù)返回結(jié)果后返回該結(jié)果:

<T> T race(List<Callable<T>> tasks, Instant deadline) throws InterruptedException, ExecutionException, TimeoutException {try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {for (var task : tasks) {scope.fork(task);}return scope.joinUntil(deadline).result();  // 如果沒有任何子任務(wù)成功完成,拋出異常}
}

一旦有一個子任務(wù)成功,此作用域?qū)⒆詣雨P(guān)閉,取消未完成的子任務(wù)。如果所有子任務(wù)失敗或給定的截止時間過去,任務(wù)將失敗。這種模式在需要從一組冗余服務(wù)中獲得任何一個服務(wù)的結(jié)果的服務(wù)器應(yīng)用程序中非常有用。

雖然這倆關(guān)閉策略已內(nèi)置,但開發(fā)人員可以創(chuàng)建自定義策略來抽象其他模式。

11.2 處理結(jié)果

在通過關(guān)閉策略(例如,通過 ShutdownOnFailure::throwIfFailed)進行集中異常處理和加入之后,作用域的所有者可以使用從調(diào)用 fork(...) 返回的 [Subtask] 對象處理子任務(wù)的結(jié)果,如果這些結(jié)果沒有被策略處理(例如,通過 ShutdownOnSuccess::result())。

通常情況下,作用域所有者將只調(diào)用 get() 方法的 Subtask 方法。所有其他的 Subtask 方法通常只會在自定義關(guān)閉策略的 handleComplete(...) 方法的實現(xiàn)中使用。實際上,我們建議將引用由 fork(...) 返回的 Subtask 的變量類型定義為 Supplier<String> 而不是 Subtask<String>(除非當然選擇使用 var)。如果關(guān)閉策略本身處理子任務(wù)結(jié)果(如在 ShutdownOnSuccess 的情況下),則應(yīng)完全避免使用由 fork(...) 返回的 Subtask 對象,并將 fork(...) 方法視為返回 void。子任務(wù)應(yīng)將其結(jié)果作為它們的返回結(jié)果,作為策略在處理中央異常后應(yīng)處理的任何信息。

如果作用域所有者處理子任務(wù)異常以生成組合結(jié)果,而不是使用關(guān)閉策略,則異??梢宰鳛閺淖尤蝿?wù)返回的值返回。例如,下面是一個在并行運行一組任務(wù)并返回包含每個任務(wù)各自成功或異常結(jié)果的完成 Future 列表的方法:

<T> List<Future<T>> executeAll(List<Callable<T>> tasks)throws InterruptedException {try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {List<? extends Supplier<Future<T>>> futures = tasks.stream().map(task -> asFuture(task)).map(scope::fork).toList();scope.join();return futures.stream().map(Supplier::get).toList();}
}static <T> Callable<Future<T>> asFuture(Callable<T> task) {return () -> {try {return CompletableFuture.completedFuture(task.call());} catch (Exception ex) {return CompletableFuture.failedFuture(ex);}};
}

11.3 自定義關(guān)閉策略

StructuredTaskScope 可以被擴展,并且可以覆蓋其受保護的 handleComplete(...) 方法,以實現(xiàn)除 ShutdownOnSuccessShutdownOnFailure 之外的其他策略。子類可以,例如:

  • 收集成功完成的子任務(wù)的結(jié)果,并忽略失敗的子任務(wù),
  • 在子任務(wù)失敗時收集異常,或者
  • 在出現(xiàn)某種條件時調(diào)用 shutdown() 方法以關(guān)閉并導(dǎo)致 join() 方法喚醒。

當一個子任務(wù)完成時,即使在調(diào)用 shutdown() 之后,它也會作為一個 Subtask 報告給 handleComplete(...) 方法:

public sealed interface Subtask<T> extends Supplier<T> {enum State { SUCCESS, FAILED, UNAVAILABLE }State state();Callable<? extends T> task();T get();Throwable exception();
}

當子任務(wù)在 SUCCESS 狀態(tài)或 FAILED 狀態(tài)下完成時,handleComplete(...) 方法將被調(diào)用。如果子任務(wù)處于 SUCCESS 狀態(tài),可以調(diào)用 get() 方法,如果子任務(wù)處于 FAILED 狀態(tài),則可以調(diào)用 exception() 方法。在其他情況下調(diào)用 get()exception() 會引發(fā) IllegalStateException 異常。UNAVAILABLE 狀態(tài)表示以下情況之一:(1)子任務(wù)被 fork 但尚未完成;(2)子任務(wù)在關(guān)閉后完成,或者(3)子任務(wù)在關(guān)閉后被 fork,因此尚未啟動。handleComplete(...) 方法永遠不會為處于 UNAVAILABLE 狀態(tài)的子任務(wù)調(diào)用。

子類通常會定義方法,以使結(jié)果、狀態(tài)或其他結(jié)果在 join() 方法返回后可以被后續(xù)代碼使用。收集結(jié)果并忽略失敗子任務(wù)的子類可以定義一個方法,該方法返回一系列結(jié)果。實施在子任務(wù)失敗時關(guān)閉的策略的子類可以定義一個方法,以獲取失敗的第一個子任務(wù)的異常。

擴展 StructuredTaskScope 的子類

該子類收集成功完成的子任務(wù)的結(jié)果。它定義了 results() 方法,供主任務(wù)用于檢索結(jié)果。

class MyScope<T> extends StructuredTaskScope<T> {private final Queue<T> results = new ConcurrentLinkedQueue<>();MyScope() { super(null, Thread.ofVirtual().factory()); }@Overrideprotected void handleComplete(Subtask<? extends T> subtask) {if (subtask.state() == Subtask.State.SUCCESS)results.add(subtask.get());}@Overridepublic MyScope<T> join() throws InterruptedException {super.join();return this;}// 返回從成功完成的子任務(wù)獲取的結(jié)果流public Stream<T> results() {super.ensureOwnerAndJoined();return results.stream();}}

可以像這樣使用這個自定義策略:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {try (var scope = new MyScope<T>()) {for (var task : tasks) scope.fork(task);return scope.join().results().toList();}
}

扇入場景

上面的示例側(cè)重于扇出場景,這些場景管理多個并發(fā)的出站 I/O 操作。StructuredTaskScope 在扇入場景中也非常有用,這些場景管理多個并發(fā)的入站 I/O 操作。在這種情況下,我們通常會響應(yīng)傳入請求而動態(tài)地創(chuàng)建未知數(shù)量的子任務(wù)。

以下是一個服務(wù)器的示例,它在 StructuredTaskScope 中 fork 子任務(wù)以處理傳入連接:

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {try (var scope = new StructuredTaskScope<Void>()) {try {while (true) {var socket = serverSocket.accept();scope.fork(() -> handle(socket));}} finally {// 如果發(fā)生錯誤或被中斷,我們停止接受連接scope.shutdown();  // 關(guān)閉所有活動連接scope.join();}}
}

從并發(fā)的角度來看,這種情況與請求的方向不同,但在持續(xù)時間和任務(wù)數(shù)量方面是不同的,因為子任務(wù)是根據(jù)外部事件動態(tài) fork 的。

所有處理連接的子任務(wù)都在作用域內(nèi)創(chuàng)建,因此在線程轉(zhuǎn)儲中很容易看到它們在一個作用域的所有者的子線程。作用域的所有者也很容易被當作一個單元關(guān)閉整個服務(wù)。

可觀察性

我們擴展了由 JEP 444 添加的新的 JSON 線程轉(zhuǎn)儲格式,以顯示 StructuredTaskScope 將線程分組成層次結(jié)構(gòu):

$ jcmd <pid> Thread.dump_to_file -format=json <file>

每個作用域的 JSON 對象包含一個線程數(shù)組,這些線程在作用域中被 fork,并附帶它們的堆棧跟蹤。作用域的所有者線程通常會在 join() 方法中被阻塞,等待子任務(wù)完成;線程轉(zhuǎn)儲可以通過顯示由結(jié)構(gòu)化并發(fā)所施加的樹狀層次結(jié)構(gòu),輕松地查看子任務(wù)的線程正在做什么。作用域的 JSON 對象還具有對其父級的引用,以便可以從轉(zhuǎn)儲中重新構(gòu)建程序的結(jié)構(gòu)。

com.sun.management.HotSpotDiagnosticsMXBean API 也可以用來生成這樣的線程轉(zhuǎn)儲,可以通過平臺的 MBeanServer 和本地或遠程的 JMX 工具直接或間接地使用它。

為什么 fork(...) 沒有返回 Future

StructuredTaskScope API 處于孵化狀態(tài)時,fork(...) 方法返回了 Future。這使得 fork(...) 更像是現(xiàn)有的 ExecutorService::submit 方法,從而提供了一種熟悉的感覺。然而,考慮到 StructuredTaskScope 的使用方式與 ExecutorService 完全不同 — 即以上文描述的結(jié)構(gòu)化方式使用 — 使用 Future 帶來的更多困惑遠遠超過了清晰性。

熟悉的 Future 的使用涉及調(diào)用其 get() 方法,它會阻塞直到結(jié)果可用。但在 StructuredTaskScope 的上下文中,以這種方式使用 Future 不僅是不鼓勵的,而且是不切實際的。Structured Future 對象應(yīng)該只有在 join() 返回之后查詢,此時它們已知已完成或取消,而應(yīng)使用的方法不是熟悉的 get(),而是新引入的 resultNow(),它永遠不會阻塞。

一些開發(fā)人員想知道為什么 fork(...) 沒有返回更強大的 CompletableFuture 對象。由于應(yīng)該只有在已知它們已完成時才使用 fork(...) 返回的 Future,因此 CompletableFuture 不會提供任何好處,因為其高級功能只對未完成的 futures 有用。此外,CompletableFuture 是為異步編程范例設(shè)計的,而 StructuredTaskScope 鼓勵阻塞范例。

總之,FutureCompletableFuture 的設(shè)計旨在提供在結(jié)構(gòu)化并發(fā)中是有害的自由度。

結(jié)構(gòu)化并發(fā)是將在不同線程中運行的多個任務(wù)視為單個工作單元,而 Future 主要在將多個任務(wù)視為單獨任務(wù)時有用。因此,作用域只應(yīng)該阻塞一次以等待其子任務(wù)的結(jié)果,然后集中處理異常。因此,在絕大多數(shù)情況下,從 fork(...) 返回的 Future 上唯一應(yīng)該調(diào)用的方法是 resultNow()。這是與 Future 的正常用法的顯著變化,而 Subtask::get() 方法的行為與在 API 孵化期間 Future::resultNow() 的行為完全相同。

替代方案

增強 ExecutorService 接口。我們對該接口進行了原型實現(xiàn),該接口始終強制執(zhí)行結(jié)構(gòu)化并限制了哪些線程可以提交任務(wù)。然而,我們發(fā)現(xiàn)這在 JDK 和生態(tài)系統(tǒng)中的大多數(shù)使用情況下都不是結(jié)構(gòu)化的。在完全不同的概念中重用相同的 API,會導(dǎo)致混淆。例如,將結(jié)構(gòu)化 ExecutorService 實例傳遞給現(xiàn)有接受此類型的方法,幾乎肯定會在大多數(shù)情況下拋出異常。

本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!

http://www.risenshineclean.com/news/49457.html

相關(guān)文章:

  • 北京天海網(wǎng)站建設(shè)公司黃頁網(wǎng)站推廣app咋做廣告
  • 做網(wǎng)站的公司找客戶衡陽百度推廣
  • 深圳H5網(wǎng)站開發(fā)南寧seo專員
  • 上海網(wǎng)站建設(shè)網(wǎng)頁制作邢臺備案查詢網(wǎng)
  • 贛州市開發(fā)小程序搜索優(yōu)化整站優(yōu)化
  • 網(wǎng)站備案信息查詢百度seo營銷推廣多少錢
  • 彩票投注網(wǎng)站怎樣做安徽網(wǎng)站建設(shè)優(yōu)化推廣
  • 做百度手機網(wǎng)站優(yōu)化成都seo網(wǎng)絡(luò)優(yōu)化公司
  • 阜陽網(wǎng)站建設(shè)工作室營銷圖片素材
  • 扁平風(fēng)網(wǎng)站哪家培訓(xùn)機構(gòu)學(xué)校好
  • 高校邦營銷型網(wǎng)站建設(shè)答案semifinal
  • 找人做網(wǎng)站注意哪些女教師遭網(wǎng)課入侵視頻大全播放
  • 微信網(wǎng)站開發(fā)簡單百度如何注冊公司網(wǎng)站
  • 權(quán)大師的網(wǎng)站是哪個公司做的指數(shù)基金是什么意思
  • 西安網(wǎng)站建設(shè)那家強深圳網(wǎng)絡(luò)營銷渠道
  • 鄭州網(wǎng)站制作公司名單外貿(mào)新手怎樣用谷歌找客戶
  • 企業(yè)網(wǎng)站備案在哪個部門seo教學(xué)
  • seo外貿(mào)網(wǎng)站建設(shè)百度下載安裝2021最新版
  • 上海做響應(yīng)式網(wǎng)站的公司江西seo
  • 那個網(wǎng)站做室內(nèi)比較好的網(wǎng)站流量排行
  • 拉新推廣變現(xiàn)app寧德seo推廣
  • 網(wǎng)站建站卡頓怎么辦流量查詢網(wǎng)站
  • 深圳網(wǎng)站建設(shè)易佰訊寧波seo排名外包
  • 烏魯木齊經(jīng)濟開發(fā)區(qū)建設(shè)局網(wǎng)站如何創(chuàng)建自己的網(wǎng)址
  • 有個藍色章魚做標志的網(wǎng)站seo和sem的聯(lián)系
  • 蘇寧易購網(wǎng)站建設(shè)的目的競價關(guān)鍵詞排名軟件
  • 大連網(wǎng)站制作師企業(yè)微信scrm
  • Wordpress搜索指定頁面內(nèi)容seo網(wǎng)絡(luò)優(yōu)化推廣
  • 廣東省東莞陽光網(wǎng)seo推廣優(yōu)化外包價格
  • 網(wǎng)站如何做搜索功能的網(wǎng)絡(luò)推廣的途徑有哪些