前言
有時(shí)候,前端可能提交了一個(gè)耗時(shí)任務(wù),如果后端接收到請(qǐng)求后,直接執(zhí)行該耗時(shí)任務(wù),那么前端需要等待很久一段時(shí)間才能接受到響應(yīng)。如果該耗時(shí)任務(wù)是通過瀏覽器直接進(jìn)行請(qǐng)求,那么瀏覽器頁面會(huì)一直處于轉(zhuǎn)圈等待狀態(tài)。一個(gè)簡(jiǎn)單的例子如下所示:
@RestController
@RequestMapping("async")
public class AsyncController {
@GetMapping("/")
public String index() throws InterruptedException {
// 模擬耗時(shí)操作
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
return "consuming time behavior done!";
}
}
當(dāng)我們?cè)跒g覽器請(qǐng)求localhost:8080/async/頁面時(shí),可以看到瀏覽器一直處于轉(zhuǎn)圈等待狀態(tài),這樣體驗(yàn)十分不友好。
事實(shí)上,當(dāng)后端要處理一個(gè)耗時(shí)任務(wù)時(shí),通常都會(huì)將耗時(shí)任務(wù)提交到一個(gè)異步任務(wù)中進(jìn)行執(zhí)行,此時(shí)前端提交耗時(shí)任務(wù)后,就可直接返回,進(jìn)行其他操作。
在 Java 中,開啟異步任務(wù)最常用的方式就是開辟線程執(zhí)行異步任務(wù),如下所示:
@RestController
@RequestMapping("async")
public class AsyncController {
@GetMapping("/")
public String index() {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 模擬耗時(shí)操作
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
return "consuming time behavior processing!";
}
}
這時(shí)瀏覽器請(qǐng)求localhost:8080/async/,就可以很快得到響應(yīng),并且耗時(shí)任務(wù)會(huì)在后臺(tái)得到執(zhí)行。
一般來說,前端不會(huì)關(guān)注耗時(shí)任務(wù)結(jié)果,因此前端只需負(fù)責(zé)提交該任務(wù)給到后端即可。但是如果前端需要獲取耗時(shí)任務(wù)結(jié)果,則可通過Future等方式將結(jié)果返回,詳細(xì)內(nèi)容請(qǐng)參考后文。
事實(shí)上,在 Spring Boot 中,我們不需要手動(dòng)創(chuàng)建線程異步執(zhí)行耗時(shí)任務(wù),因?yàn)?Spring 框架已提供了相關(guān)異步任務(wù)執(zhí)行解決方案,本文主要介紹下在 Spring Boot 中執(zhí)行異步任務(wù)的相關(guān)內(nèi)容。
執(zhí)行異步任務(wù)
Spring 3.0 時(shí)提供了一個(gè)@Async注解,該注解用于標(biāo)記要進(jìn)行異步執(zhí)行的方法,當(dāng)在其他線程調(diào)用被@Async注解的方法時(shí),就會(huì)開啟一個(gè)線程執(zhí)行該方法。
注:@Async注解通常用在方法上,但是也可以用作類型上,當(dāng)類被@Async注解時(shí),表示該類中所有的方法都是異步執(zhí)行的。
在 Spring Boot 中,如果要執(zhí)行一個(gè)異步任務(wù),只需進(jìn)行如下兩步操作:
-
使用注解
@EnableAsync開啟異步任務(wù)支持,如下所示:@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync // 開啟異步調(diào)用 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }@EnableAsync注解可以讓 Spring 開啟異步方法執(zhí)行,它會(huì)讓 Spring 掃描被其注解的包及其子包下被@Async注解的類或方法,所以這里我們?cè)诟屡渲?code>@EnableAsync。 -
使用
@Async注解標(biāo)記要進(jìn)行異步執(zhí)行的方法,如下所示:@Service // 假設(shè)當(dāng)前類是一個(gè) Service @Slf4j public class AsyncTaskService { @Async public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: AsyncTaskService Thread = {}",Thread.currentThread().getName()); // 模擬耗時(shí)任務(wù) Thread.sleep(TimeUnit.SECONDS.toMillis(5)); } @Async public Future<String> asyncTaskWithReturnType() throws InterruptedException { log.info("asyncTaskWithReturnType: AsyncTaskService Thread = {}",Thread.currentThread().getName()); // 模擬耗時(shí)任務(wù) Thread.sleep(TimeUnit.SECONDS.toMillis(5)); return new AsyncResult<>("async tasks done!"); } }上述代碼使用
@Async標(biāo)記了兩個(gè)異步執(zhí)行方法,一個(gè)沒有返回值的asyncTaskWithoutReturnType,另一個(gè)擁有返回值asyncTaskWithReturnType,這里需要注意的一點(diǎn)時(shí),被@Async注解的方法可以接受任意類型參數(shù),但只能返回void或Future類型數(shù)據(jù)。所以當(dāng)異步方法返回?cái)?shù)據(jù)時(shí),需要使用Future包裝異步任務(wù)結(jié)果,上述代碼使用AsyncResult包裝異步任務(wù)結(jié)果,AsyncResult間接繼承Future,是 Spring 提供的一個(gè)可用于追蹤異步方法執(zhí)行結(jié)果的包裝類。其他常用的Future類型還有 Spring 4.2 提供的ListenableFuture,或者 JDK 8 提供的CompletableFuture,這些類型可提供更豐富的異步任務(wù)操作。如果前端需要獲取耗時(shí)任務(wù)結(jié)果,則異步任務(wù)方法應(yīng)當(dāng)返回一個(gè)
Future類型數(shù)據(jù),此時(shí)Controller相關(guān)接口需要調(diào)用該Future的get()方法獲取異步任務(wù)結(jié)果,get()方法是一個(gè)阻塞方法,因此該操作相當(dāng)于將異步任務(wù)轉(zhuǎn)換為同步任務(wù),瀏覽器同樣會(huì)面臨我們前面所講的轉(zhuǎn)圈等待過程,但是異步執(zhí)行還是有他的好處的,因?yàn)槲覀兛梢钥刂?code>get()方法的調(diào)用時(shí)序,因此可以先執(zhí)行其他一些操作后,最后再調(diào)用get()方法。 -
經(jīng)過前面兩個(gè)步驟后,其實(shí)就已經(jīng)完成了異步任務(wù)配置。到此就可以調(diào)用這些異步任務(wù)方法,如下所示:
@RestController @RequestMapping("async") @Slf4j public class AsyncController { @Autowired // 注入異步任務(wù)類 private AsyncTaskService asyncTaskService; @GetMapping("/asyncTaskWithoutReturnType") public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: Controller Thread = {}",Thread.currentThread().getName()); this.asyncTaskService.asyncTaskWithoutReturnType(); } @GetMapping("/asyncTaskWithReturnType") public String asyncTaskWithReturnType() throws Exception { log.info("asyncTaskWithReturnType: Controller Thread = {}",Thread.currentThread().getName()); Future<String> future = this.asyncTaskService.asyncTaskWithReturnType(); return future.get(); } }請(qǐng)求上述兩個(gè)接口,如下所示:
$ curl -X GET localhost:8080/async/asyncTaskWithoutReturnType $ curl -X GET localhost:8080/async/asyncTaskWithReturnType async tasks done!查看日志,如下圖所示:
可以看到,異步任務(wù)方法運(yùn)行在于
Controller不同的線程上。
異步任務(wù)相關(guān)限制
被@Async注解的異步任務(wù)方法存在相關(guān)限制:
被
@Async注解的方法必須是public的,這樣方法才可以被代理。不能在同一個(gè)類中調(diào)用
@Async方法,因?yàn)橥粋€(gè)類中調(diào)用會(huì)繞過方法代理,調(diào)用的是實(shí)際的方法。被
@Async注解的方法不能是static。@Async不能用于被@Configuration注解的類方法上。
注:官方文檔寫的是不能在@Configuration類中使用,但本人實(shí)際測(cè)試發(fā)現(xiàn),無論是將@Async注解到@Configuration類上,還是將@Async注解到方法上,都是可以異步執(zhí)行方法的。@Async注解不能與 Bean 對(duì)象的生命周期回調(diào)函數(shù)(比如@PostConstruct)一起注解到同一個(gè)方法中。解決方法可參考:Spring - The @Async annotation異步類必須注入到 Spring IOC 容器中(也即異步類必須被
@Component/@Service等進(jìn)行注解)。其他類中使用異步類對(duì)象必須通過
@Autowired等方式進(jìn)行注入,不能手動(dòng)new對(duì)象。
自定義 Executor
默認(rèn)情況下,Spring 會(huì)自動(dòng)搜索相關(guān)線程池定義:要么是一個(gè)唯一TaskExecutor Bean 實(shí)例,要么是一個(gè)名稱為taskExecutor的Executor Bean 實(shí)例。如果這兩個(gè) Bean 實(shí)例都不存在,就會(huì)使用SimpleAsyncTaskExecutor來異步執(zhí)行被@Async注解的方法。
綜上,可以知道,默認(rèn)情況下,Spring 使用的 Executor 是SimpleAsyncTaskExecutor,SimpleAsyncTaskExecutor每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程,不會(huì)重用之前的線程。很多時(shí)候,這種實(shí)現(xiàn)方式不符合我們的業(yè)務(wù)場(chǎng)景,因此通常我們都會(huì)自定義一個(gè) Executor 來替換SimpleAsyncTaskExecutor。
對(duì)于自定義 Executor(自定義線程池),可以分為如下兩個(gè)層級(jí):
-
方法層級(jí):即為單獨(dú)一個(gè)或多個(gè)方法指定運(yùn)行線程池,其他未指定的異步方法運(yùn)行在默認(rèn)線程池。如下所示:
@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync public class Application { // ... @Bean("methodLevelExecutor1") public TaskExecutor getAsyncExecutor1() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) executor.setCorePoolSize(4); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 設(shè)置線程默認(rèn)前綴名 executor.setThreadNamePrefix("Method-Level-Async1-"); return executor; } @Bean("methodLevelExecutor2") public TaskExecutor getAsyncExecutor2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) executor.setCorePoolSize(8); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 設(shè)置線程默認(rèn)前綴名 executor.setThreadNamePrefix("Method-Level-Async2-"); return executor; } }上述特意設(shè)置了多個(gè)
TaskExecutor,因?yàn)槿绻辉O(shè)置一個(gè)TaskExecutor,那么 Spring 就會(huì)默認(rèn)采用該TaskExecutor作為所有@Async的Executor,而設(shè)置了多個(gè)TaskExecutor,Spring 檢測(cè)到全局存在多個(gè)Executor,就會(huì)降級(jí)使用默認(rèn)的SimpleAsyncTaskExecutor,此時(shí)我們就可以為@Async方法配置執(zhí)行線程池,其他未配置的@Async就會(huì)默認(rèn)運(yùn)行在SimpleAsyncTaskExecutor中,這就是方法層級(jí)的自定義 Executor。如下代碼所示:@RestController @RequestMapping("async") @Slf4j public class AsyncController { @Autowired // 注入異步任務(wù)類 private AsyncTaskService asyncTaskService; @GetMapping("/asyncTaskWithoutReturnType") public void asyncTaskWithoutReturnType() throws InterruptedException { log.info("asyncTaskWithoutReturnType: Controller Thread = {}",Thread.currentThread().getName()); this.asyncTaskService.asyncTaskWithoutReturnType(); } @GetMapping("/asyncTaskWithReturnType") public String asyncTaskWithReturnType() throws Exception { log.info("asyncTaskWithReturnType: Controller Thread = {}",Thread.currentThread().getName()); Future<String> future = this.asyncTaskService.asyncTaskWithReturnType(); return future.get(); } }請(qǐng)求上述接口,如下所示:
$ curl -X GET localhost:8080/async/asyncTaskWithoutReturnType $ curl -X GET localhost:8080/async/asyncTaskWithReturnType async tasks done!請(qǐng)求日志如下所示:
2020-09-25 00:55:31,953 INFO [http-nio-8080-exec-1] com.yn.async.AsyncController: asyncTaskWithoutReturnType: Controller Thread = http-nio-8080-exec-1 2020-09-25 00:55:31,984 INFO [Method-Level-Async1-1] com.yn.async.AsyncTaskService: asyncTaskWithoutReturnType: AsyncTaskService Thread = Method-Level-Async1-1 2020-09-25 00:55:45,592 INFO [http-nio-8080-exec-2] com.yn.async.AsyncController: asyncTaskWithReturnType: Controller Thread = http-nio-8080-exec-2 2020-09-25 00:55:45,594 INFO [http-nio-8080-exec-2] org.springframework.aop.interceptor.AsyncExecutionAspectSupport: More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [methodLevelExecutor1, methodLevelExecutor2] 2020-09-25 00:55:45,595 INFO [SimpleAsyncTaskExecutor-1] com.yn.async.AsyncTaskService: asyncTaskWithReturnType: AsyncTaskService Thread = SimpleAsyncTaskExecutor-1結(jié)果跟我們上述的分析一致。
-
應(yīng)用層級(jí):即全局生效的 Executor。依據(jù) Spring 默認(rèn)搜索機(jī)制,其實(shí)就是配置一個(gè)全局唯一的
TaskExecutor實(shí)例或者一個(gè)名稱為taskExecutor的Executor實(shí)例即可,如下所示:@SpringBootApplication @ComponentScan("com.yn.async") @EnableAsync public class Application { // ... @Bean("taskExecutor") public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) int cores = Runtime.getRuntime().availableProcessors(); executor.setCorePoolSize(cores); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 設(shè)置線程默認(rèn)前綴名 executor.setThreadNamePrefix("Application-Level-Async-"); return executor; } }上述代碼定義了一個(gè)名稱為
taskExecutor的Executor,此時(shí)@Async方法默認(rèn)就會(huì)運(yùn)行在該Executor中。其實(shí) Spring 還提供了另一個(gè)功能更加強(qiáng)大的接口
AsyncConfigurer,該接口主要是用于自定義一個(gè)Executor配置類,提供了應(yīng)用層級(jí)Executor接口,以及對(duì)于@Async方法異常捕獲功能。如果 Spring 檢測(cè)到該接口實(shí)例,會(huì)優(yōu)先采用該接口自定義的Executor。如下所示:@Configuration @EnableAsync public class AsyncConfigure implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) int cores = Runtime.getRuntime().availableProcessors(); executor.setCorePoolSize(cores); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 設(shè)置線程默認(rèn)前綴名 executor.setThreadNamePrefix("AsyncConfigure-"); // 注意,此時(shí)需要調(diào)用 initialize executor.initialize(); return executor; } }注:使用自定義實(shí)現(xiàn)
AsyncConfigurer接口的配置類的另一個(gè)好處就是無論@EnableAsync的包層級(jí)多深,默認(rèn)都會(huì)對(duì)整個(gè)項(xiàng)目掃描@Async方法,這樣我們就無需將@EnableAsync注解到根包類中。
異常處理
前文介紹過,對(duì)于被@Async注解的異步方法,只能返回void或者Future類型。對(duì)于返回Future類型數(shù)據(jù),如果異步任務(wù)方法拋出異常,則很容易進(jìn)行處理,因?yàn)?code>Future.get()會(huì)重新拋出該異常,我們只需對(duì)其進(jìn)行捕獲即可。但是對(duì)于返回void的異步任務(wù)方法,異常不會(huì)傳播到被調(diào)用者線程,因此我們需要自定義一個(gè)額外的異步任務(wù)異常處理器,捕獲異步任務(wù)方法拋出的異常。
自定義異步任務(wù)異常處理器的步驟如下所示:
-
首先自定義一個(gè)異常處理器類實(shí)現(xiàn)接口
AsyncUncaughtExceptionHandler,如下所示:public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... objects) { System.out.println("Exception message - " + throwable.getMessage()); System.out.println("Method name - " + method.getName()); for (Object param : objects) { System.out.println("Parameter value - " + param); } } } -
然后,創(chuàng)建一個(gè)自定義
Executor異步配置類,將我們的自定義異常處理器設(shè)置到其接口上。如下所示:@Configuration @EnableAsync public class AsyncConfigure implements AsyncConfigurer { // ... @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new CustomAsyncExceptionHandler(); } }此時(shí)異步方法如果拋出異常,就可以被我們的自定義異步異常處理器捕獲得到。
