完整版網(wǎng)站推廣方案網(wǎng)站排名優(yōu)化方案
背景
我們應(yīng)對并發(fā)場景時一般會采用下面方式去預(yù)估線程池的線程數(shù)量,比如QPS需求是1000,平均每個任務(wù)需要執(zhí)行的時間是t秒,那么我們需要的線程數(shù)是t * 1000。
但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的線程環(huán)境上也需要進行驗證和微調(diào)。比如在本文所闡述分頁查詢的數(shù)據(jù)項組合場景中。
1、數(shù)據(jù)組合依賴不同的上游接接口, 它們的響應(yīng)時間參差不齊,甚至差距還非常大。有些接口支持批量查詢而另一些則不支持批量查詢。有些接口因為性能問題還需要考慮降級和平滑方案。
2、為了提升用戶體驗,這里的查詢設(shè)計了動態(tài)列,因此每一次訪問所需要組合的數(shù)據(jù)項和數(shù)量也是不同的。
因此這里如果需要估算出一個合理的t是不太現(xiàn)實的。
方案
一種可動態(tài)調(diào)節(jié)的策略,根據(jù)監(jiān)控的反饋對線程池進行微調(diào)。整體設(shè)計分為裝配邏輯和線程池封裝設(shè)計。
1、裝配邏輯
查詢結(jié)果,拆分分片(水平拆分),并行裝配(垂直拆分),獲得裝配項列表(動態(tài)列), 并行裝配每一項。
2、線程池封裝
可調(diào)節(jié)的核心線程數(shù)、最大線程數(shù)、線程保持時間,隊列大小,提交任務(wù)重試等待時間,提交任務(wù)重試次數(shù)。 固定異常拒絕策略。
調(diào)節(jié)參數(shù):
字段 | 名稱 | 說明 |
---|---|---|
corePoolSize | 核心線程數(shù) | 參考線程池定義 |
maximumPoolSize | 最大線程數(shù) | 參考線程池定義 |
keepAliveTime | 線程存活時間 | 參考線程池定義 |
queueSize | 隊列長度 | 參考線程池定義 |
resubmitSleepMillis | 提交任務(wù)重試等待時間 | 添加任務(wù)被拒絕后重試時的等待時間 |
resubmitTimes | 提交任務(wù)重試次數(shù) | 添加任務(wù)被拒絕后重試添加的最大次數(shù) |
@Dataprivate static class PoolPolicy {/** 核心線程數(shù) */private Integer corePoolSize;/** 最大線程數(shù) */private Integer maximumPoolSize;/** 線程存活時間 */private Integer keepAliveTime;/** 隊列容量 */private Integer queueSize;/** 重試等待時間 */private Long resubmitSleepMillis;/** 重試次數(shù) */private Integer resubmitTimes;}
創(chuàng)建線程池:
線程池的創(chuàng)建考慮了動態(tài)的需求,滿足根據(jù)壓測結(jié)果進行微調(diào)的要求。首先緩存舊的線程池后再創(chuàng)建新的線程,當(dāng)新的線程池創(chuàng)建成功后再去關(guān)閉舊的線程池。保證在這個替換過程中不影響正在執(zhí)行的業(yè)務(wù)。線程池使用了中斷策略,用戶可以及時感知到系統(tǒng)繁忙并保證了系統(tǒng)資源占用的安全。
public void reloadThreadPool(PoolPolicy poolPolicy) {if (poolPolicy == null) {throw new RuntimeException("The thread pool policy cannot be empty.");}if (poolPolicy.getCorePoolSize() == null) {poolPolicy.setCorePoolSize(0);}if (poolPolicy.getMaximumPoolSize() == null) {poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getKeepAliveTime() == null) {poolPolicy.setKeepAliveTime(60);}if (poolPolicy.getQueueSize() == null) {poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getResubmitSleepMillis() == null) {poolPolicy.setResubmitSleepMillis(200L);}if (poolPolicy.getResubmitTimes() == null) {poolPolicy.setResubmitTimes(5);}// - 線程池策略沒有變化直接返回已有線程池。ExecutorService original = this.executorService;this.executorService = new ThreadPoolExecutor(poolPolicy.getCorePoolSize(),poolPolicy.getMaximumPoolSize(),poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),new ThreadPoolExecutor.AbortPolicy());this.poolPolicy = poolPolicy;if (original != null) {original.shutdownNow();}
}
任務(wù)提交:
線程池封裝對象中使用的線程池拒絕策略是AbortPolicy,因此在線程數(shù)和阻塞隊列到達上限后會觸發(fā)異常。另外在這里為了保證提交的成功率利用重試策略實現(xiàn)了一定程度的延遲處理,具體場景中可以結(jié)合業(yè)務(wù)特點進行適當(dāng)?shù)恼{(diào)節(jié)和配置。
public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任務(wù)future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}
監(jiān)控:
1、submit提交的監(jiān)控
見代碼中的「監(jiān)控點①」,在submit方法中添加監(jiān)控點,監(jiān)控key的需要添線程池封裝對象的線程名稱前綴,用于區(qū)分具體的線程池對象。
「監(jiān)控點①」用于監(jiān)控添加任務(wù)的動作是否正常,以便對線程池對象及策略參數(shù)進行微調(diào)。
public <T> Future<T> submit(Callable<T> task) {// - 監(jiān)控點①CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,UmpConstant.APP_NAME,UmpConstant.UMP_DISABLE_HEART,UmpConstant.UMP_ENABLE_TP);RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任務(wù)future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {// - 監(jiān)控點①Profiler.functionError(callerInfo);throw exception;}// - 監(jiān)控點①Profiler.registerInfoEnd(callerInfo);return future;
}
2、線程池并行任務(wù)
見代碼的「監(jiān)控點②」,分別在添加任務(wù)和任務(wù)完成后。
「監(jiān)控點②」實時統(tǒng)計在線程中執(zhí)行的總?cè)蝿?wù)數(shù)量,用于評估線程池的任務(wù)的數(shù)量的滿載水平。
/** 任務(wù)并行數(shù)量統(tǒng)計 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任務(wù)future = this.executorService.submit(()-> {T rst = task.call();// - 監(jiān)控點②log.info("{} - Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.decrementAndGet());return rst;});// - 監(jiān)控點②log.info("{} + Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.incrementAndGet());exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}
3、調(diào)節(jié)
線程池封裝對象策略的調(diào)節(jié)時機
1)上線前基于流量預(yù)估的壓測階段;
2)上線后跟進監(jiān)控數(shù)據(jù)和線程池中任務(wù)的滿載水平進行人工微調(diào),也可以通過JOB在指定的時間自動調(diào)整;
3)大促前依據(jù)往期大促峰值來調(diào)高相關(guān)參數(shù)。
線程池封裝對象策略的調(diào)節(jié)經(jīng)驗
1)訪問時長要求較低時,我們可以考慮調(diào)小線程數(shù)和阻塞隊列,適當(dāng)調(diào)大提交任務(wù)重試等待時間和次數(shù),以便降低資源占用。
2)訪問時長要求較高時,就需要調(diào)大線程數(shù)并保證相對較小的阻塞隊列,調(diào)小提交任務(wù)的重試等待時間和次數(shù)甚至分別調(diào)成0和1(即關(guān)閉重試提交邏輯)。
作者:京東零售 王文明
來源:京東云開發(fā)者社區(qū) 轉(zhuǎn)載請注明來源