springboot通過注解使用線程池

項目中經常用到線程池,1000個人有1000鐘創(chuàng)建線程池的方式,先背書一段阿里ajva開發(fā)規(guī)范上的話:
【強制】線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創(chuàng)建線程數量為 Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致 OOM。
今天來說說springboot中通過線程池的使用

springboot中通過注解線程池的使用

先創(chuàng)建一個自定義線程池

先創(chuàng)建一個自定義線程池, 繼承spring并發(fā)包里面提供ThreadPoolTaskExecutor,演示用,子類就不加別的邏輯了

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

public class TestThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    @Override
    public void execute(Runnable task) {
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(task);
    }
}

在創(chuàng)建一個線程config類,注入到spring容器里面

創(chuàng)建一個config類,在里面創(chuàng)建我們自定義的線程池,在里面設置線程池各種參數,具體參數就說了,原來的文章里面

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new TestThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(5);
        //配置最大線程數
        executor.setMaxPoolSize(200);
        //配置隊列大小
        executor.setQueueCapacity(10000);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("test-thread-");

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執(zhí)行初始化
        executor.initialize();
        return executor;
    }
}

最后就是來通過注解使用線程池了

通過@Async注解,放到方法上,就可以把方法加入到線程池中去執(zhí)行,就和線程類的run方法一樣了。

@Async("asyncServiceExecutor")

@Async("asyncServiceExecutor"),這個里面的值,就是我們前面注冊到容器里面的線程池,來看一眼


容器中的線程

看一眼spring容器視圖,這個線程池對象時我們注冊線程config類時候注入到容器里面的,讓我們來測試兩個方法吧,一個沒有返回的,一個有返回值,直接上代碼

@Component
public class TestTask {

    @Async("asyncServiceExecutor")
    public void task1(){
        System.out.println("測試一個沒有返回值的任務1");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("測試一個沒有返回值的任務2");
    }

    @Async("asyncServiceExecutor")
    public Future<String> task2(){
        System.out.println("測試一個有返回值的任務1");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("測試一個有返回值的任務2");
        //返回當前線程的線程名稱
        return new AsyncResult<>(Thread.currentThread().getName());
    }
}

這個任務類中兩個任務,一個沒有返回值的任務task1,就相當于Runable線程,一個有返回值的任務task2,相當于callable線程,我這邊用了AsyncResult做了返回值,看一眼AsyncResult,也是spring的并發(fā)包里面的AsyncResult


AsyncResult

他實現(xiàn)了ListenableFuture接口,類關系如下:


AsyncResult類關系
來看一下task1沒有返回值任務的測試結果

測試類如下:

@RunWith( SpringRunner.class)
@SpringBootTest(classes = BootStartApplication.class)
public class TestTaskCase {

    @Autowired
    private TestTask testTask;

    @Test
    public void testVoid(){
        //測試沒有返回值的任務
        testTask.task1();
        try {
            System.out.println("正在測試沒有返回值的任務task1");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果如下:


運行結果

看到結果了,就是把一個runable線程加入到了線程中執(zhí)行了

在來看一下task2有返回值任務的測試結果

測試類如下:

@RunWith( SpringRunner.class)
@SpringBootTest(classes = BootStartApplication.class)
public class TestTaskCase {

    @Autowired
    private TestTask testTask;

    @Test
    public void testVoid(){
        Future<String> future = testTask.task2();
        System.out.println("正在測試有返回值的任務task2");
        while (true) {
            //while true中判斷任務是否完成
            if (future.isDone() ) {
                System.out.println("task2任務已經完成");
                try {
                    System.out.println("task2任務已經完成,返回值為:" + future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                break;
            }
            System.out.println("task2任務沒有完成正在等待");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

這里使用了Future的isDone()方法來判斷任務是否已經完成,看下運行結果:


task2的運行結果

就是吧一個callable線程加入到了線程中執(zhí)行

使用說完,再來說說給線程池添加監(jiān)控吧

線程池在用的時候,有時候會因為線程數量的一些問題,引發(fā)一些線上問題,所以對線程池添加一些監(jiān)控是很有必要的,記得我們自定義的那個線程吧,我那會說演示就不添加邏輯,我們需要的監(jiān)控就要加到那里面去了,直接上代碼吧

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class TestThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String method) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }

        //這里可以用使用存儲到在線庫、打日志、存緩存等方式把監(jiān)控數據記錄下來
        //用到線程池里面具體哪個方法執(zhí)行的線程任務
        System.out.println("使用到的方法:" + method);
        //是我們配置在config里面線程前綴的名字(用于多個線程池中區(qū)分線程池)
        System.out.println("線程的前綴名字:" +
                this.getThreadNamePrefix());
        System.out.println("線程池中的任務數量:" +
                threadPoolExecutor.getTaskCount());
        System.out.println("線程池中已經完成的任務數:" +
                threadPoolExecutor.getCompletedTaskCount());
        System.out.println("線程池中的線程數:" +
                threadPoolExecutor.getActiveCount());
        System.out.println("線程池中隊列的長度:" +
                threadPoolExecutor.getQueue().size());
    }
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("executeRunnable");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("executeRunnableStartTimeout");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("submitRunnable");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("submitCallable");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("submitListenableRunnable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("submitListenableCallable");
        return super.submitListenable(task);
    }
}

這里主要用到了線程中的任務數、線程數等屬性,這個屬性可以自己按需要來添加,添加監(jiān)控的方式可以使用存儲到數據庫、緩存、打日志接flume、kafka等方式,加上了監(jiān)控日志,我們再來看下測試類運行的結果:


監(jiān)控的結果

這樣每次把任務添加到線程池中都會把這些線程池屬性進行實時監(jiān)控,可以實時關注應用的運行情況,也可以出現(xiàn)線上問題時快速定位查詢

springboot通過注解使用線程池就為大家說到這里,歡迎大家來交流,指出文中一些說錯的地方,讓我加深認識。
謝謝大家!

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容