黃岡網(wǎng)站制作百度一下你就知道百度官網(wǎng)
目錄
- 一、前情提要
- 二、JDK8的CompletableFuture
- 1、ForkJoinPool
- 2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的區(qū)別
- 三、通過CompletableFuture優(yōu)化 “通過Future獲取異步返回值”
- 1、通過Future獲取異步返回值關(guān)鍵代碼
- (1)將異步方法的返回值改為```Future<Integer>```,將返回值放到```new AsyncResult<>();```中;
- (2)通過```Future<Integer>.get()```獲取返回值:
- 2、通過CompletableFuture獲取異步返回值關(guān)鍵代碼
- (1)將異步方法的返回值改為 int
- (2)通過```completableFuture.get()```獲取返回值
- 3、效率對比
- (1)測試環(huán)境
- (2)統(tǒng)計四種情況下10萬數(shù)據(jù)入庫時間
- (3)設(shè)置核心線程數(shù)
- 自定義ForkJoinPool線程池
- 自定義線程池
- (4)統(tǒng)計分析
- 四、通過CompletableFuture.allOf解決阻塞主線程問題
- 1、語法
- 2、代碼實例
- 五、CompletableFuture中花俏的語法糖
- 1、runAsync
- 2、supplyAsync
- 六、順序執(zhí)行異步任務(wù)
- 1、thenRun
- 2、thenAccept
- 3、thenApply
- 七、CompletableFuture合并任務(wù)
- 八、CompletableFuture VS Future總結(jié)
- 在BUG中磨礪,在優(yōu)化中成長
大家好,我是哪吒。
一、前情提要
在上一篇文章中,使用雙異步后,如何保證數(shù)據(jù)一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態(tài),如果執(zhí)行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執(zhí)行g(shù)et()方法,那么還是需要返回AsyncResult,然后再通過主線程去get()獲取異步線程返回結(jié)果。
寫法很繁瑣,還會阻塞主線程。
下面是FutureTask異步執(zhí)行流程圖:
二、JDK8的CompletableFuture
1、ForkJoinPool
Java8中引入了CompletableFuture,它實現(xiàn)了對Future的全面升級,可以通過回調(diào)的方式,獲取異步線程返回值。
CompletableFuture的異步執(zhí)行通過ForkJoinPool實現(xiàn), 它使用守護線程去執(zhí)行任務(wù)。
ForkJoinPool在于可以充分利用多核CPU的優(yōu)勢,把一個任務(wù)拆分成多個小任務(wù),把多個小任務(wù)放到多個CPU上并行執(zhí)行,當多個小任務(wù)執(zhí)行完畢后,再將其執(zhí)行結(jié)果合并起來。
Future的異步執(zhí)行是通過ThreadPoolExecutor實現(xiàn)的。
2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的區(qū)別
- ForkJoinPool中的每個線程都會有一個隊列,而ThreadPoolExecutor只有一個隊列,并根據(jù)queue類型不同,細分出各種線程池;
- ForkJoinPool在使用過程中,會創(chuàng)建大量的子任務(wù),會進行大量的gc,但是ThreadPoolExecutor不需要,因為ThreadPoolExecutor是任務(wù)分配平均的;
- ThreadPoolExecutor中每個異步線程之間是相互獨立的,當執(zhí)行速度快的線程執(zhí)行完畢后,它就會一直處于空閑的狀態(tài),等待其它線程執(zhí)行完畢;
- ForkJoinPool中每個異步線程之間并不是絕對獨立的,在ForkJoinPool線程池中會維護一個隊列來存放需要執(zhí)行的任務(wù),當線程自身任務(wù)執(zhí)行完畢后,它會從其它線程中獲取未執(zhí)行的任務(wù)并幫助它執(zhí)行,直至所有線程執(zhí)行完畢。
因此,在多線程任務(wù)分配不均時,ForkJoinPool的執(zhí)行效率更高。但是,如果任務(wù)分配均勻,ThreadPoolExecutor的執(zhí)行效率更高,因為ForkJoinPool會創(chuàng)建大量子任務(wù),并對其進行大量的GC,比較耗時。
三、通過CompletableFuture優(yōu)化 “通過Future獲取異步返回值”
1、通過Future獲取異步返回值關(guān)鍵代碼
(1)將異步方法的返回值改為Future<Integer>
,將返回值放到new AsyncResult<>();
中;
@Async("async-executor")
public void readXls(String filePath, String filename) {try {// 此代碼為簡化關(guān)鍵性代碼List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();futureList.add(sumFuture);}}catch (Exception e){logger.error("readXlsCacheAsync---插入數(shù)據(jù)異常:",e);}
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {try {// 此代碼為簡化關(guān)鍵性代碼return new AsyncResult<>(sum);}catch (Exception e){return new AsyncResult<>(0);}
}
(2)通過Future<Integer>.get()
獲取返回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {int[] futureSumArr = new int[futureList.size()];for (int i = 0;i<futureList.size();i++) {try {Future<Integer> future = futureList.get(i);while (true) {if (future.isDone() && !future.isCancelled()) {Integer futureSum = future.get();logger.info("獲取Future返回值成功"+"----Future:" + future+ ",Result:" + futureSum);futureSumArr[i] += futureSum;break;} else {logger.info("Future正在執(zhí)行---獲取Future返回值中---等待3秒");Thread.sleep(3000);}}} catch (Exception e) {logger.error("獲取Future返回值異常: ", e);}}boolean insertFlag = getInsertSum(futureSumArr, excelRow);logger.info("獲取所有異步線程Future的返回值成功,Excel插入結(jié)果="+insertFlag);return insertFlag;
}
2、通過CompletableFuture獲取異步返回值關(guān)鍵代碼
(1)將異步方法的返回值改為 int
@Async("async-executor")
public void readXls(String filePath, String filename) {List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();for (int time = 0; time < times; time++) {// 此代碼為簡化關(guān)鍵性代碼CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();}}).thenApply((result) -> {// 回調(diào)方法return thenApplyTest2(result);// supplyAsync返回值 * 1}).thenApply((result) -> {return thenApplyTest5(result);// thenApply返回值 * 1}).exceptionally((e) -> { // 如果執(zhí)行異常:logger.error("CompletableFuture.supplyAsync----異常:", e);return null;});completableFutureList.add(completableFuture);}
}
@Async("async-executor")
public int readXlsCacheAsync() {try {// 此代碼為簡化關(guān)鍵性代碼return sum;}catch (Exception e){return -1;}
}
(2)通過completableFuture.get()
獲取返回值
public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){logger.info("通過completableFuture.get()獲取每個異步線程的插入結(jié)果----開始");int sum = 0;for (int i = 0; i < list.size(); i++) {Integer result = list.get(i).get();sum += result;}boolean insertFlag = excelRow == sum;logger.info("全部執(zhí)行完畢,excelRow={},入庫={}, 數(shù)據(jù)是否一致={}",excelRow,sum,insertFlag);return insertFlag;
}
3、效率對比
(1)測試環(huán)境
- 12個邏輯處理器的電腦;
- Excel中包含10萬條數(shù)據(jù);
- Future的自定義線程池,核心線程數(shù)為24;
- ForkJoinPool的核心線程數(shù)為24;
(2)統(tǒng)計四種情況下10萬數(shù)據(jù)入庫時間
- 不獲取異步返回值
- 通過Future獲取異步返回值
- 通過CompletableFuture獲取異步返回值,默認ForkJoinPool線程池的核心線程數(shù)為本機邏輯處理器數(shù)量,測試電腦為12;
- 通過CompletableFuture獲取異步返回值,修改ForkJoinPool線程池的核心線程數(shù)為24。
備注:因為CompletableFuture不阻塞主線程,主線程執(zhí)行時間只有2秒,表格中統(tǒng)計的是異步線程全部執(zhí)行完成的時間。
(3)設(shè)置核心線程數(shù)
將核心線程數(shù)CorePoolSize設(shè)置成CPU的處理器數(shù)量,是不是效率最高的?
// 獲取CPU的處理器數(shù)量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測試電腦是24
因為在接口被調(diào)用后,開啟異步線程,執(zhí)行入庫任務(wù),因為測試機最多同時開啟24線程處理任務(wù),故將10萬條數(shù)據(jù)拆分成等量的24份,也就是10萬/24 = 4166,那么我設(shè)置成4200,是不是效率最佳呢?
測試的過程中發(fā)現(xiàn),好像真的是這樣的。
自定義ForkJoinPool線程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;@Override
public void readXls(String filePath, String filename) {List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();for (int time = 0; time < times; time++) {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);} catch (Exception e) {logger.error("CompletableFuture----readXlsCacheAsync---異常:", e);return -1;}};},asyncTaskExecutor);completableFutureList.add(completableFuture);}// 不會阻塞主線程CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {try {int insertSum = getCompletableFutureResult(completableFutureList, excelRow);} catch (Exception ex) {return;}});
}
自定義線程池
/*** 自定義異步線程池*/
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//設(shè)置線程名稱executor.setThreadNamePrefix("asyncTask-Executor");//設(shè)置最大線程數(shù)executor.setMaxPoolSize(200);//設(shè)置核心線程數(shù)executor.setCorePoolSize(24);//設(shè)置線程空閑時間,默認60executor.setKeepAliveSeconds(200);//設(shè)置隊列容量executor.setQueueCapacity(50);/*** 當線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略* 通常有以下四種策略:* ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。* ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。* ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)* ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務(wù),自動重復(fù)調(diào)用 execute() 方法,直到成功*/executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;
}
(4)統(tǒng)計分析
效率對比:
③通過CompletableFuture獲取異步返回值(12線程) < ②通過Future獲取異步返回值 < ④通過CompletableFuture獲取異步返回值(24線程) < ①不獲取異步返回值
不獲取異步返回值時性能最優(yōu),這不廢話嘛~
核心線程數(shù)相同的情況下,CompletableFuture的入庫效率要優(yōu)于Future的入庫效率,10萬條數(shù)據(jù)大概要快4秒鐘,這還是相當驚人的,優(yōu)化的價值就在于此。
四、通過CompletableFuture.allOf解決阻塞主線程問題
1、語法
CompletableFuture.allOf(CompletableFuture的可變數(shù)組).whenComplete((r,e) -> {})
。
2、代碼實例
getCompletableFutureResult方法在 “3.2.2 通過completableFuture.get()
獲取返回值”。
// 不會阻塞主線程
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {logger.info("全部執(zhí)行完畢,解決主線程阻塞問題~");try {int insertSum = getCompletableFutureResult(completableFutureList, excelRow);} catch (Exception ex) {logger.error("全部執(zhí)行完畢,解決主線程阻塞問題,異常:", ex);return;}
});// 會阻塞主線程
//getCompletableFutureResult(completableFutureList, excelRow);logger.info("CompletableFuture----會阻塞主線程嗎?");
五、CompletableFuture中花俏的語法糖
1、runAsync
runAsync 方法不支持返回值。
可以通過runAsync執(zhí)行沒有返回值的異步方法。
不會阻塞主線程。
// 分批異步讀取Excel內(nèi)容并入庫
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
2、supplyAsync
supplyAsync也可以異步處理任務(wù),傳入的對象實現(xiàn)了Supplier接口。將Supplier作為參數(shù)并返回CompletableFuture結(jié)果值,這意味著它不接受任何輸入?yún)?shù),而是將result作為輸出返回。
會阻塞主線程。
supplyAsync()方法關(guān)鍵代碼:
int finalEnd = end;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();}
});
@Override
public int readXlsCacheAsyncMybatis() {// 不為人知的操作// 返回異步方法執(zhí)行結(jié)果即可return 100;
}
六、順序執(zhí)行異步任務(wù)
1、thenRun
thenRun()不接受參數(shù),也沒有返回值,與runAsync()配套使用,恰到好處。
// JDK8的CompletableFuture
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測試"));
2、thenAccept
thenAccept()接受參數(shù),沒有返回值。
supplyAsync + thenAccept
- 異步線程順序執(zhí)行
- supplyAsync的異步返回值,可以作為thenAccept的參數(shù)使用
- 不會阻塞主線程
CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();}
}).thenAccept(x -> logger.info(".thenAccept()方法測試:" + x));
但是,此時無法通過completableFuture.get()獲取supplyAsync的返回值了。
3、thenApply
thenApply在thenAccept的基礎(chǔ)上,可以再次通過completableFuture.get()獲取返回值。
supplyAsync + thenApply,典型的鏈式編程。
- 異步線程內(nèi)方法順序執(zhí)行
- supplyAsync 的返回值,作為第 1 個thenApply的參數(shù),進行業(yè)務(wù)處理
- 第 1 個thenApply的返回值,作為第 2 個thenApply的參數(shù),進行業(yè)務(wù)處理
- 最后,通過future.get()方法獲取最終的返回值
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();}
}).thenApply((result) -> {return thenApplyTest2(result);// supplyAsync返回值 * 2
}).thenApply((result) -> {return thenApplyTest5(result);// thenApply返回值 * 5
});logger.info("readXlsCacheAsyncMybatis插入數(shù)據(jù) * 2 * 5 = " + completableFuture.get());
七、CompletableFuture合并任務(wù)
- thenCombine,多個異步任務(wù)并行處理,有返回值,最后合并結(jié)果返回新的CompletableFuture對象;
- thenAcceptBoth,多個異步任務(wù)并行處理,無返回值;
- acceptEither,多個異步任務(wù)并行處理,無返回值;
- applyToEither,,多個異步任務(wù)并行處理,有返回值;
CompletableFuture合并任務(wù)的代碼實例,這里就不多贅述了,一些語法糖而已,大家切記陷入低水平勤奮的怪圈。
八、CompletableFuture VS Future總結(jié)
本文中以下幾個方面對比了CompletableFuture和Future的差異:
- ForkJoinPool和ThreadPoolExecutor的實現(xiàn)原理,探索了CompletableFuture和Future的差異;
- 通過代碼實例的形式簡單介紹了CompletableFuture中花俏的語法糖;
- 通過CompletableFuture優(yōu)化了 “通過Future獲取異步返回值”;
- 通過CompletableFuture.allOf解決阻塞主線程問題。
Future提供了異步執(zhí)行的能力,但Future.get()會通過輪詢的方式獲取異步返回值,get()方法還會阻塞主線程。
輪詢的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。
JDK8提供的CompletableFuture實現(xiàn)了Future接口,添加了很多Future不具備的功能,比如鏈式編程、異常處理回調(diào)函數(shù)、獲取異步結(jié)果不阻塞不輪詢、合并異步任務(wù)等。
獲取異步線程結(jié)果后,我們可以通過添加事務(wù)的方式,實現(xiàn)Excel入庫操作的數(shù)據(jù)一致性。
異步多線程情況下如何實現(xiàn)事務(wù)?
有的小伙伴可能會說:
這還不簡單?添加@Transactional注解,如果發(fā)生異常或入庫數(shù)據(jù)量不符,直接回滾就可以了~
那么,真的是這樣嗎?我們下期見~
在BUG中磨礪,在優(yōu)化中成長
使用雙異步后,從 191s 優(yōu)化到 2s
增加索引 + 異步 + 不落地后,從 12h 優(yōu)化到 15 min
使用懶加載 + 零拷貝后,程序的秒開率提升至99.99%
性能優(yōu)化2.0,新增緩存后,程序的秒開率不升反降
🏆文章收錄于:100天精通Java從入門到就業(yè)
全網(wǎng)最細Java零基礎(chǔ)手把手入門教程,系列課程包括:Java基礎(chǔ)、Java8新特性、Java集合、高并發(fā)、性能優(yōu)化等,適合零基礎(chǔ)和進階提升的同學(xué)。
🏆哪吒多年工作總結(jié):Java學(xué)習(xí)路線總結(jié),搬磚工逆襲Java架構(gòu)師。
華為OD機試 2023B卷題庫瘋狂收錄中,刷題點這里
刷的越多,抽中的概率越大,每一題都有詳細的答題思路、詳細的代碼注釋、樣例測試,發(fā)現(xiàn)新題目,隨時更新,全天CSDN在線答疑。