做冰淇淋生意網(wǎng)站企業(yè)查詢系統(tǒng)
上節(jié)回顧
在上一節(jié)當(dāng)中,已經(jīng)實(shí)現(xiàn)了一個(gè)線程池,在本節(jié)當(dāng)中,我們需要添加拒絕策略。這里使用到了策略模式的設(shè)計(jì)模式,因?yàn)榫芙^策略是多種的,我們需要將這個(gè)權(quán)利下放給調(diào)用者(由調(diào)用者來指定我要采取哪種策略),而線程池只需要調(diào)用拒絕的接口即可。
步驟
(1)定義拒絕策略接口
(2)在線程池中加入拒絕策略參數(shù)
(3)自行調(diào)用測(cè)試
1.定義接口類
@FunctionalInterface
interface RejectPolicy<T>{//注意傳遞參數(shù)void reject(BlockQueue<T> queue,Runnable task);
}
2.線程池中添加接口以及調(diào)用方法
@Slf4j
class ThreadPool {//任務(wù)隊(duì)列private BlockQueue<Runnable> taskQueue;//線程集合 我們需要對(duì)線程做一個(gè)包裝private HashSet<Worker> workers = new HashSet<>();//核心線程數(shù)量private long coreSize;//超時(shí)時(shí)間private long timeout;//時(shí)間單位private TimeUnit timeUnit;//自定義拒絕策略private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit){this.taskQueue = new BlockQueue<>(queueCapacity);this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.rejectPolicy = (queue, task) -> {throw new RuntimeException();};}public ThreadPool(int queueCapacity,long coreSize,long timeout,TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy){taskQueue = new BlockQueue<>(queueCapacity);this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.rejectPolicy = rejectPolicy;}//執(zhí)行任務(wù)public void execute(Runnable task){//當(dāng)任務(wù)數(shù)量尚未超過coreSizesynchronized (workers){if (workers.size() < coreSize){log.info("創(chuàng)建工作線程{}",task);Worker worker = new Worker(task);workers.add(worker);worker.start();}else{log.info("加入到任務(wù)隊(duì)列{}",task);//有可能會(huì)阻塞在這里 進(jìn)而將主線程阻塞掉//taskQueue.put(task);//這里會(huì)有很多種策略自定義策略//策略模式:操作抽象成接口實(shí)現(xiàn)代碼是傳過來不會(huì)寫死taskQueue.tryPut(rejectPolicy,task);//rejectPolicy.reject(taskQueue,task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){try {log.info("正在執(zhí)行...{}",task);//執(zhí)行任務(wù)task.run();}catch (Exception e){System.out.println(e.getMessage());}finally {//不要忘記這一步task = null;}}synchronized (workers){log.info("worker被移除{}",this);workers.remove(this);}}}
}
3.main測(cè)試
@Slf4j
public class TestPool {//阻塞隊(duì)列是平衡生產(chǎn)者和消費(fèi)者之間的中介//任務(wù)數(shù)量超過任務(wù)隊(duì)列的情況public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(10, 2, 1000, TimeUnit.MICROSECONDS, (queue, task) -> {//1.死等queue.put(task);//2.超時(shí)等待queue.offer(task, 1500, TimeUnit.MICROSECONDS);//3.調(diào)用者自己放棄// log.debug("放棄{}",task);//4.調(diào)用者拋異常//throw new RuntimeException("task執(zhí)行失敗" + task);//5.調(diào)用者自己執(zhí)行task.run();});for (int i = 0; i < 20; i++) {int j = i;//主線程可能會(huì)在這里阻塞threadPool.execute(() -> {try {Thread.sleep(30000);} catch (InterruptedException e) {throw new RuntimeException(e);}TestPool.log.debug("{}", j);});}}
}