阿里自助建站平臺(tái)/網(wǎng)站優(yōu)化公司收費(fèi)
CompletableFuture(可完成的Future)
一個(gè)可完成的Future,在我們調(diào)用他的get方法的時(shí)候,他會(huì)阻塞等待這個(gè)任務(wù)完成來獲取他的結(jié)果。
當(dāng)然也可以為這個(gè)任務(wù)注冊(cè)一些回調(diào),類似于完成時(shí),出現(xiàn)異常時(shí),或者執(zhí)行超時(shí)等額外處理。
使用
CompletableFuture.suppleAsync
異步執(zhí)行一個(gè)任務(wù)并返回結(jié)果
CompletableFuture.runAsync
異步執(zhí)行一個(gè)任務(wù)不返回結(jié)果
這兩方法都可以為我們快速的創(chuàng)建一個(gè)CompletableFuture對(duì)象
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + ":supplyAsync start");return "supplyAsync";
});CompletableFuture<String> completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + ":runAsync start");return "runAsync";
});
這兩個(gè)方法都是可以有第二個(gè)參數(shù)的,也就是可以執(zhí)行線程池,這里默認(rèn)是
**Fork-Join-Pool**
(一個(gè)可以將一個(gè)大任務(wù)很自然的分解為多個(gè)子任務(wù)的線程池)。
回調(diào)有哪些
方法 | 參數(shù) | 描述 |
---|---|---|
thenApply | T -> U | 對(duì)結(jié)果進(jìn)行處理,并返回一個(gè)新的結(jié)果 |
thenAccpet | T -> void | 對(duì)結(jié)果進(jìn)行處理,返回結(jié)果為Void |
thenCompose | T -> CompletableFuture | 對(duì)結(jié)果調(diào)用這個(gè)函數(shù)來進(jìn)行處理,并返回一個(gè)新的結(jié)果 |
handle | (T,Throwable) -> U | 與thenApply類似,但是他可以處理異常,根據(jù)異常對(duì)象是否為null,可以判斷是否出現(xiàn)異常。 不報(bào)錯(cuò)也會(huì)執(zhí)行 |
whenCompletable | (T,Throwable) -> void | 類似handle 但是不返回結(jié)果 不報(bào)錯(cuò)也會(huì)執(zhí)行 |
exceptionally | Throwable -> U | 出現(xiàn)異常時(shí),返回一個(gè)結(jié)果 報(bào)錯(cuò)時(shí)才會(huì)執(zhí)行。 |
completableOnTimeout | T, long, TimeUnit | 如果超時(shí)返回一個(gè)指定的結(jié)果。 超時(shí)后的鏈?zhǔn)讲僮鞫疾粓?zhí)行 |
orTimeout | long, TimeUnit | 超時(shí)返回一個(gè)異常 TimeOutException 超時(shí)后的鏈?zhǔn)讲僮鞫疾粓?zhí)行 |
thenRun | Runable | 執(zhí)行Runable,返回void,對(duì)于不需要任務(wù)的返回結(jié)果 |
示例
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(50));public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + ":supplyAsync");return "supplyAsync";});completableFuture.thenApply((result) -> {System.out.println(Thread.currentThread().getName() + ":thenApply1 ");return result + "\nthenApply1";}).thenAccept((result) -> {System.out.println(Thread.currentThread().getName() + ":thenAccept ");}).thenCompose(result ->CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread().getName() + ":thenCompose ");TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return result + "\nthenCompose";}))// thenCompose 超時(shí)了,orTimeout 之前的的操作都將被忽略 之后的還會(huì)執(zhí)行.thenAccept((result) -> {System.out.println(Thread.currentThread().getName() + ":ignore ");})// .completeOnTimeout("completeOnTimeout", 1, TimeUnit.SECONDS)// 這里會(huì)拋出一個(gè)TimeOutException.orTimeout(1, TimeUnit.SECONDS)// throwable則可以獲取到 TimeOutException .handleAsync((result, throwable) -> {System.out.println(Thread.currentThread().getName() + ":handleAsync ");if (throwable == null) {return result;}throwable.printStackTrace();return result + "\nhandleAsync";}, THREAD_POOL_EXECUTOR)// 因?yàn)門imeOutException 被handleAsync處理了,所以這里也沒有異常了throwable為null.whenComplete((result, throwable) -> {System.out.println(Thread.currentThread().getName() + ":whenComplete ");if (throwable == null) {return;}throwable.printStackTrace();}).thenRun(() -> {System.out.println(Thread.currentThread().getName() + ":thenRun ");});try {Thread.sleep(5000); // 添加短暫的延遲 因?yàn)槭钱惒饺蝿?wù)這里不等待一下的話main線程就終止了} catch (InterruptedException e) {e.printStackTrace();}THREAD_POOL_EXECUTOR.shutdownNow();
}#### 執(zhí)行結(jié)果
ForkJoinPool.commonPool-worker-3:supplyAsync
ForkJoinPool.commonPool-worker-3:thenApply1
ForkJoinPool.commonPool-worker-3:thenAccept
ForkJoinPool.commonPool-worker-3:thenCompose
pool-1-thread-1:handleAsync
pool-1-thread-1:whenComplete
pool-1-thread-1:thenRun
java.util.concurrent.TimeoutException
注意回調(diào)時(shí)機(jī)即可。
Async回調(diào)
我們這里使用一個(gè)叫
handleAsync
的方法與普通的handle相比,他是執(zhí)行的線程發(fā)送了變化。
使用Async在大多數(shù)情況下都會(huì)是在一個(gè)新的線程下去幫我們執(zhí)行這個(gè)回調(diào),而普通的則是在原有由原有執(zhí)行任務(wù)的線程去執(zhí)行這個(gè)回調(diào)。
這里的大多數(shù)情況是指我們?cè)谑褂米远x線程池的時(shí)候。而我們的Fork-Join-Pool可能會(huì)為一些短暫的任務(wù)重用相同的線程,以減少線程的創(chuàng)建和銷毀開銷。
get、join
當(dāng)我們的CompletableFuture提供了返回值的時(shí)候,我們可以通過get或者join方法來阻塞的得到這個(gè)結(jié)果
與之不同是get他可能會(huì)拋出異常,而join不會(huì)。通常我們使用join
組合CompletableFuture
可以根據(jù)某種條件去執(zhí)行兩個(gè)或者多個(gè)CompletableFuture
因?yàn)榻M合太多,這里就簡單描述下我自己比較常用的
方法 | 參數(shù) | 描述 |
---|---|---|
static allOf | CompletableFuture<?>… | 所以任務(wù)都執(zhí)行完成后完成,返回結(jié)果為void |
static anyOf | CompletableFuture<?>… | 任意任務(wù)都執(zhí)行完成后完成,返回結(jié)果為void |
示例
public static void main(String[] args) {// 1.兩個(gè)任務(wù)都執(zhí)行完成后才完成CompletableFuture.allOf(CompletableFuture.runAsync(()->{System.out.println("supplyAsync1");}),CompletableFuture.runAsync(()->{// 異步任務(wù)等待1秒try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("supplyAsync2");}));// 任意一個(gè)完成則完成CompletableFuture.anyOf(CompletableFuture.runAsync(()->{System.out.println("supplyAsync3");}),CompletableFuture.runAsync(()->{// 異步任務(wù)等待2秒try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("supplyAsync4");}));// 主線程只等待一秒 try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}
}### 結(jié)果
supplyAsync1
supplyAsync3
supplyAsync2
allOf 因?yàn)樾枰獌蓚€(gè)都完成所以等待一秒后完成輸出supplyAsync1、supplyAsync2
antOf 任意一個(gè)完成則算結(jié)束。因?yàn)榈诙€(gè)等待兩秒,主線程已經(jīng)結(jié)束了,main已經(jīng)退出了,所以只輸出supplyAsync3