NSQ 源碼學(xué)習(xí)筆記(三)

上一篇的最后一段代碼中,channel中的消息在發(fā)送至客戶端時,也同步了一份消息發(fā)送到了inFight隊列中

    subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

這里其實一開始不是很明白,在上網(wǎng)查閱了資料后,了解到inFlight隊列是NSQ用來實現(xiàn)消息至少投遞一次的。知道了功能后,再來看就很明了了。

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}

上述代碼中,首先初始化消息的過期時間timeout+now,通過將msg加入到InFlight隊列中,InFlight其實是一個堆排序隊列,優(yōu)先級是按照超時時間來排序的,越靠近過期時間,將會越靠前。這里只是將消息存入隊列,那么在哪里消費呢?我們在第一篇筆記中的末尾,Nsqd在完成監(jiān)聽部分的初始化后,有四個自啟動的goroutine,第一個通過Wrap啟動的n.queueScanLoop()就是用來執(zhí)行消費的。

func (n *NSQD) queueScanLoop() {
    //任務(wù)派發(fā) 隊列
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)

    //任務(wù)結(jié)果 隊列
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)

    // 用來優(yōu)雅關(guān)閉
    closeCh := make(chan int)
    // 利用Ticket來定期開始任務(wù)和調(diào)整worker
    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    // 調(diào)整worker
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C: // 開始一次任務(wù)的派發(fā)
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:  // 重新調(diào)整 worker 數(shù)量
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:  // 退出
            goto exit
        }
        
        // num最大為nsqd的所有channel總數(shù)
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

        loop:
        // 隨機取出num個channel, 派發(fā)給 worker 進行 掃描
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        // 接收 掃描結(jié)果, 統(tǒng)一 有多少 channel 是 "臟" 的
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        // 假如 "臟" 的 "比例" 大于閥值, 則不等待 workTicker
        // 馬上進行下一輪 掃描
        if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

    exit:
    n.logf("QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
//  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    // 校驗啟動的worker數(shù)量,最大為nsqd的所有channel數(shù) * 1/4,
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        // 當(dāng)前啟動的worker數(shù)等于設(shè)定的idealPoolSize,那么直接返回,
        // 如果大于了idealPoolSize,通過closeCh關(guān)閉一個worker
        // 如果未達到idealPoolSize,啟動worker的goroutine
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

worker的具體實現(xiàn)是queueScanWorker

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            // 實現(xiàn)消息至少被投遞一次
            if c.processInFlightQueue(now) {
                dirty = true
            }
            // 實現(xiàn)延遲消息隊列
            if c.processDeferredQueue(now) {
                dirty = true
            }
            // 如果有過期消息的存在,則dirty
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        // 從隊列中獲取已經(jīng)過期的消息
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()
        
        if msg == nil {
            goto exit
        }
        dirty = true
        // 如果獲取到了符合條件的msg,按msg.ID將msg在infight隊列中刪除
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        // 消息在channel中發(fā)起重新投遞
        c.doRequeue(msg)
    }

exit:
    return dirty
}

// 延遲消息隊列的實現(xiàn)
func (c *Channel) processDeferredQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.deferredMutex.Lock()
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()

        if item == nil {
            goto exit
        }
        dirty = true

        msg := item.Value.(*Message)
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        c.doRequeue(msg)
    }

exit:
    return dirty
}

??上面的兩個函數(shù)processDeferredQueueprocessInFlightQueue的實現(xiàn)基本一致,那為什么相同的邏輯要實現(xiàn)兩次呢。兩個隊列,DeferredQueue 用 head 包實現(xiàn), InFlightQueue 自己又實現(xiàn)了一次heap, 其實跟 DeferredQueue 不是一樣的么?

??之前兩個就真是是一樣的, 后來有一個提交,里面的注釋是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53

??意思就是說, 這些 隊列里 存的是 Message 這個類型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而這個value 是一個 interface{} , 賦值 和 取值 都需要做類型推斷 和 包裝,那么作為 InFlightQueue 這個 “高負荷” 的隊列, 減少這種 “類型推斷和包裝” , 有利于提高性能

測試一下:

type Item struct {
    d1 int
    d2 int
}

func BenchmarkT1(b *testing.B) {
    q := make([]*Item, 0)   // 不需要類型推斷的 slice
    for i := 0; i < b.N; i++ {
        q = append(q, &Item{i, i})
    }
    for _, hero := range q {
        hero.d1++
    }
}

func BenchmarkT2(b *testing.B) {
    q := make([]interface{}, 0)
    for i := 0; i < b.N; i++ {
        q = append(q, &Item{i, i})
    }
    for _, hero := range q {
        hero.(*Item).d1++   // 需要做類型推斷
    }
}

測試結(jié)果:

BenchmarkT1-8           10000000               241 ns/op
BenchmarkT2-8            5000000               332 ns/op
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容