控制程序的生命周期

這是一篇使用并發(fā)和通道來實(shí)現(xiàn)控制程序生命周期的并發(fā)模式示例,該示例演示了如何控制程序在規(guī)定時(shí)間段內(nèi)的執(zhí)行,并可以手動(dòng)中斷來終止程序的運(yùn)行。

功能

展示如何通過通道來監(jiān)視程序的執(zhí)行時(shí)間,如果程序執(zhí)行時(shí)間過長(zhǎng),也可以終止程序

使用場(chǎng)景

當(dāng)需要調(diào)度后臺(tái)處理任務(wù)的時(shí)候,這種模式會(huì)很有用。該程序可能會(huì)作為 cron 作業(yè)執(zhí)行,或者在基于定時(shí)任務(wù)的云環(huán)境 (如 iron.io) 里執(zhí)行。

實(shí)現(xiàn)思路

  1. 創(chuàng)建一個(gè)執(zhí)行者 runner, 給 runner 設(shè)置一個(gè)超時(shí)時(shí)間 timeout 和任務(wù)切片 tasks,然后遍歷執(zhí)行 tasks 所有任務(wù)。

  2. 設(shè)置一個(gè)存儲(chǔ)中斷信號(hào)的字段 interrupt,通過 interrupt 來判斷是否已經(jīng)中斷程序。

  3. 設(shè)置一個(gè)記錄每個(gè)任務(wù)執(zhí)行錯(cuò)誤結(jié)果的字段 complete,監(jiān)聽 complete, 判斷是那種錯(cuò)誤類型,然后做相應(yīng)的處理。

  4. 執(zhí)行所有任務(wù),并監(jiān)聽不同錯(cuò)誤碼,執(zhí)行不同的業(yè)務(wù)邏輯。

實(shí)現(xiàn)詳情

首先我們需要聲明一個(gè) runner 結(jié)構(gòu)

type runner struct {
  tasks []func(int)
  timeout <-chan time.Time
  interrupt chan os.Signal
  complete chan error
}

runner 中包括任務(wù)切片 tasks, tasks 是一個(gè)存儲(chǔ) func(int) 類型的切片,后面會(huì)遍歷 tasks 來進(jìn)行處理任務(wù)。

timeout 字段是一個(gè)存放超時(shí)時(shí)間的只讀的通道,通過該字段來判斷任務(wù)執(zhí)行是否超時(shí)。

interrupt 字段是存放 os.Signal 類型的通道,接收到來自終端的中斷信號(hào)會(huì)存放在該字段中。

complete 字段是存放任務(wù)執(zhí)行的錯(cuò)誤結(jié)果,如果沒有錯(cuò)誤則是 nil。

有了 runner 執(zhí)行者這個(gè)結(jié)構(gòu)后,我們可以聲明一個(gè) New 工廠函數(shù)來創(chuàng)建 runner 類型的對(duì)象,并初始化需要的字段。

// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner {
  return &runner{
    timeout: time.After(timeout),
    interrupt: make(chan os.Signal),
    complete: make(chan error),
  }
}

創(chuàng)建 runner 的時(shí)候,我們需要傳入一個(gè) time.Duration 類型的參數(shù),然后內(nèi)部調(diào)用 time.After() 這個(gè)函數(shù)來返回一個(gè)time.Time 類型的只讀通道。interrupt 和 complete 字段正常初始化即可。tasks 默認(rèn)是空切片(表示還沒有任何任務(wù))。

有了一個(gè) runner 執(zhí)行者對(duì)象后,在執(zhí)行任務(wù)之前我們需要給 runner 的增加任務(wù),那我們需要寫一個(gè)給 runner 增加任務(wù)的方法。

// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個(gè)參數(shù)
func (r *runner) Add(tasks ...func(int)) {
  // 使用 tasks... 解構(gòu) tasks
  r.tasks = append(r.tasks, tasks...)
}

增加任務(wù)方法 Add 接收一個(gè)參數(shù)類型為 func(int) 的可變參數(shù),可變參數(shù)意味著參數(shù)的數(shù)量是可變的,可以是單個(gè),也可以是多個(gè)。

當(dāng)調(diào)用 Add 方法后,就會(huì)把傳入的參數(shù)賦值給 runner 類型對(duì)象的 tasks 字段,此時(shí) runner 類型對(duì)象就有任務(wù)了。

因?yàn)槿蝿?wù)執(zhí)行過程會(huì)有多個(gè)錯(cuò)誤值,比如超時(shí)錯(cuò)誤和中斷錯(cuò)誤,所以我們先定義兩個(gè)錯(cuò)誤變量,以備后面使用。

// 錯(cuò)誤類型
var (
  ErrTimeout = errors.New("超時(shí)錯(cuò)誤")
  ErrInterrupt = errors.New("程序中斷錯(cuò)誤")
)

接下來就是任務(wù)的執(zhí)行。

// 執(zhí)行任務(wù)
func (r *runner) Run() error {
  for id, task := range r.tasks {
    // 判斷是否已經(jīng)中斷程序
    if r.isInterrupt() {
      return ErrInterrupt
    }
    task(id)
  }
  return nil
}

執(zhí)行任務(wù)就是遍歷runner 對(duì)象中的 tasks 的所有任務(wù),然后執(zhí)行每一個(gè)任務(wù)即可,但是在執(zhí)行任務(wù)之前,需要判斷是否已經(jīng)中斷了程序。如果已經(jīng)中斷了程序,則直接返回中斷錯(cuò)誤 ErrInterrupt。

以下是判斷程序中斷的方法

// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
  select {
  case <- r.interrupt:
    signal.Stop(r.interrupt)
    return true
  default:
    return false
  }
}

該方法是 runner 類型的一個(gè)方法,方法內(nèi)使用了 select 多路復(fù)用來進(jìn)行監(jiān)聽

interrupt 通道是否有中斷信號(hào),如果監(jiān)聽到有中斷信號(hào),則任務(wù)是用戶中斷了程序,此時(shí)會(huì)調(diào)用 signal.Stop() 方法 中斷程序,然后返回 true ,表示程序已經(jīng)被中斷。

接下來,我們的實(shí)現(xiàn)一個(gè)方法來整合整個(gè)任務(wù)執(zhí)行的流程,包括任務(wù)的執(zhí)行,中斷和超時(shí)的監(jiān)聽。

// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error  {
  // 收到的所有中斷信號(hào)
  signal.Notify(r.interrupt, os.Interrupt)
?
  go func() {
    r.complete <- r.Run()
  }()
?
  select {
  case err := <- r.complete:
    return err
  case <- r.timeout:
    return ErrTimeout
  }
}

Start 方法中,會(huì)接收所有的中斷信號(hào),放在 interrupt 通道中,然后調(diào)用 Run 方法來執(zhí)行所有任務(wù),并把執(zhí)行的結(jié)果存放到 complete 通道中,最后通過 select 多路復(fù)用方式監(jiān)聽 complete 通道和 timeout 通道中的消息,一旦有錯(cuò)誤就返回錯(cuò)誤碼。

執(zhí)行者調(diào)用 Start 方法后拿到錯(cuò)誤碼,執(zhí)行自己的業(yè)務(wù)邏輯,如果沒有錯(cuò)誤碼返回則表示所有任務(wù)在規(guī)定的超時(shí)時(shí)間內(nèi)成功執(zhí)行了所有任務(wù)。

目前所有任務(wù)的執(zhí)行,錯(cuò)誤碼監(jiān)聽等工作已經(jīng)全部完成。

接下來我們創(chuàng)建一個(gè) runner 對(duì)象來驗(yàn)證一下程序。

func main() {
  timeout := 2 * time.Second
  runner := New(timeout)
  runner.Add(CreateTask(), CreateTask(), CreateTask())
?
  if err := runner.Start(); err != nil {
    switch err {
    case ErrInterrupt:
      fmt.Println(ErrInterrupt)
    case ErrTimeout:
      fmt.Println(ErrTimeout)
    }
  }
  fmt.Println("程序結(jié)束")
}

因?yàn)?Add 方法參數(shù)要求是一個(gè)傳入 int 類型的函數(shù),所以為了方便創(chuàng)建任務(wù),我們聲明一個(gè)使用了閉包的 CreateTask 函數(shù)來返回任務(wù)函數(shù)。

// 創(chuàng)建任務(wù)
func CreateTask() func(int)  {
  return func(id int) {
    fmt.Println("正在執(zhí)行 Task ", id)
    // 模擬任務(wù)執(zhí)行
    time.Sleep(time.Duration(id) * time.Second)
  }
}

到目前為止所有的代碼實(shí)現(xiàn)已經(jīng)全部編寫完成

以下是完整的示例代碼

package main
?
import (
  "errors"
  "fmt"
  "os"
  "os/signal"
  "time"
)
?
type runner struct {
  tasks []func(int)
  timeout <-chan time.Time
  interrupt chan os.Signal
  complete chan error
}
?
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner
 {
  return &runner{
    timeout: time.After(timeout),
    interrupt: make(chan os.Signal),
    complete: make(chan error),
  }
}
?
// 錯(cuò)誤類型
var (
  ErrTimeout = errors.New("超時(shí)錯(cuò)誤")
  ErrInterrupt = errors.New("程序中斷錯(cuò)誤")
)
?
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
  for id, task := range r.tasks {
    // 判斷是否已經(jīng)中斷程序
    if r.isInterrupt() {
      return ErrInterrupt
    }
    task(id)
  }
  return nil
}
?
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
  select {
  case <- r.interrupt:
    signal.Stop(r.interrupt)
    return true
  default:
    return false
  }
}
?
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個(gè)參數(shù)
func (r *runner) Add(tasks ...func(int)) {
  // 使用 tasks... 解構(gòu) tasks
  r.tasks = append(r.tasks, tasks...)
}
?
// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error  {
  // 收到的所有中斷信號(hào)
  signal.Notify(r.interrupt, os.Interrupt)
?
  go func() {
    r.complete <- r.Run()
  }()
?
  select {
  case err := <- r.complete:
    return err
  case <- r.timeout:
    return ErrTimeout
  }
}
?
// 創(chuàng)建任務(wù)
func CreateTask() func(int)  {
  return func(id int) {
    fmt.Println("正在執(zhí)行 Task ", id)
    time.Sleep(time.Duration(id) * time.Second)
  }
}
?
?
func main() {
  timeout := 2 * time.Second
  runner := New(timeout)
  runner.Add(CreateTask(), CreateTask(), CreateTask())
?
  if err := runner.Start(); err != nil {
    switch err {
    case ErrInterrupt:
      fmt.Println(ErrInterrupt)
    case ErrTimeout:
      fmt.Println(ErrTimeout)
    }
  }
  fmt.Println("程序結(jié)束")
}

一起精進(jìn)Go技術(shù), 關(guān)注公眾號(hào):陸貴成

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容