這是一篇使用并發(fā)和通道來實(shí)現(xiàn)控制程序生命周期的并發(fā)模式示例,該示例演示了如何控制程序在規(guī)定時(shí)間段內(nèi)的執(zhí)行,并可以手動(dòng)中斷來終止程序的運(yùn)行。
功能
展示如何通過通道來監(jiān)視程序的執(zhí)行時(shí)間,如果程序執(zhí)行時(shí)間過長(zhǎng),也可以終止程序
使用場(chǎng)景
當(dāng)需要調(diào)度后臺(tái)處理任務(wù)的時(shí)候,這種模式會(huì)很有用。該程序可能會(huì)作為 cron 作業(yè)執(zhí)行,或者在基于定時(shí)任務(wù)的云環(huán)境 (如 iron.io) 里執(zhí)行。
實(shí)現(xiàn)思路
創(chuàng)建一個(gè)執(zhí)行者 runner, 給 runner 設(shè)置一個(gè)超時(shí)時(shí)間 timeout 和任務(wù)切片 tasks,然后遍歷執(zhí)行 tasks 所有任務(wù)。
設(shè)置一個(gè)存儲(chǔ)中斷信號(hào)的字段 interrupt,通過 interrupt 來判斷是否已經(jīng)中斷程序。
設(shè)置一個(gè)記錄每個(gè)任務(wù)執(zhí)行錯(cuò)誤結(jié)果的字段 complete,監(jiān)聽 complete, 判斷是那種錯(cuò)誤類型,然后做相應(yīng)的處理。
執(zhí)行所有任務(wù),并監(jiān)聽不同錯(cuò)誤碼,執(zhí)行不同的業(yè)務(wù)邏輯。
實(shí)現(xiàn)詳情
首先我們需要聲明一個(gè) runner 結(jié)構(gòu)
type runner struct {
tasks []func(int)
timeout <-chan time.Time
interrupt chan os.Signal
complete chan error
}
runner 中包括任務(wù)切片 tasks, tasks 是一個(gè)存儲(chǔ) func(int) 類型的切片,后面會(huì)遍歷 tasks 來進(jìn)行處理任務(wù)。
timeout 字段是一個(gè)存放超時(shí)時(shí)間的只讀的通道,通過該字段來判斷任務(wù)執(zhí)行是否超時(shí)。
interrupt 字段是存放 os.Signal 類型的通道,接收到來自終端的中斷信號(hào)會(huì)存放在該字段中。
complete 字段是存放任務(wù)執(zhí)行的錯(cuò)誤結(jié)果,如果沒有錯(cuò)誤則是 nil。
有了 runner 執(zhí)行者這個(gè)結(jié)構(gòu)后,我們可以聲明一個(gè) New 工廠函數(shù)來創(chuàng)建 runner 類型的對(duì)象,并初始化需要的字段。
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner {
return &runner{
timeout: time.After(timeout),
interrupt: make(chan os.Signal),
complete: make(chan error),
}
}
創(chuàng)建 runner 的時(shí)候,我們需要傳入一個(gè) time.Duration 類型的參數(shù),然后內(nèi)部調(diào)用 time.After() 這個(gè)函數(shù)來返回一個(gè)time.Time 類型的只讀通道。interrupt 和 complete 字段正常初始化即可。tasks 默認(rèn)是空切片(表示還沒有任何任務(wù))。
有了一個(gè) runner 執(zhí)行者對(duì)象后,在執(zhí)行任務(wù)之前我們需要給 runner 的增加任務(wù),那我們需要寫一個(gè)給 runner 增加任務(wù)的方法。
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個(gè)參數(shù)
func (r *runner) Add(tasks ...func(int)) {
// 使用 tasks... 解構(gòu) tasks
r.tasks = append(r.tasks, tasks...)
}
增加任務(wù)方法 Add 接收一個(gè)參數(shù)類型為 func(int) 的可變參數(shù),可變參數(shù)意味著參數(shù)的數(shù)量是可變的,可以是單個(gè),也可以是多個(gè)。
當(dāng)調(diào)用 Add 方法后,就會(huì)把傳入的參數(shù)賦值給 runner 類型對(duì)象的 tasks 字段,此時(shí) runner 類型對(duì)象就有任務(wù)了。
因?yàn)槿蝿?wù)執(zhí)行過程會(huì)有多個(gè)錯(cuò)誤值,比如超時(shí)錯(cuò)誤和中斷錯(cuò)誤,所以我們先定義兩個(gè)錯(cuò)誤變量,以備后面使用。
// 錯(cuò)誤類型
var (
ErrTimeout = errors.New("超時(shí)錯(cuò)誤")
ErrInterrupt = errors.New("程序中斷錯(cuò)誤")
)
接下來就是任務(wù)的執(zhí)行。
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
for id, task := range r.tasks {
// 判斷是否已經(jīng)中斷程序
if r.isInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
執(zhí)行任務(wù)就是遍歷runner 對(duì)象中的 tasks 的所有任務(wù),然后執(zhí)行每一個(gè)任務(wù)即可,但是在執(zhí)行任務(wù)之前,需要判斷是否已經(jīng)中斷了程序。如果已經(jīng)中斷了程序,則直接返回中斷錯(cuò)誤 ErrInterrupt。
以下是判斷程序中斷的方法
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
select {
case <- r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
該方法是 runner 類型的一個(gè)方法,方法內(nèi)使用了 select 多路復(fù)用來進(jìn)行監(jiān)聽
interrupt 通道是否有中斷信號(hào),如果監(jiān)聽到有中斷信號(hào),則任務(wù)是用戶中斷了程序,此時(shí)會(huì)調(diào)用 signal.Stop() 方法 中斷程序,然后返回 true ,表示程序已經(jīng)被中斷。
接下來,我們的實(shí)現(xiàn)一個(gè)方法來整合整個(gè)任務(wù)執(zhí)行的流程,包括任務(wù)的執(zhí)行,中斷和超時(shí)的監(jiān)聽。
// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error {
// 收到的所有中斷信號(hào)
signal.Notify(r.interrupt, os.Interrupt)
?
go func() {
r.complete <- r.Run()
}()
?
select {
case err := <- r.complete:
return err
case <- r.timeout:
return ErrTimeout
}
}
Start 方法中,會(huì)接收所有的中斷信號(hào),放在 interrupt 通道中,然后調(diào)用 Run 方法來執(zhí)行所有任務(wù),并把執(zhí)行的結(jié)果存放到 complete 通道中,最后通過 select 多路復(fù)用方式監(jiān)聽 complete 通道和 timeout 通道中的消息,一旦有錯(cuò)誤就返回錯(cuò)誤碼。
執(zhí)行者調(diào)用 Start 方法后拿到錯(cuò)誤碼,執(zhí)行自己的業(yè)務(wù)邏輯,如果沒有錯(cuò)誤碼返回則表示所有任務(wù)在規(guī)定的超時(shí)時(shí)間內(nèi)成功執(zhí)行了所有任務(wù)。
目前所有任務(wù)的執(zhí)行,錯(cuò)誤碼監(jiān)聽等工作已經(jīng)全部完成。
接下來我們創(chuàng)建一個(gè) runner 對(duì)象來驗(yàn)證一下程序。
func main() {
timeout := 2 * time.Second
runner := New(timeout)
runner.Add(CreateTask(), CreateTask(), CreateTask())
?
if err := runner.Start(); err != nil {
switch err {
case ErrInterrupt:
fmt.Println(ErrInterrupt)
case ErrTimeout:
fmt.Println(ErrTimeout)
}
}
fmt.Println("程序結(jié)束")
}
因?yàn)?Add 方法參數(shù)要求是一個(gè)傳入 int 類型的函數(shù),所以為了方便創(chuàng)建任務(wù),我們聲明一個(gè)使用了閉包的 CreateTask 函數(shù)來返回任務(wù)函數(shù)。
// 創(chuàng)建任務(wù)
func CreateTask() func(int) {
return func(id int) {
fmt.Println("正在執(zhí)行 Task ", id)
// 模擬任務(wù)執(zhí)行
time.Sleep(time.Duration(id) * time.Second)
}
}
到目前為止所有的代碼實(shí)現(xiàn)已經(jīng)全部編寫完成
以下是完整的示例代碼
package main
?
import (
"errors"
"fmt"
"os"
"os/signal"
"time"
)
?
type runner struct {
tasks []func(int)
timeout <-chan time.Time
interrupt chan os.Signal
complete chan error
}
?
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner
{
return &runner{
timeout: time.After(timeout),
interrupt: make(chan os.Signal),
complete: make(chan error),
}
}
?
// 錯(cuò)誤類型
var (
ErrTimeout = errors.New("超時(shí)錯(cuò)誤")
ErrInterrupt = errors.New("程序中斷錯(cuò)誤")
)
?
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
for id, task := range r.tasks {
// 判斷是否已經(jīng)中斷程序
if r.isInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
?
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
select {
case <- r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
?
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個(gè)參數(shù)
func (r *runner) Add(tasks ...func(int)) {
// 使用 tasks... 解構(gòu) tasks
r.tasks = append(r.tasks, tasks...)
}
?
// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error {
// 收到的所有中斷信號(hào)
signal.Notify(r.interrupt, os.Interrupt)
?
go func() {
r.complete <- r.Run()
}()
?
select {
case err := <- r.complete:
return err
case <- r.timeout:
return ErrTimeout
}
}
?
// 創(chuàng)建任務(wù)
func CreateTask() func(int) {
return func(id int) {
fmt.Println("正在執(zhí)行 Task ", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
?
?
func main() {
timeout := 2 * time.Second
runner := New(timeout)
runner.Add(CreateTask(), CreateTask(), CreateTask())
?
if err := runner.Start(); err != nil {
switch err {
case ErrInterrupt:
fmt.Println(ErrInterrupt)
case ErrTimeout:
fmt.Println(ErrTimeout)
}
}
fmt.Println("程序結(jié)束")
}
一起精進(jìn)Go技術(shù), 關(guān)注公眾號(hào):陸貴成