package main
import (
"fmt"
"time"
)
func sig(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
func main() {
// start := time.Now()
// //or-Done模式
// <-or(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
//fan in
// <-fanInRec(
// sig(10*time.Second),
// sig(20*time.Second),
// sig(30*time.Second),
// sig(40*time.Second),
// sig(50*time.Second),
// sig(01*time.Minute),
// )
// fmt.Printf("done after %v", time.Since(start))
//fan out
ch := make(chan interface{})
chLister := []chan interface{}{make(chan interface{}), make(chan interface{}), make(chan interface{})}
fanOut(ch, chLister, false)
ch <- 888
fmt.Println(<-chLister[0])
fmt.Println(<-chLister[1])
fmt.Println(<-chLister[2])
}
func or(channels ...<-chan interface{}) <-chan interface{} {
// 特殊情況,只有零個或者1個chan
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2: // 2個也是一種特殊情況
select {
case <-channels[0]:
case <-channels[1]:
}
default: //超過兩個,二分法遞歸處理,也可以使用reflect
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
fanInRec(chans[:m]...),
fanInRec(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { //只要還有可讀的chan
select {
case v, ok := <-a:
if !ok { // a 已關(guān)閉,設(shè)置為nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已關(guān)閉,設(shè)置為nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出時關(guān)閉所有的輸出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數(shù)據(jù)
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //異步
go func() {
out[i] <- v // 放入到輸出chan中,異步方式
}()
} else {
out[i] <- v // 放入到輸出chan中,同步方式
}
}
}
}()
}
golang channel實現(xiàn)fan-in、fan-out、or-done
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 問題 開發(fā)過程中,有批量處理的邏輯,依賴下游一同的接口(這個接口耗時感人,400 ~ 600ms),如果串行執(zhí)行批...
- 1、利用channel實現(xiàn)線程互斥 2、利用channel實現(xiàn)線程同步 3、利用channel實現(xiàn)信號量
- 路徑為:./src/runtime/chan.go 文件中,先看channel結(jié)構(gòu)體: 以及waitq的結(jié)構(gòu)體: ...
- golang中使用channel實現(xiàn)互斥鎖 通過將帶有一個緩沖區(qū)的channel作為一個桶,桶中的數(shù)據(jù)作為鎖,每次...
- 用一個緩存空間的channel實現(xiàn)鎖比較簡單,如果是無緩存就會稍微麻煩點(diǎn)直接上代碼了: 以下是sample: 歡迎...