多線程異步執(zhí)行定時(shí)任務(wù)

核心關(guān)鍵點(diǎn):
@Async("npmExecutor") 異步調(diào)用的返回結(jié)果只能是void或者Future<T>,如果執(zhí)行Future.get()方法,等待異步調(diào)用結(jié)果,則主線程會(huì)阻塞,知道拿到結(jié)果后,才會(huì)繼續(xù)執(zhí)行

@Async源碼注釋中可以看到具體用法解釋:
public @interface Async {

    /**
     * A qualifier value for the specified asynchronous operation(s).
          指定為自己定義的線程池npmExecutor,否則默認(rèn)最大線程數(shù)不是自己設(shè)定的50個(gè)
     * <p>May be used to determine the target executor to be used when executing
     * the asynchronous operation(s), matching the qualifier value (or the bean
     * name) of a specific {@link java.util.concurrent.Executor Executor} or
     * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
     * bean definition.
     * <p>When specified on a class-level {@code @Async} annotation, indicates that the
     * given executor should be used for all methods within the class. Method-level use
     * of {@code Async#value} always overrides any value set at the class level.
     * @since 3.1.2
     */
    String value() default "";

定時(shí)任務(wù)執(zhí)行類


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@EnableScheduling
public class ScheduledService {


    @Autowired
    @Qualifier("npmExecutor")
    private ThreadPoolExecutor executor;
    @Autowired
    private TestFuture testFuture;
    @Autowired
    private TestMulti testMulti;


    @Scheduled(cron = "*/3 * * * * ?")
    @Async("npmExecutor")
    public void testMulti() {
        log.info("testMulti 執(zhí)行了={}",DateUtils.formatDate(new Date()));
        log.info("testMulti 激活線程數(shù)={},任務(wù)數(shù)={},已完成數(shù)={},隊(duì)列大小={}",
                executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount(),executor.getQueue().size());
        try {
            for (int i =0;i<2;i++){
                Future<String> submit = executor.submit(testFuture);
                log.info("執(zhí)行結(jié)果={}",submit.get());
                log.info("執(zhí)行中 激活線程數(shù)={},任務(wù)數(shù)={},已完成數(shù)={}",executor.getActiveCount(),executor.getTaskCount(),executor.getCompletedTaskCount());
            }
        } catch (Exception e) {
            log.error("alarm 獲取數(shù)據(jù)失敗={}", e);
        }
    }
}

配置線程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class NpmThreadPoolConfig {


    @Bean("npmExecutor")
    public ThreadPoolExecutor getExecutor() {

        ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(30, 50, 30L, TimeUnit.SECONDS,  new LinkedBlockingQueue(1));
        taskExecutor.setThreadFactory(new NamedThreadFactory("npm"));
        taskExecutor.setRejectedExecutionHandler(new MyRejectHandle());
        
        return taskExecutor;
    }
}

自定義拒絕策略

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
//@Configuration
@Slf4j
public class MyRejectHandle implements RejectedExecutionHandler {


    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        log.info("you task is rejected." + executor.toString());
    }
}
任務(wù)類
@Slf4j
@Service
public class TestFuture implements Callable<String> {

    AtomicInteger atomicInteger = new AtomicInteger(0);


    @Override
    public String call() throws Exception {

        Thread.sleep(30000);
        return "TestFuture:"+atomicInteger.incrementAndGet()+":"+Thread.currentThread().getName();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容