golang channel源碼分析

channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu),通常與goroutine一起使用,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)。

channel數(shù)據(jù)結(jié)構(gòu)

channel最重要的一個結(jié)構(gòu)體就是hchan,我們創(chuàng)建一個channel的時候,實際上是創(chuàng)建了一個下面結(jié)構(gòu)體的實例。

hchan結(jié)構(gòu)體

// src/runtime/chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

字段說明

  • qcount 當(dāng)前channel中的元素數(shù)量
  • dataqsiz 環(huán)形隊列的大小
  • buf 指向dataqsize的數(shù)組指針,只有緩沖chan有效
  • closed 當(dāng)前channel關(guān)閉狀態(tài)
  • elemsize 存儲元素的大小
  • elemtype 存儲元素的數(shù)據(jù)類型
  • sendx 發(fā)送操作處理到的索引位置,最大值為數(shù)組buf的最大下標值
  • recvx 接收操作處理到的索引位置,最大值為數(shù)組buf的最大下標值
  • recvq 接收隊列,雙向鏈表,阻塞元素
  • sendq 發(fā)送列隊,雙向鏈表,阻塞元素
  • lock 鎖,,用來保護sudog里的所的字段
image

hchan struct

其中elemsizeelemtype 表示存儲數(shù)據(jù)的大小和類型;sendxrecvx是指向底層數(shù)據(jù)的索引位置,表示當(dāng)前處理的進度位置;recvqsendq 是一個由雙向鏈表實現(xiàn)的隊列,它存儲的內(nèi)容是由于隊列dataqsize過小,而阻塞的數(shù)據(jù)。

每次進行發(fā)送數(shù)據(jù)和讀取數(shù)據(jù)時都需要加鎖。

waitq結(jié)構(gòu)體

// src/runtime/chan.go

type waitq struct {
    first *sudog
    last  *sudog
}

sudog結(jié)構(gòu)體

// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

這里 sudog 實際上是對 goroutine 的一個封裝,一個sudog 就是一個goroutine,如在channal上發(fā)送和接收。

sudogs 是通過一個特殊的池來分配的,通過 acquireSudog()releaseSudog()進行獲取和釋放。

sudog里的字段是由 hchan.lock 鎖來進行保護。

channel 整體結(jié)構(gòu)圖

image

<figcaption>hchan 結(jié)構(gòu)圖(來源:互聯(lián)網(wǎng)技術(shù)窩)</figcaption>

創(chuàng)建

// 無緩沖通道
ch1 := make(chan int)
// 有緩沖通道
ch2 := make(chan int, 10)

通過編譯可以發(fā)現(xiàn)channel的創(chuàng)建是由[makechan()](https://github.com/golang/go/blob/go1.15.6/src/runtime/chan.go#L71-L118)函數(shù)來完成的。源碼

// src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
    }
    return c
}

函數(shù)返回的是一個指針類型,因此我們可以在函數(shù)中通過參數(shù)直接傳遞,不需要再轉(zhuǎn)為指針使傳遞。

步驟

  1. 數(shù)據(jù)合法性檢查,包括發(fā)送數(shù)據(jù)的類型和大小
  2. 根據(jù)不同場景分配內(nèi)存,主要針對buf字段
    a. 內(nèi)存大小為0,注意這時c.buf 的值為c.raceaddr()
    b. 元素不包含指針,一次性分配一段內(nèi)存地址
    c. 元素包含指針,分配內(nèi)存
  3. 初始化其它字段

第一個參數(shù) *chantype 結(jié)構(gòu)定義

// src/runtime/type.go

type chantype struct {
    typ  _type
    elem *_type
    dir  uintptr
}

實際上創(chuàng)建一個channel, 只是對一個hchan結(jié)構(gòu)體進行了一些初始化操作,并返回其指針。因此我們在函數(shù)傳遞時,不需要傳遞指針,直接使用就可以了,因為它本身就是一個指針的類型。

注意:對于chan內(nèi)存是在heap上分配的。

發(fā)送數(shù)據(jù)

對于channel的寫操作是由 chansend() 函數(shù)來實現(xiàn)的。

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    ...
}

在chan為nil的情況下, 如果是非阻塞則直接返回,否則panic。

對于分送數(shù)據(jù)chan有三種情況,分別是直接發(fā)送,緩存區(qū)發(fā)送阻塞發(fā)送,其中阻塞發(fā)送涉及到GMP 的調(diào)度,理解起來有些吃力。

在發(fā)送數(shù)據(jù)前需要進行加鎖操作,發(fā)送完再解鎖,保證原子性操作。

直接發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 直接發(fā)送
    // 如果接收隊列中有接收者,則直接將數(shù)據(jù)發(fā)給接收者,重點在send()函數(shù),并在函數(shù)里進行解鎖
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    ......
}

如果接收隊列中有接收者,則優(yōu)化從接收者從隊列中取出來sg(sg := c.recvq.dequeue()),然后再通過調(diào)用 send() 函數(shù)將數(shù)據(jù)發(fā)送給接收者即可。

image

<figcaption>channel send</figcaption>

在send()函數(shù)里,會執(zhí)行一個回調(diào)函數(shù)主要用來進行解鎖c.lock。真正的發(fā)送操作是函數(shù) sendDirect(),通過memmove(dst, src, t.size) 將數(shù)據(jù)復(fù)制過去。

緩沖區(qū)發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 緩沖區(qū)發(fā)送
    // 接收者隊列中沒有接收者goroutine
    // 當(dāng)前channel中的元素<隊列的大小,有緩沖buffer未滿的情況
    // 將數(shù)據(jù)存放在sendx在buf數(shù)組中的索引位置,然后再將sendx索引+1
    // 由于是一個循環(huán)數(shù)組,所以如果達到了dataqsize,則從0開始,同時個數(shù)+1
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }   

    ......
}

如果當(dāng)前recvq 隊列里沒有處于等待執(zhí)行的sudog的話,則需要將數(shù)據(jù)發(fā)送到緩沖隊列中(如果當(dāng)前隊列為緩沖chan)。

假設(shè)當(dāng)前buffer大小為6(dataqsiz=6),數(shù)據(jù)個數(shù)為0(qcount=0),這里寫入6個數(shù)據(jù),如下圖。

image

<figcaption>channel send</figcaption>

如果當(dāng)前緩沖區(qū)的元素數(shù)量<隊列的大小,說明緩沖區(qū)還沒有滿,還可以繼續(xù)裝載數(shù)據(jù)。

這時第一步先計算出 s.sendx 索引位置的內(nèi)存地址,然后調(diào)用 typememmove() 函數(shù)將qp復(fù)制到內(nèi)存地址,再將s.sendx索引值+1,同時c.qcount++。

當(dāng) sendx = dataqsiz 的時候,說明已到了數(shù)組最后一個元素,下次存儲數(shù)據(jù)的話,則需要重新從0開始了,所以需要重置為0。

buf是一個由數(shù)組組成的隊列,滿足隊列的FIFO的機制,最新存儲的數(shù)據(jù)也先消費,最多可以存儲 dataqsiz 個數(shù)量。超出這個數(shù)據(jù)量就需要使用第三種 阻塞發(fā)送 方式了。

sendx 始終保存的是下次存儲數(shù)據(jù)的數(shù)組索引位置,每次使用完記得+1 。每次存儲以前都需要判斷當(dāng)前buffer是否有空間可用 c.qcount < c.dataqsiz

總結(jié)

  • q.sendx 最大值為 c.dataqsiz -1,即數(shù)組的最大索引值。
  • q.count 是當(dāng)前chan 存儲的元素個數(shù),有可能 > c.dataqsiz

阻塞發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 阻塞發(fā)送
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true 

    ......
}

如果當(dāng)buff也寫滿的話,再send數(shù)據(jù)的話,則需要進行阻塞發(fā)送了。

channel send

假如我們有一個緩沖chan,但緩沖大小已經(jīng)使用完,再次發(fā)送數(shù)據(jù)的話,則需要進入sendq隊列了(將sudog綁定到一個goroutine,并放在sendq,等待讀?。?/p>

對于阻塞的情況,理解起來有些吃力,因為涉及到GMP的關(guān)系和調(diào)度。

  1. 調(diào)用 getg() 函數(shù)獲取當(dāng)前運行的goroutine
  2. 調(diào)用 acquireSudog() 函數(shù)獲取一個sudog,并進行數(shù)據(jù)綁定
  3. 將mysg 添加到發(fā)送隊列sendq,并設(shè)置為gp.waiting
  4. 更改goroutine狀態(tài)
  5. 設(shè)置goroutine為等待喚醒狀態(tài),調(diào)用 atomic.Store8(&gp.parkingOnChan, 1)函數(shù)?
  6. 通過keepAlive()函數(shù)可以保證發(fā)送的值一直有效,直到被接收者取走
  7. 進行清理工作
  8. 釋放 sudog 結(jié)構(gòu)體

總結(jié)

讀取數(shù)據(jù)

對于channel的讀取方式:

v <- ch
v, ok <- ch

其中 v<-ch 對應(yīng)的是 runtime.chanrecv1(), v, ok <-ch 對應(yīng)的是`runtime.chanrecv2()。但這兩個函數(shù)最終調(diào)用的還是同一個函數(shù),即 chanrecv()。

我們先看一下官方文檔對這個函數(shù)的說明

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
  • chanrecv 用來從chan 中接收數(shù)據(jù),并將接收的數(shù)據(jù)寫入到ep
  • 如果ep為 nil 的話,則接收的數(shù)據(jù)將被忽略
  • 如果非阻塞的且沒有可接收的數(shù)據(jù)將返回 (false ,false)
  • 如果chan已關(guān)閉,零值 ep 和返回值將是true, false,否則使用一個元素代替ep并返回 (true, true)
  • 一個非nil的 ep, 必須指向heap或者調(diào)用stack
// src/runtime/chan.go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c為nil,表示非法操作,則直接gopark(),表示出讓當(dāng)前GMP中的P的使用權(quán),允許其它G使用
    if c == nil {
        // 如果非阻塞的話,直接返回;如果是阻塞的話,直接panic
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    ...

    // 如果chan已關(guān)閉且元素個數(shù)為0
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            // 設(shè)置內(nèi)存內(nèi)容為類型 c.elemtype 的零值
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

}

如果當(dāng)前讀取的 chan 為nil的話,且非阻塞的情況,則會產(chǎn)生死鎖,最終提示

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:

否則返回零值。

同時出讓自己占用的P,允許其它goroutine搶占使用。

如果讀取的chan已關(guān)閉,則讀取出來的值為零值(函數(shù)說明第四條)。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    // 在沒有獲取鎖的情況下,檢查非阻塞操作失敗
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.

        // 如果當(dāng)前chan未關(guān)閉
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    ...

}

這段代碼主要是對重排讀的情況,進行了雙重檢測,暫是未明白code中考慮的情況,改天再消化消化。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖,下面才是真正要讀取的邏輯
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    ...
}

讀取之前先加鎖。

對chan的讀取與發(fā)送一樣,同樣有三種方式,為直接讀取、緩沖區(qū)讀取和阻塞讀取。

直接讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 直接讀取 
    // 從c.sendq隊列中取sudog, 將數(shù)據(jù)復(fù)制到sg
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
}

獲取一個待發(fā)送者,如果buffer大小為0,則直接從發(fā)送者接收數(shù)據(jù)。否則從隊列頭部接收,并將發(fā)送者發(fā)送的數(shù)據(jù)放在隊列尾部。

chan recv

從c.sendq隊列里讀取一個 *sudog,通過調(diào)用 recv() 函數(shù),將數(shù)據(jù)從發(fā)送者復(fù)制到ep中,并返回true,true,表示讀取成功。真正讀取函數(shù)為 recvDirect()

緩沖區(qū)讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c.qcount>0,說明緩沖區(qū)有元素可直接讀取
    if c.qcount > 0 {
        // Receive directly from queue
        // 直接從隊列中讀取
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
}

如果c.qcount > 0,則說明緩沖區(qū)里有內(nèi)容可以讀取。則

直接獲取 c.recvx 數(shù)組索引位置的內(nèi)存地址,則

  1. r.recvx 索引地址的值讀取出來復(fù)制給 ep,
  2. 然后更新接收數(shù)組索引c.recvx++, 如果>數(shù)組索引最大索引值 ,重置為0
  3. 減少元素個數(shù)
  4. 釋放鎖 c.qcount--
  5. 最后unlock返回。
image

<figcaption>chan recv</figcaption>

阻塞讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......

    // c.sendq沒有sender,buffer里也是空的,直接阻塞讀取
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
  1. 通過getg()獲取一個goroutine
  2. 獲取一個sudog結(jié)構(gòu)體
  3. 綁定兩者關(guān)系
  4. 加入 c.recvq 隊列
  5. 設(shè)置goroutine為等待喚醒狀態(tài)
  6. 清理狀態(tài)
chan recv

關(guān)閉chan

關(guān)閉chan語句

close(ch)

對于已關(guān)閉的chan,是不允許再次關(guān)閉的,否則會產(chǎn)生panic。對應(yīng)的函數(shù)為 runtime.closechan()

// src/runtime/chan.go

func closechan(c *hchan) {
    // 如果chan未初始化,觸發(fā)panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // 關(guān)閉已關(guān)閉的chan,觸發(fā)panicc
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    ......

}

對于一個未初始化的chan,或者已關(guān)閉的chan,如果再次關(guān)閉則會觸發(fā)panic。

func closechan(c *hchan) {
    ......
    // 設(shè)置chan關(guān)閉狀態(tài)
    c.closed = 1

    // 聲明一個結(jié)構(gòu)體鏈表gList,主要用來調(diào)度使用
    var glist gList

    // release all readers
    // 釋放所有readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }

        // 設(shè)置元素為nil
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 釋放所有writers,會引起panic,見下面說明
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 設(shè)置元素為nil
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 釋放鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 調(diào)度所有g(shù)
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        // 喚醒goroutine
        goready(gp, 3)
    }
}
  1. 聲明一個gList 鏈表結(jié)構(gòu)體
  2. 將接收隊列 c.recvq 中的所有元素添加到gList 中,并將原來的值設(shè)置為
  3. 將發(fā)送隊列 c.sendq 中的所有元素添加到 gList 中,并將原來的值設(shè)置為
  4. 將所有的阻塞goroutine通過函數(shù)goready() 進行調(diào)度

文章里提到在對c.sendq 處理的時候可能會觸發(fā)panic。這是因為關(guān)閉chan后,執(zhí)行了 goready() 對原來sendq里的sudogs 進行了進行了重新調(diào)度,這時候發(fā)現(xiàn)chan已經(jīng)關(guān)閉了,所以會panic。那么又是如何調(diào)度的呢?

package main

import (
    "fmt"
    "time"
)

var ch chan int

func f() {
}

func main() {
    ch := make(chan int, 10)
    // buffer大小為10,這里發(fā)送11個,使最后一個進入到c.sendq里面
    for i := 0; i < 11; i++ { // i < 10 則正常
        go func(v int) {
            ch <- v
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println("發(fā)送完畢")
    // 關(guān)閉chan,將對sendq里的g進行喚醒,喚醒后發(fā)現(xiàn)chan關(guān)閉狀態(tài),直接panic
    close(ch)
    for v := range ch {
        fmt.Println(v)
    }
    time.Sleep(time.Second)
}

有一條廣泛流傳的關(guān)閉 channel 的原則:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

不要從一個 receiver 側(cè)關(guān)閉 channel,也不要在有多個 sender 時,關(guān)閉 channel。對于只有一個sender的話,直接在sender端關(guān)閉就可以。但對于多個sender的話,則需要通過一個信號量進行關(guān)閉,參考這里。

總結(jié)

close 操作會觸發(fā)goroutine的調(diào)度行為。

總結(jié)

  1. 在發(fā)送和讀取 chan的時候,如果chan為nil的話,這時候就根據(jù)是否阻塞進行判斷是否會發(fā)生panic。如果阻塞狀態(tài)的話,則會發(fā)生panic,否則會直接返回
  2. 對chan 發(fā)送或接收數(shù)據(jù)的時候要保證已初始化狀態(tài)
  3. 對于已關(guān)閉的chan再次關(guān)閉會觸發(fā)panic
  4. 對于發(fā)送和讀取數(shù)據(jù)都有三種處理情況,分別是直接讀寫,緩存區(qū)讀寫和阻塞讀寫
  5. 發(fā)送和接收數(shù)據(jù)的本質(zhì)上是對值的復(fù)制操作。All transfer of value on the go channels happens with the copy of value.
  6. close(ch)會觸發(fā)goroutine 的調(diào)度行為
  7. 內(nèi)部使用 sudogs對goroutine進行了一次封裝。
  8. 如果buffer中的元素?zé)o法保證消費完的話,則會產(chǎn)生內(nèi)存泄漏的危險,這時gc是無法對這些元素時間清理的,過多的 chan就會占用大量的資源
  9. 對于chan的分配的內(nèi)存是在哪里,heap還是stack?

參考

本文如有錯誤,歡迎大家在下方留言指出。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 基礎(chǔ)用法 channel chan T是雙向channel類型,編譯器允許對雙向channel同時進行發(fā)送和接收。...
    杰克慢閱讀 392評論 0 0
  • channel 在 golang 中是一個非常重要的特性,它為我們提供了一個并發(fā)模型。對比鎖,通過 chan 在多...
    安佳瑋閱讀 752評論 1 4
  • 簡書前話: 由于簡書不支持 mermaid 流程圖,所以想看完整的版本,可以到我的個人博客 中查看 01.chan...
    Abson在簡書閱讀 1,120評論 0 0
  • 簡介 熟悉Go的人都知道,它提倡著不要通過共享內(nèi)存來通訊,而要通過通訊來共享內(nèi)存。Go提供了一種獨特的并發(fā)同步技術(shù)...
    marsjhe閱讀 3,075評論 0 2
  • Golang channel 作為Go的核心的數(shù)據(jù)結(jié)構(gòu)和Goroutine之間的通信,是支撐Go語言高并發(fā)的關(guān)鍵 ...
    LegendGo閱讀 538評論 0 1

友情鏈接更多精彩內(nèi)容