golang channel實現(xiàn)fan-in、fan-out、or-done

package main

import (
    "fmt"
    "time"
)

func sig(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

func main() {

    // start := time.Now()
    
    // //or-Done模式
    // <-or(
    //  sig(10*time.Second),
    //  sig(20*time.Second),
    //  sig(30*time.Second),
    //  sig(40*time.Second),
    //  sig(50*time.Second),
    //  sig(01*time.Minute),
    // )

    //fan in
    // <-fanInRec(
    //  sig(10*time.Second),
    //  sig(20*time.Second),
    //  sig(30*time.Second),
    //  sig(40*time.Second),
    //  sig(50*time.Second),
    //  sig(01*time.Minute),
    // )
    // fmt.Printf("done after %v", time.Since(start))

    //fan out
    ch := make(chan interface{})
    chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
    fanOut(ch, chLister, false)
    ch <- 888
    fmt.Println(<-chLister[0])
    fmt.Println(<-chLister[1])
    fmt.Println(<-chLister[2])

}

func or(channels ...<-chan interface{}) <-chan interface{} {
    // 特殊情況,只有零個或者1個chan
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2: // 2個也是一種特殊情況
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: //超過兩個,二分法遞歸處理,也可以使用reflect
            m := len(channels) / 2
            select {
            case <-or(channels[:m]...):
            case <-or(channels[m:]...):
            }
        }
    }()

    return orDone
}

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo(
            fanInRec(chans[:m]...),
            fanInRec(chans[m:]...))
    }
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要還有可讀的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已關(guān)閉,設(shè)置為nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已關(guān)閉,設(shè)置為nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出時關(guān)閉所有的輸出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //異步
                    go func() {
                        out[i] <- v // 放入到輸出chan中,異步方式
                    }()
                } else {
                    out[i] <- v // 放入到輸出chan中,同步方式
                }
            }
        }
    }()
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容