緩沖信道
之前看到的都是無緩沖信道,無緩沖信道的發(fā)送和接收過程是阻塞的。我們還可以創(chuàng)建一個有緩沖(Buffer)的信道。
- 只在緩沖已滿的情況,才會阻塞向緩沖信道(Buffered Channel)發(fā)送數(shù)據(jù)。
- 只有在緩沖為空的時候,才會阻塞從緩沖信道接收數(shù)據(jù)。
通過向 make 函數(shù)再傳遞一個表示容量的參數(shù)(指定緩沖的大小),可以創(chuàng)建緩沖信道。
ch := make(chan type, capacity)
要讓一個信道有緩沖,上面語法中的 capacity 應(yīng)該大于 0。無緩沖信道的容量默認(rèn)為 0。我們在上一章創(chuàng)建信道時,省略了容量參數(shù)。
示例1:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}
創(chuàng)建了一個緩沖信道,其容量為 2。由于該信道的容量為 2,因此可向它寫入兩個字符串,而且不會發(fā)生阻塞。
示例2:
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)
}
}
死鎖
向容量為 2 的緩沖信道寫入 3 個字符串。當(dāng)在程序控制到達(dá)第 3 次寫入時,由于它超出了信道的容量,因此這次寫入發(fā)生了阻塞?,F(xiàn)在想要這次寫操作能夠進(jìn)行下去,必須要有其它協(xié)程來讀取這個信道的數(shù)據(jù)。但在本例中,并沒有并發(fā)協(xié)程來讀取這個信道,因此這里會發(fā)生死鎖(deadlock)。程序會在運行時觸發(fā) panic
長度 vs 容量
- 容量:指信道可以存儲的值的數(shù)量。我們在使用 make 函數(shù)創(chuàng)建緩沖信道的時候會指定容量大小。
- 長度:指信道中當(dāng)前排隊的元素個數(shù)。
示例代碼:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
WaitGroup
WaitGroup 用于實現(xiàn)工作池,因此要理解工作池,我們首先需要了解 WaitGroup。
WaitGroup 用于等待一批 Go 協(xié)程執(zhí)行結(jié)束。程序控制會一直阻塞,直到這些協(xié)程全部執(zhí)行完畢。假設(shè)我們有 3 個并發(fā)執(zhí)行的 Go 協(xié)程(由 Go 主協(xié)程生成)。Go 主協(xié)程需要等待這 3 個協(xié)程執(zhí)行結(jié)束后,才會終止。這就可以用 WaitGroup 來實現(xiàn)。
WaitGroup原理:
WaitGroup 使用計數(shù)器來工作。
調(diào)用WaitGroup的Add并傳遞一個int時,WaitGroup 的計數(shù)器會增加上Add的傳參。
要減少計數(shù)器,可以調(diào)用WaitGroup的Done()方法。
Wait() 方法會阻塞調(diào)用它的 Go 協(xié)程,直到計數(shù)器變?yōu)?0 后才會停止阻塞。
代碼:
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
解釋代碼:
-
WaitGroup 是一個結(jié)構(gòu)體類型,創(chuàng)建
WaitGroup類型的變量,其初始值為零值。 - for 循環(huán)迭代了 3 次,在循環(huán)內(nèi)調(diào)用了 wg.Add(1)。因此計數(shù)器變?yōu)?3。for 循環(huán)同樣創(chuàng)建了 3 個 process 協(xié)程
- 調(diào)用了 wg.Wait(),確保 Go 主協(xié)程等待計數(shù)器變?yōu)?0
- process 協(xié)程內(nèi)調(diào)用了 wg.Done,可以讓計數(shù)器遞減。一旦 3 個子協(xié)程都執(zhí)行完畢(即 wg.Done() 調(diào)用了 3 次),那么計數(shù)器就變?yōu)?0,于是主協(xié)程會解除阻塞。
注意:
process函數(shù)中傳遞 wg 的地址是很重要的。如果沒有傳遞 wg 的地址,那么每個 Go 協(xié)程將會得到一個 WaitGroup 值的拷貝,因而當(dāng)它們執(zhí)行結(jié)束時,main 函數(shù)并不會知道。
由于 Go 協(xié)程的執(zhí)行順序不一定,因此你的輸出可能和我不一樣。
工作池的實現(xiàn)
緩沖信道的重要應(yīng)用之一就是實現(xiàn)工作池。
一般而言,工作池就是一組等待任務(wù)分配的線程。
一旦完成了所分配的任務(wù),這些線程可繼續(xù)等待任務(wù)的分配。
用一個需求說明工作池:
計算所輸入數(shù)字的每一位的和。例如,如果輸入 234,結(jié)果會是 9(即 2 + 3 + 4)。向工作池輸入的是一列隨機(jī)數(shù)。
我們工作池的核心功能如下:
- 創(chuàng)建一個 Go 協(xié)程池,監(jiān)聽一個等待作業(yè)分配的輸入型緩沖信道。
- 將作業(yè)添加到該輸入型緩沖信道中。
- 作業(yè)完成后,再將結(jié)果寫入一個輸出型緩沖信道。
- 從輸出型緩沖信道讀取并打印結(jié)果。