網(wǎng)站建設(shè)百度推廣防疫管控優(yōu)化措施
文章目錄
- 6 任務(wù)執(zhí)行
- 6.1 在線程中執(zhí)行任務(wù)
- 6.1.1 串行地執(zhí)行任務(wù)
- 6.1.2 顯式地為任務(wù)創(chuàng)建線程
- 6.1.3 無(wú)限制創(chuàng)建線程的不足
- 6.2 Executor框架
- 6.2.1 示例:基于Executor的Web服務(wù)器
- 6.2.2 執(zhí)行策略
- 6.2.3 線程池
- 6.2.4 Executor的生命周期
- 6.2.5 延遲任務(wù)與周期任務(wù)
- 6.3 找出可利用的并行性
- 6.3.1 示例:串行的頁(yè)面渲染器
- 6.3.2 攜帶結(jié)果的任務(wù)Callable和Future
- 6.3.3 示例:使用Future實(shí)現(xiàn)頁(yè)面渲染器
- 6.3.4 在異構(gòu)任務(wù)并行化中存在的局限
- 6.3.5 CompletionService:Executor與BlockingQueue
- 6.3.6 使用CompletionService實(shí)現(xiàn)頁(yè)面渲染器
- 6.3.7 為任務(wù)設(shè)置時(shí)間
- 6.3.8 示例:旅行預(yù)訂門(mén)戶網(wǎng)站
6 任務(wù)執(zhí)行
6.1 在線程中執(zhí)行任務(wù)
應(yīng)選擇清晰的任務(wù)邊界以及明確的任務(wù)執(zhí)行策略。
一種自然的任務(wù)邊界選擇方式:以獨(dú)立的客戶請(qǐng)求為邊界。
6.1.1 串行地執(zhí)行任務(wù)
6.1.2 顯式地為任務(wù)創(chuàng)建線程
程序6-1-2-1 在Web服務(wù)器中為每一個(gè)請(qǐng)求啟動(dòng)一個(gè)新的線程(不要這么做)
class ThreadPerTaskWebServer {public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {@Overridepublic void run() {handleRequest(connection);}};new Thread(task).start();}}
}
串行情況下可以提升性能。只要請(qǐng)求的到達(dá)速率不超出服務(wù)器的請(qǐng)求處理能力。
6.1.3 無(wú)限制創(chuàng)建線程的不足
線程生命周期的開(kāi)銷非常高。 線程創(chuàng)建需要時(shí)間,延遲處理的請(qǐng)求,并且需要JVM和操作系統(tǒng)提供一些輔助操作。
資源消耗。 活躍線程會(huì)消耗系統(tǒng)資源,尤其是內(nèi)存。大量空閑的線程會(huì)占用許多內(nèi)存,給垃圾收集器帶來(lái)壓力,而且大量線程在競(jìng)爭(zhēng)CPU資源時(shí)還會(huì)產(chǎn)生其他的性能開(kāi)銷。如果有足夠多的線程使所有的CPU保持忙碌狀態(tài),那么再創(chuàng)建更多的線程反而會(huì)降低性能。
穩(wěn)定性。 在一定范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,但是如果超出了這個(gè)范圍,再創(chuàng)建更多的線程會(huì)降低程序的執(zhí)行速度。如果過(guò)多地創(chuàng)建一個(gè)線程,那么整個(gè)應(yīng)用程序?qū)⒈罎ⅰ?/p>
6.2 Executor框架
Executor基于生產(chǎn)者-消費(fèi)者模式,提交任務(wù)的操作相當(dāng)于生產(chǎn)者(生成待完成的工作單元),執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者(執(zhí)行完這些工作單元)。
6.2.1 示例:基于Executor的Web服務(wù)器
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;class TaskExecutionWebServer {private static final int NTHREADS = 100;private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {@Overridepublic void run() {handleRequest(connection);}};exec.execute(task);}}
}
6.2.2 執(zhí)行策略
使用線程池來(lái)取代new Thread方式。
6.2.3 線程池
- newFixedThreadPool。固定長(zhǎng)度的線程池。
- newCachedThreadPool。如果線程池的當(dāng)前規(guī)模超過(guò)了處理需求時(shí),那么將回收空閑的線程;而當(dāng)需求增加時(shí),則可以添加新的線程,線程池的規(guī)模不存在任何限制。
- newSingleThreadExecutor。創(chuàng)建單個(gè)工作線程來(lái)執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,會(huì)創(chuàng)建另一個(gè)線程來(lái)替代。
- newScheduledThreadPool。固定長(zhǎng)度的線程池。以延遲或者定時(shí)的方式來(lái)執(zhí)行任務(wù)。
6.2.4 Executor的生命周期
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;public class LifeCycleWebServer {private final ExecutorService exec = ...;public void start() throws IOException {ServerSocket socket = new ServerSocket(80);while (!exec.isShutdown()) {try {final Socket conn = socket.accept();exec.execute(new Runnable() {@Overridepublic void run() {handleRequest(conn);}});} catch (RejectedExecutionException e) {if (!exec.isShutdown()) {log("task submission rejected", e);}}}}public void stop() {exec.shutdown();}void handleRequest(Socket connection) {Request req = readRequest(connection);if (isShutdownRequest(req)) {stop();} else {dispatchRequest(req);}}
}
6.2.5 延遲任務(wù)與周期任務(wù)
推薦使用ScheduleThreadPoolExecutor。
Timer存在的問(wèn)題:
(1)Timer支持基于絕對(duì)時(shí)間的調(diào)度機(jī)制,因此任務(wù)的執(zhí)行對(duì)系統(tǒng)時(shí)鐘變化很敏感。ScheduleThreadPoolExecutor只支持基于相對(duì)時(shí)間的變化。
(2)Timer執(zhí)行所有的定時(shí)任務(wù)時(shí)只會(huì)創(chuàng)建一個(gè)線程。如果某個(gè)任務(wù)的執(zhí)行時(shí)間過(guò)長(zhǎng),那么將破壞其他TimerTask的定時(shí)精確性。例如某個(gè)周期TimerTask需要每10ms執(zhí)行一次,而另一個(gè)需要40ms,那么這個(gè)周期任務(wù)或者在40ms之后快速連續(xù)調(diào)用四次,或者徹底丟失4次調(diào)用。
(3)Timer線程并不捕獲異常。如果TimerTask拋出異常終止了定時(shí)任務(wù),timer不會(huì)回復(fù)線程執(zhí)行,而是會(huì)錯(cuò)誤的認(rèn)為整個(gè)Timer都被取消了。 因此,已被調(diào)度但尚未執(zhí)行的TimerTask將不會(huì)再執(zhí)行,新的任務(wù)也不會(huì)被調(diào)度。
程序清單 6-2-5-1 錯(cuò)誤的Timer行為
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;public class OutOfTime {public static void main(String[] args) throws Exception {long startTime = System.currentTimeMillis();Timer timer = new Timer();timer.schedule(new ThrowTask(), 1);TimeUnit.SECONDS.sleep(1);long endTime = System.currentTimeMillis();System.out.println(endTime - startTime);timer.schedule(new ThrowTask(), 1);TimeUnit.SECONDS.sleep(5);}static class ThrowTask extends TimerTask {@Overridepublic void run() {throw new RuntimeException();}}
}Exception in thread "Timer-0" java.lang.RuntimeExceptionat executor.OutOfTime$ThrowTask.run(OutOfTime.java:22)at java.base/java.util.TimerThread.mainLoop(Timer.java:566)at java.base/java.util.TimerThread.run(Timer.java:516)
1002
Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled.at java.base/java.util.Timer.sched(Timer.java:409)at java.base/java.util.Timer.schedule(Timer.java:205)at executor.OutOfTime.main(OutOfTime.java:15)
程序一秒鐘就結(jié)束了,并拋出了異常。
在Java5.0或者更高的JDK中,將很少使用Timer。
6.3 找出可利用的并行性
6.3.1 示例:串行的頁(yè)面渲染器
import java.util.ArrayList;
import java.util.List;public class SingleThreadRender {void renderPage(CharSequence source) {renderText(source);List<ImageData> imageData = new ArrayList<>();for (ImageInfo imageInfo : scanForImageInfo(source)) {imageData.add(imageInfo.downloadImage());}for (ImageData data : imageData) {renderImage(data);}}
}
6.3.2 攜帶結(jié)果的任務(wù)Callable和Future
6.3.3 示例:使用Future實(shí)現(xiàn)頁(yè)面渲染器
程序清單6-3-3-1 使用Future等待圖像下載
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;public class FutureRender {private final ExecutorService executor = ...;void renderPage(CharSequence source) {final List<ImageInfo> imageInfos = scanForImageInfo(source);Callable<List<ImageData>> task = new Callable<List<ImageData>>() {@Overridepublic List<ImageData> call() throws Exception {List<ImageData> result = new ArrayList<>();for (ImageInfo imageInfo : imageInfos) {result.add(imageInfo.downloadImage());}return result;}};Future<List<ImageData>> future = executor.submit(task);renderText(source);try {List<ImageData> imageDataList = future.get();for (ImageData imageData : imageDataList) {renderImage(imageData);}} catch (InterruptedException e) {// 重新設(shè)置線程的中斷狀態(tài)Thread.currentThread().interrupt();// 由于不需要結(jié)果,因此取消任務(wù)future.cancel(true);} catch (ExecutionException e) {throw launcherThrowable(e.getCause());}}}
6.3.4 在異構(gòu)任務(wù)并行化中存在的局限
只有大量相互獨(dú)立且同構(gòu)的任務(wù)可以并發(fā)進(jìn)行處理時(shí),才能體現(xiàn)出將程序的工作負(fù)載分配到多個(gè)任務(wù)中帶來(lái)的真正性能提升。
6.3.5 CompletionService:Executor與BlockingQueue
6.3.6 使用CompletionService實(shí)現(xiàn)頁(yè)面渲染器
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;public class Render {private final ExecutorService excutor;Render(ExecutorService excutor) {this.excutor = excutor;}void renderPage(CharSequence source) {List<ImageInfo> info = scanForImageInfo(source);CompletionService<ImageData> completionService = new ExecutorCompletionService<>(excutor);for (final ImageInfo imageInfo: info) {completionService.submit(new Callable<ImageData>() {@Overridepublic ImageData call() throws Exception {return imageInfo.downloadImage();}});}renderText(source);try {for (int t = 0, n = info.size(); t < n; t++) {Future<ImageData> f = completionService.take();ImageData imageData = f.get();renderImage(imageData);}} catch (InterruptedException e) {Thread.currentThread().interrupt();} catch (ExecutionException e) {throw launcherThrowable(e.getCause());}}
}
6.3.7 為任務(wù)設(shè)置時(shí)間
Page renderPageWithAd() throws InterruptedException {long endNanos = System.nanoTime() + END_BUDGET;Future<Ad> f = excutor.submit(new FetchAdTask());// 在等待廣告的同時(shí)顯示頁(yè)面。Page page = renderPageBody();Ad ad;try {long timeLeft = endNanos - System.nanoTime();ad = f.get(timeLeft, TimeUnit.NANOSECONDS);} catch (ExecutionException e) {ad = DEFAULT_AD;} catch (TimeoutException e) {ad = DEFAULT_AD;f.cancel(true);}page.setAd(ad);return page;}
6.3.8 示例:旅行預(yù)訂門(mén)戶網(wǎng)站
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;private class QuoteTask implements Callable<TravelQuote> {private final TravelCompany company;private final TravelInfo travelInfo;@Overridepublic TravelQuote call() throws Exception {return company.solicitQuote(travelInfo);}
}public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {List<QuoteTask> tasks = new ArrayList<>();for (TravelCompany company : companies) {tasks.add(new QuoteTask(company, travelInfo));}List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);List<TravelQuote> quotes = new ArrayList<>(futures.size());Iterator<QuoteTask> iterator = tasks.iterator();for (Future<TravelQuote> f : futures) {QuoteTask task = iterator.next();try {quotes.add(f.get());} catch (ExecutionException e) {quotes.add(task.getFailureQuote(e.getCause()));} catch (CancellationException e) {quotes.add(task.getTimeoutQuote(e.getCause()));}}Collections.sort(quotes, ranking);return quotes;
}