Android日記之線(xiàn)程池

前言

在編程中經(jīng)常會(huì)使用線(xiàn)程來(lái)異步處理任務(wù),但是每個(gè)線(xiàn)程的創(chuàng)建和銷(xiāo)毀都需要一定的開(kāi)銷(xiāo)。如果每次執(zhí)行一個(gè)任務(wù)都需要一個(gè)新進(jìn)程去執(zhí)行,則這些線(xiàn)程的創(chuàng)建和銷(xiāo)毀將消耗大量的資源;并且線(xiàn)程都是“各自為政”的,很難對(duì)其進(jìn)行控制,更何況有一堆的線(xiàn)程在執(zhí)行。這時(shí)候就需要線(xiàn)程池來(lái)對(duì)線(xiàn)程進(jìn)行管理。在Java 1.5中提供了Executor框架用于把任務(wù)的提交和執(zhí)行解耦。任務(wù)的提交交給RUnnable或者Callable,而Executor框架用來(lái)處理任務(wù)。Executor框架中最核心的成員就是ThreadPoolExecutor,它是線(xiàn)程池的核心實(shí)現(xiàn)類(lèi)。本篇文章就著重講解ThreadPoolExecutor。

ThreadPoolExecutor介紹

可以通過(guò)ThreadPoolExecutor開(kāi)創(chuàng)建一個(gè)線(xiàn)程池,ThreadPoolExecutor類(lèi)一共有四個(gè)構(gòu)造方法。下面展示的都是擁有最多參數(shù)的的構(gòu)造方法。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心線(xiàn)程數(shù)。默認(rèn)情況下線(xiàn)程池是空的,只有任務(wù)提交時(shí)才會(huì)創(chuàng)建線(xiàn)程。如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)少于corePoolSize,則會(huì)創(chuàng)建新線(xiàn)程來(lái)處理任務(wù);如果等于或者多于corePoolSize,則不會(huì)創(chuàng)建,如果調(diào)用線(xiàn)程池的prestartAllcoreThread()方法,線(xiàn)程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線(xiàn)程來(lái)等待任務(wù)。

  • maximumPoolSize:線(xiàn)程池允許創(chuàng)建的最大線(xiàn)程數(shù),如果任務(wù)隊(duì)列滿(mǎn)了并且線(xiàn)程數(shù)小于maximumPoolSize時(shí),則線(xiàn)程池仍舊會(huì)創(chuàng)建新的線(xiàn)程來(lái)處理任務(wù)。

  • keepAliveTime:非核心線(xiàn)程閑置的超時(shí)時(shí)間,超過(guò)這個(gè)事件則回收,如果任務(wù)很多,并且每個(gè)任務(wù)的執(zhí)行事件很短,則可以調(diào)用keepAliveTime來(lái)提高線(xiàn)程的利用率。另外,如果設(shè)置allowCoreThreadTimeOut屬性為true時(shí),keepAliveTime也會(huì)應(yīng)用到核心線(xiàn)程上。

  • TimeUnit:keepAliveTime參數(shù)的時(shí)間單位,可選的單位有天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、秒(SECONDS)、毫秒(MILLOSECONDS)等。

  • BlockingQueue<Runnable> :任務(wù)隊(duì)列,如果當(dāng)前線(xiàn)程數(shù)大于corePoolSize,則將任務(wù)添加到此任務(wù)隊(duì)列中。該任務(wù)隊(duì)列是BlockiingQueue類(lèi)型,也就是阻塞隊(duì)列。

  • ThreadFactory:線(xiàn)程工廠??梢杂镁€(xiàn)程工廠給每個(gè)創(chuàng)建出來(lái)的線(xiàn)程設(shè)置名字。一般情況下無(wú)須設(shè)置參數(shù)。

  • RejectedExecutionHandler :飽和策略,這是當(dāng)任務(wù)隊(duì)列中和線(xiàn)程池都滿(mǎn)了時(shí)所采取的對(duì)應(yīng)策略,默認(rèn)是ABordPolicy,表示無(wú)法處理新任務(wù),并拋出RejetctedExecutionException異常。此外還有3種策略,它們分別如下:

(1)CallerRunsPolicy:用調(diào)用者所在的線(xiàn)程來(lái)處理任務(wù),此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
(2)DiscardPolicy:不能執(zhí)行的任務(wù),并將該任務(wù)刪除。
(3)DiscardOldestPolicy:丟棄隊(duì)列最近的任務(wù),并執(zhí)行當(dāng)前的任務(wù)。

線(xiàn)程池的處理流程和原理

圖1,線(xiàn)程池的處理流程

從上圖1中可以得知線(xiàn)程的處理流程主要分為3個(gè)步驟:

  • 提交任務(wù)后,線(xiàn)程池先判斷線(xiàn)程數(shù)是否達(dá)到核心線(xiàn)程數(shù)(corePoolSize)。如果未核心線(xiàn)程數(shù),則創(chuàng)建核心線(xiàn)程處理任務(wù);否則就執(zhí)行下一步操作。
  • 接著線(xiàn)程池判斷任務(wù)隊(duì)列是否滿(mǎn)了。如果沒(méi)滿(mǎn),則將任務(wù)添加到任務(wù)隊(duì)列中,否則,就執(zhí)行下一步操作。
  • 接著因?yàn)槿蝿?wù)隊(duì)列滿(mǎn)了,線(xiàn)程池就會(huì)判斷線(xiàn)程數(shù)是否達(dá)到了最大線(xiàn)程數(shù),如果未達(dá)到,則創(chuàng)建非核心線(xiàn)程1處理任務(wù);否則,就執(zhí)行飽和策略,默認(rèn)會(huì)拋出RejectedExecutionException異常。

雖然上面介紹了線(xiàn)程池的處理流程,但還不是很直觀。我們結(jié)合下面的圖2來(lái)更好的了解線(xiàn)程池的原理。

圖2,線(xiàn)程池執(zhí)行

從圖2中可以看到,如果我們執(zhí)行ThreadPoolExecutor的execute方法,會(huì)遇到各種情況:
(1)如果線(xiàn)程池中的線(xiàn)程數(shù)未達(dá)到核心線(xiàn)程數(shù),則創(chuàng)建核心線(xiàn)程處理任務(wù)。
(2)如果線(xiàn)程數(shù)大于或者等于核心線(xiàn)程數(shù),則將任務(wù)加入任務(wù)隊(duì)列,線(xiàn)程池中的空閑線(xiàn)程會(huì)不斷地從任務(wù)隊(duì)列中取出任務(wù)進(jìn)行處理。
(3)如果任務(wù)隊(duì)列滿(mǎn)了,并且線(xiàn)程數(shù)沒(méi)有達(dá)到最大線(xiàn)程數(shù),則創(chuàng)建非核心線(xiàn)程去處理任務(wù)。
(4)如果線(xiàn)程數(shù)超過(guò)了最大線(xiàn)程數(shù),則執(zhí)行飽和策略。

ThreadPoolExecutor的基本使用

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical"
    tools:context=".MainActivity">
    
    <Button
        android:id="@+id/btn_start"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="啟動(dòng)" />
    
</LinearLayout>
package com.ju.executordemo;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MainActivity extends AppCompatActivity{


    private Button btnStart;
    private final int CORE_POOL_SIZE = 4;//核心線(xiàn)程數(shù)
    private final int MAX_POOL_SIZE = 5;//最大線(xiàn)程數(shù)
    private final long KEEP_ALIVE_TIME = 10;//空閑線(xiàn)程超時(shí)時(shí)間
    private ThreadPoolExecutor executorPool;
    private int songIndex = 0;


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        initView();
        //創(chuàng)建線(xiàn)程池
        initExec();
    }

    private void initView() {
        btnStart = findViewById(R.id.btn_start);
        btnStart.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                begin();
            }
        });
    }


    public void begin() {
        songIndex++;
        try {
            executorPool.execute(new WorkerThread("歌曲" + songIndex));
        } catch (Exception e) {
            Log.e("threadtest", "AbortPolicy...已超出規(guī)定的線(xiàn)程數(shù)量,不能再增加了....");
        }

        // 所有任務(wù)已經(jīng)執(zhí)行完畢,我們?cè)诒O(jiān)聽(tīng)一下相關(guān)數(shù)據(jù)
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20 * 1000);
                } catch (Exception e) {

                }
                sout("monitor after");
            }
        }).start();

    }

    private void sout(String msg) {
        Log.i("threadtest", "monitor " + msg
                + " CorePoolSize:" + executorPool.getCorePoolSize()
                + " PoolSize:" + executorPool.getPoolSize()
                + " MaximumPoolSize:" + executorPool.getMaximumPoolSize()
                + " ActiveCount:" + executorPool.getActiveCount()
                + " TaskCount:" + executorPool.getTaskCount()
        );
    }



    private void initExec() {
        executorPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    }

    class WorkerThread implements Runnable {

        private String threadName;

        public WorkerThread (String name){
            threadName = name;
        }


        @Override
        public void run() {
            boolean flag = true;
            try {
                while (flag){
                    String tn  = Thread.currentThread().getName();
                    //模擬耗時(shí)操作
                    Random random = new Random();
                    long time = (random.nextInt(5) + 1) * 1000;
                    Thread.sleep(time);
                    Log.e("threadtest","線(xiàn)程\"" + tn + "\"耗時(shí)了(" + time / 1000 + "秒)下載了第<" + threadName + ">");
                    //下載完畢跳出循環(huán)
                    flag = false;
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

圖3,運(yùn)行結(jié)果

上述代碼模擬一個(gè)下載音樂(lè)的例子來(lái)演示ThreadPoolExecutor的基本使用,啟動(dòng)ThreadPoolExecutor的函數(shù)是execute()方法,然后他需要一個(gè)Runnable的參數(shù)來(lái)進(jìn)行啟動(dòng)。

ThreadPoolExecutor的其它種類(lèi)

通過(guò)直接或者間接地配置ThreadPoolExecutor的參數(shù)可以創(chuàng)建不同類(lèi)型的ThreadPoolExecutor,其中有 4 種線(xiàn)程池比較常用,它們分別是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。下面分別介紹這4種線(xiàn)程池。

  • FixedThreadPool

FixedThreadPool 是可重用固定線(xiàn)程數(shù)的線(xiàn)程池。在 Executors 類(lèi)中提供了創(chuàng)建FixedThreadPool的方法, 如下所示:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool指定的參數(shù)nThreads,也就意味著FixedThreadPool只有核心線(xiàn)程,并且數(shù)量是固定的,沒(méi)有非核心線(xiàn)程。keepAliveTime設(shè)置為0L 意味著多余的線(xiàn)程會(huì)被立即終止。因?yàn)椴粫?huì)產(chǎn)生多余的線(xiàn)程,所以keepAliveTime是無(wú)效的參數(shù)。另外,任 務(wù)隊(duì)列采用了無(wú)界的阻塞隊(duì)列LinkedBlockingQueue。FixedThreadPool的execute方法的執(zhí)行示意圖如圖4所 示。

圖4,F(xiàn)ixedThreadPool流程圖

當(dāng)執(zhí)行execute()方法時(shí),如果當(dāng)前運(yùn)行的線(xiàn)程未達(dá)到corePoolSize(核心線(xiàn)程數(shù))時(shí) 就創(chuàng)建核心線(xiàn)程來(lái)處理任務(wù),如果達(dá)到了核心線(xiàn)程數(shù)則將任務(wù)添加到LinkedBlockingQueue中。 FixedThreadPool就是一個(gè)有固定數(shù)量核心線(xiàn)程的線(xiàn)程池,并且這些核心線(xiàn)程不會(huì)被回收。當(dāng)線(xiàn)程數(shù)超過(guò)corePoolSize時(shí),就將任務(wù)存儲(chǔ)在任務(wù)隊(duì)列中;當(dāng)線(xiàn)程池有空閑線(xiàn)程時(shí),則從任務(wù)隊(duì)列中去取任務(wù)執(zhí)行

  • CachedThreadPool

CachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建線(xiàn)程的線(xiàn)程池,創(chuàng)建CachedThreadPool的代碼如下所示:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize為0,maximumPoolSize設(shè)置為Integer.MAX_VALUE,這意味著 CachedThreadPool沒(méi)有核心線(xiàn)程,非核心線(xiàn)程是無(wú)界的。keepAliveTime設(shè)置為60L,則空閑線(xiàn)程等待新任務(wù) 的最長(zhǎng)時(shí)間為 60s。在此用了阻塞隊(duì)列 SynchronousQueue,它是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作 必須等待另一個(gè)線(xiàn)程的移除操作,同樣任何一個(gè)移除操作都等待另一個(gè)線(xiàn)程的插入操作。CachedThreadPool 的execute方法的執(zhí)行示意圖如圖5所示。

圖5,CachedThreadPool流程圖

當(dāng)執(zhí)行execute()方法時(shí),首先會(huì)執(zhí)行SynchronousQueue的offer()方法來(lái)提交任務(wù),并且查詢(xún)線(xiàn)程池中是否有空閑的線(xiàn)程執(zhí)行SynchronousQueue的poll()方法來(lái)移除任務(wù)。如果有則配對(duì)成功,將任務(wù)交給這個(gè)空閑的線(xiàn)程處理;如果沒(méi)有則配對(duì)失敗,創(chuàng)建新的線(xiàn)程去處理任務(wù)。當(dāng)線(xiàn)程池中的線(xiàn)程空閑時(shí),它會(huì)執(zhí)行 SynchronousQueue的poll()方法,等待SynchronousQueue中新提交的任務(wù)。如果超過(guò) 60s 沒(méi)有新任務(wù)提交到 SynchronousQueue,則這個(gè)空閑線(xiàn)程將終止。因?yàn)閙aximumPoolSize 是無(wú)界的,所以如果提交的任務(wù)大于線(xiàn) 程池中線(xiàn)程處理任務(wù)的速度就會(huì)不斷地創(chuàng)建新線(xiàn)程。另外,每次提交任務(wù)都會(huì)立即有線(xiàn)程去處理。所以,CachedThreadPool比較適于大量的需要立即處理并且耗時(shí)較少的任務(wù)。

  • SingleThreadExecutor

SingleThreadExecutor是使用單個(gè)工作線(xiàn)程的線(xiàn)程池,其創(chuàng)建源碼如下所示:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

corePoolSize和maximumPoolSize都為1,意味著SingleThreadExecutor只有一個(gè)核心線(xiàn)程,其他的參數(shù)都 和FixedThreadPool一樣,這里就不贅述了。SingleThreadExecutor的execute()方法的執(zhí)行示意圖如圖5所示。

圖5,SingleThreadExecutor流程圖

當(dāng)執(zhí)行execute()方法時(shí),如果當(dāng)前運(yùn)行的線(xiàn)程數(shù)未達(dá)到核心線(xiàn)程數(shù),也就是當(dāng)前沒(méi)有運(yùn)行的線(xiàn)程,則創(chuàng)建一個(gè)新線(xiàn)程來(lái)處理任務(wù)。如果當(dāng)前有運(yùn)行的線(xiàn)程,則將任務(wù)添加到阻塞隊(duì)列LinkedBlockingQueue中。因此,SingleThreadExecutor能確保所有的任務(wù)在一個(gè)線(xiàn)程中按照順序逐一執(zhí)行。

  • ScheduledThreadPool

ScheduledThreadPool是一個(gè)能實(shí)現(xiàn)定時(shí)和周期性任務(wù)的線(xiàn)程池,它的創(chuàng)建源碼如下所示:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

這里創(chuàng)建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,它主要用于給定延時(shí)之后的運(yùn)行任務(wù)或者定期處理任務(wù)。ScheduledThreadPoolExecutor 的構(gòu)造方法如下所示:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

從上面的代碼可以看出,ScheduledThreadPoolExecutor 的構(gòu)造方法最終調(diào)用的是ThreadPoolExecutor的 構(gòu)造方法。corePoolSize是傳進(jìn)來(lái)的固定數(shù)值,maximumPoolSize的值是Integer.MAX_VALUE。因?yàn)椴捎玫?DelayedWorkQueue是無(wú)界的,所以maximumPoolSize這個(gè)參數(shù)是無(wú)效的。ScheduledThreadPoolExecutor的 execute方法的執(zhí)行示意圖如圖6所示。

圖6,ScheduledThreadPoolExecutor流程圖

當(dāng)執(zhí)行ScheduledThreadPoolExecutor的scheduleAtFixedRate()或者scheduleWithFixedDelay()方法時(shí),會(huì)向DelayedWorkQueue添加一個(gè) 實(shí)現(xiàn)RunnableScheduledFuture接口的ScheduledFutureTask(任務(wù)的包裝類(lèi)),并會(huì)檢查運(yùn)行的線(xiàn)程是否達(dá)到corePoolSize。如果沒(méi)有則新建線(xiàn)程并啟動(dòng)它,但并不是立即去執(zhí)行任務(wù),而是去DelayedWorkQueue中取ScheduledFutureTask,然后去執(zhí)行任務(wù)。如果運(yùn)行的線(xiàn)程達(dá)到了corePoolSize時(shí),則將任務(wù)添加到DelayedWorkQueue中。DelayedWorkQueue會(huì)將任務(wù)進(jìn)行排序,先要執(zhí)行的任務(wù)放在隊(duì)列的前面。其跟此前介紹的線(xiàn)程池不同的是,當(dāng)執(zhí)行完任務(wù)后,會(huì)將ScheduledFutureTask中time變量改為下次要執(zhí)行的時(shí)間并放回到DelayedWorkQueue中。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容