可做區(qū)域代理的網(wǎng)站seo咨詢師
很多業(yè)務(wù)場景需要使用異步去完成,比如:發(fā)送短信通知。要完成異步操作一般有兩種:
1、消息隊(duì)列MQ
2、線程池處理。
我們來看看Spring框架中如何去使用線程池來完成異步操作,以及分析背后的原理。
一. Spring異步線程池的接口類 :TaskExecutor
在Spring4中,Spring中引入了一個(gè)新的注解@Async,這個(gè)注解讓我們在使用Spring完成異步操作變得非常方便。
Spring異步線程池的接口類,其實(shí)質(zhì)是
java.util.concurrent.Executor
Spring 已經(jīng)實(shí)現(xiàn)的異常線程池:
1. SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
2. SyncTaskExecutor:這個(gè)類沒有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時(shí)被quartz和非quartz使用,才需要使用此類
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實(shí)質(zhì)是對java.util.concurrent.ThreadPoolExecutor的包裝,
二、簡單使用說明
Spring中用@Async注解標(biāo)記的方法,稱為異步方法。在spring boot應(yīng)用中使用@Async很簡單:
1、調(diào)用異步方法類上或者啟動(dòng)類加上注解@EnableAsync
2、在需要被異步調(diào)用的方法外加上@Async
3、所使用的@Async注解方法的類對象應(yīng)該是Spring容器管理的bean對象;
啟動(dòng)類加上注解@EnableAsync:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication
@EnableAsync
public class CollectorApplication {public static void main(String[] args) throws Exception {SpringApplication.run(CollectorApplication.class, args);}
}
在需要被異步調(diào)用的方法外加上@Async,同時(shí)類AsyncService加上注解@Service或者@Component,使其對象成為Spring容器管理的bean對象;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
public class AsyncService {@Asyncpublic void asyncMethod(String s) {System.out.println("receive:" + s);}public void test() {System.out.println("test");asyncMethod();//同一個(gè)類里面調(diào)用異步方法}@Asyncpublic void test2() {AsyncService asyncService = context.getBean(AsyncService.class);asyncService.asyncMethod();//異步}/*** 異布調(diào)用返回Future*/@Asyncpublic Future<String> asyncInvokeReturnFuture(int i) {System.out.println("asyncInvokeReturnFuture, parementer="+ i);Future<String> future;try {Thread.sleep(1000 * 1);future = new AsyncResult<String>("success:" + i);} catch (InterruptedException e) {future = new AsyncResult<String>("error");}return future;}
}//異步方法和普通的方法調(diào)用相同
asyncService.asyncMethod("123");
Future<String> future = asyncService.asyncInvokeReturnFuture(100);
System.out.println(future.get());
如果將一個(gè)類聲明為異步類@Async,那么這個(gè)類對外暴露的方法全部成為異步方法。
@Async
@Service
public class AsyncClass {public AsyncClass() {System.out.println("----init AsyncClass----");}volatile int index = 0;public void foo() {System.out.println("asyncclass foo, index:" + index);}public void foo(int i) {this.index = i;System.out.println("asyncclass foo, index:" + i);}public void bar(int i) {this.index = i;System.out.println("asyncclass bar, index:" + i);}
}
這里需要注意的是:
1、同一個(gè)類里面調(diào)用異步方法不生效:原因默認(rèn)類內(nèi)的方法調(diào)用不會(huì)被aop攔截,即調(diào)用方和被調(diào)用方是在同一個(gè)類中,是無法產(chǎn)生切面的,該對象沒有被Spring容器管理。即@Async方法不生效。
解決辦法:如果要使同一個(gè)類中的方法之間調(diào)用也被攔截,需要使用spring容器中的實(shí)例對象,而不是使用默認(rèn)的this,因?yàn)橥ㄟ^bean實(shí)例的調(diào)用才會(huì)被spring的aop攔截
本例使用方法:AsyncService asyncService = context.getBean(AsyncService.class); 然后使用這個(gè)引用調(diào)用本地的方法即可達(dá)到被攔截的目的
備注:這種方法只能攔截protected,default,public方法,private方法無法攔截。這個(gè)是spring aop的一個(gè)機(jī)制。
2、如果不自定義異步方法的線程池默認(rèn)使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。并發(fā)大的時(shí)候會(huì)產(chǎn)生嚴(yán)重的性能問題。
3、異步方法返回類型只能有兩種:void和
java.util.concurrent.Future。
1)當(dāng)返回類型為void的時(shí)候,方法調(diào)用過程產(chǎn)生的異常不會(huì)拋到調(diào)用者層面,
可以通過注
AsyncUncaughtExceptionHandler來捕獲此類異常
2)當(dāng)返回類型為Future的時(shí)候,方法調(diào)用過程產(chǎn)生的異常會(huì)拋到調(diào)用者層面
三、定義通用線程池
自定義線程池,可對系統(tǒng)中線程池更加細(xì)粒度的控制,方便調(diào)整線程池大小配置,線程執(zhí)行異??刂坪吞幚怼T谠O(shè)置系統(tǒng)自定義線程池代替默認(rèn)線程池時(shí),雖可通過多種模式設(shè)置,但替換默認(rèn)線程池最終產(chǎn)生的線程池有且只能設(shè)置一個(gè)(不能設(shè)置多個(gè)類繼承 AsyncConfigurer)。自定義線程池有如下方式:
- 重新實(shí)現(xiàn)接口 AsyncConfigurer;
- 繼承 AsyncConfigurerSupport;
- 配置由自定義的 TaskExecutor 替代內(nèi)置的任務(wù)執(zhí)行器。
通過查看 Spring 源碼關(guān)于 @Async 的默認(rèn)調(diào)用規(guī)則,會(huì)優(yōu)先查詢源碼中實(shí)現(xiàn) AsyncConfigurer 這個(gè)接口的類,實(shí)現(xiàn)這個(gè)接口的類為 AsyncConfigurerSupport。但默認(rèn)配置的線程池和異步處理方法均為空,所以,無論是繼承或者重新實(shí)現(xiàn)接口,都需指定一個(gè)線程池。且重新實(shí)現(xiàn) public Executor getAsyncExecutor () 方法。
實(shí)現(xiàn)接口 AsyncConfigurer
@Configurationpublic class AsyncConfiguration implements AsyncConfigurer {@Bean("taskExecutor")public ThreadPoolTaskExecutor executor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();int corePoolSize = 10;executor.setCorePoolSize(corePoolSize);int maxPoolSize = 50;executor.setMaxPoolSize(maxPoolSize);int queueCapacity = 10;executor.setQueueCapacity(queueCapacity);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix( "asyncServiceExecutor-");executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(awaitTerminationSeconds);executor.initialize();return executor;}@Overridepublic Executor getAsyncExecutor() {return executor();}}
繼承 AsyncConfigurerSupport
Configuration
@EnableAsync
class SpringAsyncConfigurer extends AsyncConfigurerSupport { @Bean public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(3); threadPool.setMaxPoolSize(3); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); return threadPool; } @Override public Executor getAsyncExecutor() { return asyncExecutor; }
}
配置自定義的 TaskExecutor (建議采用方式)
在Spring Boot主類中定義一個(gè)線程池,public Executor taskExecutor() 方法用于自定義自己的線程池,線程池前綴”taskExecutor-”。如果不定義,則使用系統(tǒng)默認(rèn)的線程池。
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@EnableAsync@Configurationclass TaskPoolConfig {@Beanpublic Executor taskExecutor1() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(5); //線程池活躍的線程數(shù)pool.setMaxPoolSize(10); //線程池最大活躍的線程數(shù)pool.setWaitForTasksToCompleteOnShutdown(true);pool.setThreadNamePrefix("defaultExecutor");return pool;}@Bean("taskExecutor")public Executor taskExecutor2() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(200);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("taskExecutor-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);return executor;}}
}
上面我們通過ThreadPoolTaskExecutor創(chuàng)建了一個(gè)線程池,同時(shí)設(shè)置了如下參數(shù):
核心線程數(shù)10:線程池創(chuàng)建時(shí)初始化的線程數(shù)
最大線程數(shù)20:線程池最大的線程數(shù),只有在緩沖隊(duì)列滿了之后才會(huì)申請超過核心線程數(shù)的線程
緩沖隊(duì)列200:用來緩沖執(zhí)行任務(wù)的隊(duì)列
允許線程的空閑時(shí)間60秒:超過了核心線程數(shù)之外的線程,在空閑時(shí)間到達(dá)之后會(huì)被銷毀
線程池名的前綴:設(shè)置好了之后可以方便我們定位處理任務(wù)所在的線程池
線程池對拒絕任務(wù)的處理策略:此處采用了CallerRunsPolicy策略,當(dāng)線程池沒有處理能力的時(shí)候,該策略會(huì)直接在execute方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已被關(guān)閉,則會(huì)丟棄該任務(wù)
設(shè)置線程池關(guān)閉的時(shí)候等待所有任務(wù)都完成再繼續(xù)銷毀其他的Bean
設(shè)置線程池中任務(wù)的等待時(shí)間,如果超過這個(gè)時(shí)候還沒有銷毀就強(qiáng)制銷毀,以確保應(yīng)用最后能夠被關(guān)閉,而不是阻塞住
也可以單獨(dú)類來配置線程池:
/*** 線程池參數(shù)配置,多個(gè)線程池實(shí)現(xiàn)線程池隔離,@Async注解,默認(rèn)使用系統(tǒng)自定義線程池,可在項(xiàng)目中設(shè)置多個(gè)線程池,在異步調(diào)用的時(shí)候,指明需要調(diào)用的線程池名稱,比如:@Async("taskName")**/
@EnableAsync
@Configuration
public class TaskPoolConfig {@Bean("taskExecutor")public Executor taskExecutor() {//返回可用處理器的Java虛擬機(jī)的數(shù)量 12int i = Runtime.getRuntime().availableProcessors();System.out.println("系統(tǒng)最大線程數(shù) : " + i);ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心線程池大小executor.setCorePoolSize(16);//最大線程數(shù)executor.setMaxPoolSize(20);//配置隊(duì)列容量,默認(rèn)值為Integer.MAX_VALUEexecutor.setQueueCapacity(99999);//活躍時(shí)間executor.setKeepAliveSeconds(60);//線程名字前綴executor.setThreadNamePrefix("asyncServiceExecutor -");//設(shè)置此執(zhí)行程序應(yīng)該在關(guān)閉時(shí)阻止的最大秒數(shù),以便在容器的其余部分繼續(xù)關(guān)閉之前等待剩余的任務(wù)完成他們的執(zhí)行executor.setAwaitTerminationSeconds(60);//等待所有的任務(wù)結(jié)束后再關(guān)閉線程池executor.setWaitForTasksToCompleteOnShutdown(true);return executor;}
}
四、異步方法使用線程池
@Async(“線程池名稱”),指定value使用自己定義的線程池:
只需要在@Async注解中指定線程池名即可
@Component
public class Task {//默認(rèn)使用線程池@Asyncpublic void doTaskOne() throws Exception {System.out.println("開始做任務(wù)");long start = System.currentTimeMillis();Thread.sleep(random.nextInt(10000));long end = System.currentTimeMillis();System.out.println("完成任務(wù)耗時(shí):" + (end - start) + "毫秒");}//根據(jù)Bean Name指定特定線程池@Async("taskExecutor")public void doTaskOne() throws Exception {System.out.println("開始做任務(wù)");long start = System.currentTimeMillis();Thread.sleep(random.nextInt(10000));long end = System.currentTimeMillis();System.out.println("完成任務(wù)耗時(shí):" + (end - start) + "毫秒");}
}
注意事項(xiàng)(一定注意)
在使用@Async注解時(shí),很多小伙伴都會(huì)發(fā)現(xiàn)異步使用失敗。主要原因是異步方法的定義出了問題。
1、異步方法不能使用static修飾
2、異步類沒有使用@Component注解(或其他注解)導(dǎo)致spring無法掃描到異步類
3、異步方法和調(diào)用異步方法的方法不能在同一個(gè)類
4、類中需要使用@Autowired或@Resource等注解自動(dòng)注入,不能自己手動(dòng)new對象
5、如果使用SpringBoot框架必須在啟動(dòng)類中增加@EnableAsync注解?
?