一種 golang 實現(xiàn) 多協(xié)程任務(wù)處理的套路

一種 golang 實現(xiàn) 多協(xié)程任務(wù)處理的套路

那么是什么樣的任務(wù)呢,一般是在生產(chǎn)者-消費者模式的消費者進程 ,舉幾個例子

  1. 消費kafka 數(shù)據(jù)
  2. 消費redis 數(shù)據(jù)
  3. 輪詢處理數(shù)據(jù)庫數(shù)據(jù)
  4. ...

下面來分析一下

  1. 業(yè)務(wù)邏輯處理協(xié)程
    到底多少個呢 ?處理一個數(shù)據(jù) 就 go 一個嗎,也可以不過有點粗暴,協(xié)程也不是越多越好,調(diào)度也是要好性能的
    所以還是控制一下,一般吧 弄個cpu * 2 就差不多了
    (runtime.NumCPU() *2)

  2. 獲取數(shù)據(jù)協(xié)程
    由于我要分析的例子 都是一個 for 循環(huán) 不停讀取數(shù)據(jù) 交個任務(wù)處理協(xié)程,所以這里就 用一個協(xié)程

  3. 進程如何關(guān)閉
    總不能kill -9 粗暴處理吧,這樣容易造成數(shù)據(jù)異常或者丟數(shù)據(jù),一般都是 捕捉 信號
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

直接上代碼

package main

import (
    "fmt"
    "os"
    "os/signal"
    "runtime"
    "sync/atomic"
    "syscall"
    "time"
)

type TaskData struct {
}
type Service struct {
    capacity     int
    tasks        chan *TaskData
    numThread    int
    closeChans   chan struct{}
    stopFlag     int32
    loopStopChan chan struct{}
}

func NewService(capacity int) *Service {
    service := &Service{}
    service.capacity = capacity
    service.numThread = runtime.NumCPU() * 2
    service.tasks = make(chan *TaskData, capacity)
    service.stopFlag = 0
    service.closeChans = make(chan struct{}, service.numThread)
    service.loopStopChan = make(chan struct{})
    return service
}

func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    <-this.loopStopChan
    close(this.tasks)
    for i := 0; i < this.numThread; i++ {
        <-this.closeChans
    }
}

func (this *Service) Run() {
    for i := 0; i < this.numThread; i++ {
        go this.run(i)
    }
    go this.LoopConsume()
}

func (this *Service) run(i int) {
    fmt.Println("go run:", i)
loop:
    for {
        select {
        case task, ok := <-this.tasks:
            if ok {
                //#TODO process
                fmt.Println("process", task)
            } else {
                break loop
            }
        }
    }
    this.closeChans <- struct{}{}
}

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        //TODO ReadData
        task := &TaskData{}
        this.tasks <- task

        fmt.Println("consume.")
        time.Sleep(time.Second * 2)
    }
    this.loopStopChan <- struct{}{}
}

func main() {
    service := NewService(100)
    go service.Run() //啟動程序處理

    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
    s := <-c //等待關(guān)閉信號
    fmt.Println(s)
    service.Stop() //關(guān)閉service 
    fmt.Println("exit :D")
}

思考 1:

service.stopFlag 
service.closeChans
service.loopStopChan 

這幾個變量干什么用的,是為了安全退出程序用的

1. stopFlag

首先 要退出 LoopConsume 大循環(huán) 用什么去通知呢,用channel 也可以,就是要配合select 使用,但是用原子標記是不是更簡潔呢? 所以 stopFlag 是為了退出 LoopConsume 用的

2. closeChans

由于我們go 了很多個協(xié)程,那么要監(jiān)聽每一個協(xié)程退出,就需要多個channel 去接收

    for i := 0; i < this.numThread; i++ {
        <-this.closeChans 
    }

這段代碼的意思就是等待所有 處理協(xié)成退出

3. loopStopChan

這個又是干什么的呢,同樣也是處理協(xié)程退出的 只不過是 LoopConsume,為什么不 closeChans 大小再加一個 而變成這樣呢

service.closeChans = make(chan struct{}, service.numThread+1)
func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    close(this.closeChans)
    for i := 0; i < this.numThread+1; i++ {
        <-this.closeChans
    }
}

這么做會發(fā)生什么呢?,假如這樣 一旦執(zhí)行了stop 那么
this.stopFlag = 1,但是 LoopConsume 可能還在從 //TODO ReadData 獲取數(shù)據(jù)階段
當執(zhí)行了 close(this.tasks) ,此時 恰好又要執(zhí)行 this.tasks <- task,但是此時
tasks 已經(jīng)關(guān)閉,那么就會panic
其實在整個例子里 LoopConsume 就相當于一個生產(chǎn)者,而run 相當于一個消費者,我們是不是應(yīng)該先關(guān)不生產(chǎn)者 等待 消費者 消費完了 再退出呢,毫無疑問 肯定是的,所以就要有一個channel 等 生產(chǎn)者 退出了 再發(fā)送 channel 去 讓消費者退出,所以單獨用一個 loopStopChan

思考2:

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        //TODO ReadData
        task := &TaskData{}
        this.tasks <- task

        fmt.Println("consume.")
        time.Sleep(time.Second * 2)
    }
    this.loopStopChan <- struct{}{}
}

這段代碼其實就是不停的獲取數(shù)據(jù),我這里沒有寫獲取數(shù)據(jù)的部分,因為這個是和業(yè)務(wù)相關(guān)的,舉個實際點的例子 比如 比如 讀取mysql
SELECT ID, * FROM DATA WHERE ID > OFFSET LIMIT N;
每次 從OFFSET 位置讀取 N 條數(shù)據(jù),讀取后 如果獲取的條數(shù) 為 num ,若num等于 N , 那么 OFFSET += N 繼續(xù) read,否則 說明數(shù)據(jù)不夠了 ,則,OFFSET += num,并且 sleep n 秒 (避免沒有數(shù)據(jù)的時候空跑)

上偽代碼

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        rows:= Read(offset)
        if rows 行數(shù) ==  N {
            task := &TaskData{}
            this.tasks <- task
            offset + = N
        }else{
            time.Sleep(time.Second * 20)
            offset +=   rows 行數(shù)
        }   
    }
    this.loopStopChan <- struct{}{}
}

這里有沒有發(fā)現(xiàn)問題呢,假如程序剛好進入了time.Sleep(time.Second * 20) 這里呢,此時stop 豈不是 要等待20s 可是其實進程已經(jīng)很閑了,有什么辦法解決 呢,還是可以用標記的方法,一段程序進入sleep 可以設(shè)置一個標記

上偽代碼

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        atomic.StoreInt32(&this.forcestopFlag, 0)
        rows:= Read(offset)
        if rows 行數(shù) ==  N {
            task := &TaskData{}
            this.tasks <- task
            offset + = N
        }else{
            atomic.StoreInt32(&this.forcestopFlag, 1)
            time.Sleep(time.Second * 20)
            offset +=   rows 行數(shù)
        }   
    }
    this.loopStopChan <- struct{}{}
}


func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    if  atomic.LoadInt32(&this.forcestopFlag) == 0{
        <-this.loopStopChan //只有當forcestopFlag = 0 的時候才需要等待 LoopConsume退出
    }
    close(this.tasks)
    for i := 0; i < this.numThread; i++ {
        <-this.closeChans
    }
}

思考3:

此模型 run 里面 或者 LoopConsume 還可以 go 協(xié)程出來嗎,顯然不行,因為一旦go 出來 了,那么現(xiàn)有的 stop 就失效了,因為無法獲取這些協(xié)程是否退出。
其實我覺得也沒有必要 再go 一個出來,因為LoopConsume 一般是讀 ,速度比 業(yè)務(wù)處理的要高, 如果這個不滿足你的實際業(yè)務(wù)需求,你可以 go 多個 LoopConsume ,同樣把 loopStopChan 也弄成 長度為 N 的channel

<-this.loopStopChan 變成這樣

for i := 0; i <N ; i++ {
    <-this.loopStopChan
}

再極端 你的業(yè)務(wù)非得 在 LoopConsume go 一個或者多個協(xié)程,那么你得思考 該怎么同步了,至于用什么方法,得好好思考了,可以提出來大伙一起討論討論

同理在 run 里面你也想 go 一個或者多個協(xié)程, 還是一樣得想辦法考慮同步問題

總結(jié)

這個小service 只是一種多協(xié)程處理任務(wù)的套路,常用在生產(chǎn)者消費者模型的消費者進程。
對于對性能要求比較高的可能不適合,比如這里的 協(xié)程數(shù)是固定的,可以改進成伸縮的動態(tài)變化,
代碼寫的比較簡單,一些錯誤之處 還望各位 大神多多指正,歡迎討論。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 輕量級線程:協(xié)程 在常用的并發(fā)模型中,多進程、多線程、分布式是最普遍的,不過近些年來逐漸有一些語言以first-c...
    Tenderness4閱讀 6,518評論 2 10
  • 原文鏈接:https://github.com/EasyKotlin 在常用的并發(fā)模型中,多進程、多線程、分布式是...
    JackChen1024閱讀 10,900評論 3 23
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,715評論 19 139
  • 文/五柳家先生 記憶中我看過最美的景色,是小時候跟著年輕的父親爬上一座不知名的山,從上向下看一片湖光山色,...
    五柳家先生閱讀 1,325評論 4 8
  • 在我們每個人心中,都有著自己的小九九,每天都在各種測試與測試中度過,每天都在研究或者渴望有著新的黑科技出現(xiàn),自己然...
    一王二山閱讀 328評論 0 0

友情鏈接更多精彩內(nèi)容