Nsq源碼學(xué)習(xí)

memoryMsgChan = nil backendMsgChan = nil flusherChan = nil

nsqd的啟動(dòng)過(guò)程從nsq/apps/nsqd文件中main函數(shù)啟動(dòng),使用"github.com/judwhite/go-svc/svc"的svc包進(jìn)行init, run, stop。program結(jié)構(gòu)實(shí)現(xiàn)svc包中的service接口。

該接口定義了三個(gè)生命周期方法

Init(Environment)該方法在nsqd開(kāi)始工作之前被執(zhí)行,完成一些初始化工作

Start()該方法對(duì)應(yīng)nsqd啟動(dòng)過(guò)程,注釋要求該方法非阻塞的,

Stop()對(duì)應(yīng)nsqd的停止過(guò)程

啟動(dòng)的正式函數(shù)為nsqd.Main()啟動(dòng)和初始化。啟動(dòng)之后開(kāi)啟tcp和http服務(wù),開(kāi)啟lookupLoop和queueScanLoop服務(wù)。

Http服務(wù)有GET和POST請(qǐng)求,能夠執(zhí)行ping,info, pub, mpub, stats, topic/create, topic/delete, topic/pause, topic/unpause, channel/create,channel/delete, channel/empty, channel/pause ,channel/unpause/, config/opt還有一些debug請(qǐng)求。

tcp提供FIN, RDY, REQU, PUB, MPUB,DPUB,NOP, TOUCH, SUB, CLS, AUTH

以下為nsqd的啟動(dòng)流程,到數(shù)據(jù)處理之前。

func (p *program) Start() error {

? ? //獲取配置文件信息,默認(rèn)的配置信息

? opts := nsqd.NewOptions()

? ? //以下解析用戶輸入的命令行參數(shù),

? ? flagSet := nsqdFlagSet(opts)

? flagSet.Parse(os.Args[1:])

? rand.Seed(time.Now().UTC().UnixNano())

? if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {

? ? ? fmt.Println(version.String("nsqd"))

? ? ? os.Exit(0)

? }

? var cfg config

? //以下解析配置文件

? configFile := flagSet.Lookup("config").Value.String()

? if configFile != "" {

? ? ? //toml這個(gè)包沒(méi)懂干嘛的,

? ? ? _, err := toml.DecodeFile(configFile, &cfg)

? ? ? if err != nil {

? ? ? ? log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())

? ? ? }

? }

? cfg.Validate()

//通過(guò)options.Resolve方法將兩種方式的參數(shù)合并到opts

? options.Resolve(opts, flagSet, cfg)

? ? //創(chuàng)建nsqd實(shí)例

? ? nsqd := nsqd.New(opts)

? ? //加載以前的元數(shù)據(jù)

? err := nsqd.LoadMetadata()

? if err != nil {

? ? ? log.Fatalf("ERROR: %s", err.Error())

? }

? err = nsqd.PersistMetadata()

? if err != nil {

? ? ? log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())

? }

? //nsqd啟動(dòng)的正式開(kāi)始函數(shù),nsq/nsqd/nsqd.go中

? nsqd.Main()

? p.nsqd = nsqd

? return nil

}

在program結(jié)構(gòu)體實(shí)現(xiàn)的Start方法中,對(duì)用戶輸入的命令行參數(shù)進(jìn)行解析,判斷用戶是否指定了自定義配置文件,有,對(duì)該文件進(jìn)行加載與校驗(yàn),nsqd參數(shù)設(shè)置通過(guò)兩種方式實(shí)現(xiàn),命令行和自定義配置文件。

創(chuàng)建nsqd實(shí)例

通過(guò)nsqio/nsq/nsqd/nsqd.go提供的工廠方法。主要完成了一下工作:

func New(opts *Options) *NSQD {

dataPath := opts.DataPath

? ? //數(shù)據(jù)持久化路徑,用戶未指定,將使用當(dāng)前文件夾

? ? //作為數(shù)據(jù)持久化的路徑

if opts.DataPath == "" {

cwd, _ := os.Getwd()

dataPath = cwd

}

if opts.Logger == nil {

opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)

}

//初始化一個(gè)NSQD實(shí)例,設(shè)置其中一些字段

n := &NSQD{

startTime:? ? ? ? ? ? time.Now(),

topicMap:? ? ? ? ? ? make(map[string]*Topic),

exitChan:? ? ? ? ? ? make(chan int),

notifyChan:? ? ? ? ? make(chan interface{}),

optsNotificationChan: make(chan struct{}, 1),

dl:? ? ? ? ? ? ? ? ? dirlock.New(dataPath),

}

//創(chuàng)建一個(gè)http客戶端

httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)

n.ci = clusterinfo.New(n.logf, httpcli)

//以下都是對(duì)配置一些字段進(jìn)行校驗(yàn)

n.swapOpts(opts)

n.errValue.Store(errStore{})

var err error

opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)

if err != nil {

n.logf(LOG_FATAL, "%s", err)

os.Exit(1)

}

err = n.dl.Lock()

if err != nil {

n.logf(LOG_FATAL, "--data-path=%s in use (possibly by another instance of nsqd)", dataPath)

os.Exit(1)

}

if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {

n.logf(LOG_FATAL, "--max-deflate-level must be [1,9]")

os.Exit(1)

}

if opts.ID < 0 || opts.ID >= 1024 {

n.logf(LOG_FATAL, "--node-id must be [0,1024)")

os.Exit(1)

}

if opts.StatsdPrefix != "" {

var port string

_, port, err = net.SplitHostPort(opts.HTTPAddress)

if err != nil {

n.logf(LOG_FATAL, "failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)

os.Exit(1)

}

statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))

prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)

if prefixWithHost[len(prefixWithHost)-1] != '.' {

prefixWithHost += "."

}

opts.StatsdPrefix = prefixWithHost

}

if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {

opts.TLSRequired = TLSRequired

}

tlsConfig, err := buildTLSConfig(opts)

if err != nil {

n.logf(LOG_FATAL, "failed to build TLS config - %s", err)

os.Exit(1)

}

if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {

n.logf(LOG_FATAL, "cannot require TLS client connections without TLS key and cert")

os.Exit(1)

}

n.tlsConfig = tlsConfig

for _, v := range opts.E2EProcessingLatencyPercentiles {

if v <= 0 || v > 1 {

n.logf(LOG_FATAL, "Invalid percentile: %v", v)

os.Exit(1)

}

}

n.logf(LOG_INFO, version.String("nsqd"))

n.logf(LOG_INFO, "ID: %d", opts.ID)

return n

}

加載之前元數(shù)據(jù)nsqd運(yùn)行停止后會(huì)創(chuàng)建兩個(gè)文件:nsqd.dat和nsqd.(id號(hào)).dat,若之后啟動(dòng)nsqd仍指定相同的dataPath,nsqd會(huì)嘗試從兩個(gè)文件中加載之前保存元數(shù)據(jù),即topic和channel信息,如果兩個(gè)都不存在,該方法在19行返回,認(rèn)為是一次全新啟動(dòng),如果兩個(gè)文件存在但其中保存信息不一致,nsqd停止,并提示用戶刪除其中一個(gè)。

func (n *NSQD) LoadMetadata() error {

atomic.StoreInt32(&n.isLoading, 1)

defer atomic.StoreInt32(&n.isLoading, 0)

fn := newMetadataFile(n.getOpts())

// old metadata filename with ID, maintained in parallel to enable roll-back

fnID := oldMetadataFile(n.getOpts())

data, err := readOrEmpty(fn)

if err != nil {

return err

}

dataID, errID := readOrEmpty(fnID)

if errID != nil {

return errID

}

if data == nil && dataID == nil {

return nil // fresh start

}

if data != nil && dataID != nil {

if bytes.Compare(data, dataID) != 0 {

return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)

}

}

if data == nil {

// only old metadata file exists, use it

fn = fnID

data = dataID

}

var m meta

err = json.Unmarshal(data, &m)

if err != nil {

return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)

}

for _, t := range m.Topics {

if !protocol.IsValidTopicName(t.Name) {

n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)

continue

}

topic := n.GetTopic(t.Name)

if t.Paused {

topic.Pause()

}

for _, c := range t.Channels {

if !protocol.IsValidChannelName(c.Name) {

n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)

continue

}

channel := topic.GetChannel(c.Name)

if c.Paused {

channel.Pause()

}

}

}

return nil

}

啟動(dòng)nsqd提供的服務(wù)&waitGroupWrapper

nsqd正式工作函數(shù)入口為nsqd.Main(),該Main函數(shù)位于,nsq/nsqd/nsqd.go中。完成了Tcp, Http, Https(如果需要的話)服務(wù)器啟動(dòng),之前的program實(shí)現(xiàn)的Start方法非阻塞,NSQD的Main方法在Start中被調(diào)用,這就要求Main方法必須非阻塞的。但無(wú)論Tcp服務(wù)器還是Http服務(wù)器,都需要在一個(gè)死循環(huán)中對(duì)新連接進(jìn)行業(yè)務(wù)處理,Main方法通過(guò)WaitGroupWrapper保證自身非阻塞。

func (n *NSQD) Main() {

? var err error

? //建立上下文機(jī)制,退出時(shí),等待子協(xié)程的結(jié)束

? ctx := &context{n}

? n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)

? if err != nil {

? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)

? ? ? os.Exit(1)

? }

? n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)

? if err != nil {

? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)

? ? ? os.Exit(1)

? }

? if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {

? ? ? n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)

? ? ? if err != nil {

? ? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)

? ? ? ? os.Exit(1)

? ? ? }

? }

? tcpServer := &tcpServer{ctx: ctx}

? //使用waiGroup等待所有協(xié)程退出,為了避免代碼冗余和美觀,封裝成一個(gè)類進(jìn)行。

? n.waitGroup.Wrap(func() {

? ? ? //處理tcp協(xié)議信息,通過(guò)實(shí)現(xiàn)接口使用,接口的實(shí)現(xiàn)類nsq/nsqd/tcp.go中

? ? ? protocol.TCPServer(n.tcpListener, tcpServer, n.logf)

? })

? httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)

? n.waitGroup.Wrap(func() {

? ? ? http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)

? })

? if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {

? ? ? httpsServer := newHTTPServer(ctx, true, true)

? ? ? n.waitGroup.Wrap(func() {

? ? ? ? http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)

? ? ? })

? }

? n.waitGroup.Wrap(n.queueScanLoop)

? n.waitGroup.Wrap(n.lookupLoop)

? if n.getOpts().StatsdAddress != "" {

? ? ? n.waitGroup.Wrap(n.statsdLoop)

? }

}

以上,nsqd啟動(dòng)完畢。開(kāi)始接收tcp和http請(qǐng)求處理請(qǐng)求。tcp處理實(shí)現(xiàn)通過(guò)實(shí)現(xiàn)Protocol接口類,每一個(gè)請(qǐng)求都會(huì)在IOLoop中進(jìn)行處理,建立client類,處理請(qǐng)求返回response。

處理tcp請(qǐng)求時(shí),nsq/nsqd/tcp.go中的tcpServer結(jié)構(gòu)體實(shí)現(xiàn)了nsq/internal/protocol/tcp_server.go中的TCPHandler接口處理tcp請(qǐng)求。

在tcp.go中的Handler方法中,根據(jù)發(fā)送過(guò)來(lái)信息的前四個(gè)字節(jié)判斷是否是V2,如果是使用protocolV2類處理信息。否則直接返回錯(cuò)誤。

var prot protocol.Protocol

switch protocolMagic {

case "? V2":

? prot = &protocolV2{ctx: p.ctx}

default:

? protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))

? clientConn.Close()

? p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",

? ? ? clientConn.RemoteAddr(), protocolMagic)

? return

}

err = prot.IOLoop(clientConn)

if err != nil {

? p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)

? return

}

信息處理轉(zhuǎn)到nsq/nsqd/protocol_v2.go中的 IOLoop函數(shù)執(zhí)行。

func (p *protocolV2) IOLoop(conn net.Conn) error {

? var err error

? var line []byte

? var zeroTime time.Time

? clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)

? client := newClientV2(clientID, conn, p.ctx)

? p.ctx.nsqd.AddClient(client.ID, client)

? // synchronize the startup of messagePump in order

? // to guarantee that it gets a chance to initialize

? // goroutine local state derived from client attributes

? // and avoid a potential race with IDENTIFY (where a client

? // could have changed or disabled said attributes)

? //采用這種方式,是為了保證messagePump中的局部變量初始化成功,

? //messagePump對(duì)sub訂閱進(jìn)行處理

? ? messagePumpStartedChan := make(chan bool)

? go p.messagePump(client, messagePumpStartedChan)

? <-messagePumpStartedChan

? for {

? ? ? if client.HeartbeatInterval > 0 {

? ? ? ? client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))

? ? ? } else {

? ? ? ? client.SetReadDeadline(zeroTime)

? ? ? }

? ? ? // ReadSlice does not allocate new space for the data each request

? ? ? // ie. the returned slice is only valid until the next call to it

? ? ? //一次讀取一行的數(shù)據(jù),以\n結(jié)束。

? ? ? line, err = client.Reader.ReadSlice('\n')

? ? ? if err != nil {

? ? ? ? if err == io.EOF {

? ? ? ? ? ? err = nil

? ? ? ? } else {

? ? ? ? ? ? err = fmt.Errorf("failed to read command - %s", err)

? ? ? ? }

? ? ? ? break

? ? ? }

? ? ? // trim the '\n'

? ? ? line = line[:len(line)-1]

? ? ? // optionally trim the '\r'

? ? ? if len(line) > 0 && line[len(line)-1] == '\r' {

? ? ? ? line = line[:len(line)-1]

? ? ? }

? ? ? //按照分隔符空格符分開(kāi)。通過(guò)查看consumer端代碼,發(fā)過(guò)來(lái)的格式,命令名字+" " +(param長(zhǎng)度 + param) + 加換行符。

? ? ? //param topic, channel以及發(fā)送信息

? ? ? params := bytes.Split(line, separatorBytes)

? ? ? p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

? ? ? var response []byte

? ? ? //根據(jù)解析的信息,判斷命令類型,轉(zhuǎn)到對(duì)應(yīng)的命令操作,進(jìn)行處理和回復(fù)。

? ? ? response, err = p.Exec(client, params)

? ? ? if err != nil {

? ? ? ? ctx := ""

? ? ? ? if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {

? ? ? ? ? ? ctx = " - " + parentErr.Error()

? ? ? ? }

? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

? ? ? ? sendErr := p.Send(client, frameTypeError, []byte(err.Error()))

? ? ? ? if sendErr != nil {

? ? ? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)

? ? ? ? ? ? break

? ? ? ? }

? ? ? ? // errors of type FatalClientErr should forceably close the connection

? ? ? ? if _, ok := err.(*protocol.FatalClientErr); ok {

? ? ? ? ? ? break

? ? ? ? }

? ? ? ? continue

? ? ? }

? ? ? if response != nil {

? ? ? ? err = p.Send(client, frameTypeResponse, response)

? ? ? ? if err != nil {

? ? ? ? ? ? err = fmt.Errorf("failed to send response - %s", err)

? ? ? ? ? ? break

? ? ? ? }

? ? ? }

? }

? p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)

? conn.Close()

? close(client.ExitChan)

? if client.Channel != nil {

? ? ? client.Channel.RemoveClient(client.ID)

? }

? p.ctx.nsqd.RemoveClient(client.ID)

? return err

}

在IOLoop函數(shù)處理中,在死循環(huán)中不斷讀取client連接中傳過(guò)來(lái)的信息,每一行一條信息,解析每條信息,p.Exec轉(zhuǎn)到對(duì)應(yīng)命令函數(shù)進(jìn)行處理。在執(zhí)行sub命中,牽扯到SubEventChan,會(huì)導(dǎo)致channel數(shù)據(jù)的分配發(fā)生重塑。

p.messagePump處理channel中的信息。

p.messagePump,(首先明白的一點(diǎn)tcp長(zhǎng)連接不發(fā)送釋放命令或者意外的情況下不斷開(kāi))根據(jù)連接的client發(fā)送過(guò)來(lái)的請(qǐng)求類型處理消息。

以sub命令為例。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {

? var err error

? //memeoryMsgChan和backendMsgChan是訂閱channel的。

? var memoryMsgChan chan *Message

? var backendMsgChan chan []byte

? var subChannel *Channel

? // NOTE: `flusherChan` is used to bound message latency for

? // the pathological case of a channel on a low volume topic

? // with >1 clients having >1 RDY counts

? var flusherChan <-chan time.Time

? var sampleRate int32

? ? //判斷是否訂閱新通道

? subEventChan := client.SubEventChan

? identifyEventChan := client.IdentifyEventChan

? outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)

? heartbeatTicker := time.NewTicker(client.HeartbeatInterval)

? heartbeatChan := heartbeatTicker.C

? msgTimeout := client.MsgTimeout

? // v2 opportunistically buffers data to clients to reduce write system calls

? // we force flush in two cases:

? //? ? 1. when the client is not ready to receive messages

? //? ? 2. we're buffered and the channel has nothing left to send us

? //? ? ? (ie. we would block in this loop anyway)

? //

? flushed := true

? // signal to the goroutine that started the messagePump

? // that we've started up

? close(startedChan)

? for {

? ? ? if subChannel == nil || !client.IsReadyForMessages() {

? ? ? ? // the client is not ready to receive messages...

? ? ? ? memoryMsgChan = nil

? ? ? ? backendMsgChan = nil

? ? ? ? flusherChan = nil

? ? ? ? // force flush

? ? ? ? client.writeLock.Lock()

? ? ? ? err = client.Flush()

? ? ? ? client.writeLock.Unlock()

? ? ? ? if err != nil {

? ? ? ? ? ? goto exit

? ? ? ? }

? ? ? ? flushed = true

? ? ? } else if flushed {

? ? ? ? // last iteration we flushed...

? ? ? ? // do not select on the flusher ticker channel

? ? ? ? memoryMsgChan = subChannel.memoryMsgChan

? ? ? ? backendMsgChan = subChannel.backend.ReadChan()

? ? ? ? flusherChan = nil

? ? ? } else {

? ? ? ? // we're buffered (if there isn't any more data we should flush)...

? ? ? ? // select on the flusher ticker channel, too

? ? ? ? memoryMsgChan = subChannel.memoryMsgChan

? ? ? ? backendMsgChan = subChannel.backend.ReadChan()

? ? ? ? flusherChan = outputBufferTicker.C

? ? ? }

? ? ? select {

? ? ? case <-flusherChan:

? ? ? ? // if this case wins, we're either starved

? ? ? ? // or we won the race between other channels...

? ? ? ? // in either case, force flush

? ? ? ? client.writeLock.Lock()

? ? ? ? err = client.Flush()

? ? ? ? client.writeLock.Unlock()

? ? ? ? if err != nil {

? ? ? ? ? ? goto exit

? ? ? ? }

? ? ? ? flushed = true

? ? ? case <-client.ReadyStateChan:

? ? ? case subChannel = <-subEventChan:

? ? ? ? // you can't SUB anymore

? ? ? ? subEventChan = nil

? ? ? case identifyData := <-identifyEventChan:

? ? ? ? // you can't IDENTIFY anymore

? ? ? ? identifyEventChan = nil

? ? ? ? outputBufferTicker.Stop()

? ? ? ? if identifyData.OutputBufferTimeout > 0 {

? ? ? ? ? ? outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)

? ? ? ? }

? ? ? ? heartbeatTicker.Stop()

? ? ? ? heartbeatChan = nil

? ? ? ? if identifyData.HeartbeatInterval > 0 {

? ? ? ? ? ? heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)

? ? ? ? ? ? heartbeatChan = heartbeatTicker.C

? ? ? ? }

? ? ? ? if identifyData.SampleRate > 0 {

? ? ? ? ? ? sampleRate = identifyData.SampleRate

? ? ? ? }

? ? ? ? msgTimeout = identifyData.MsgTimeout

? ? ? case <-heartbeatChan:

? ? ? ? err = p.Send(client, frameTypeResponse, heartbeatBytes)

? ? ? ? if err != nil {

? ? ? ? ? ? goto exit

? ? ? ? }

? ? ? case b := <-backendMsgChan:

? ? ? ? if sampleRate > 0 && rand.Int31n(100) > sampleRate {

? ? ? ? ? ? continue

? ? ? ? }

? ? ? ? msg, err := decodeMessage(b)

? ? ? ? if err != nil {

? ? ? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)

? ? ? ? ? ? continue

? ? ? ? }

? ? ? ? msg.Attempts++

? ? ? ? subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

? ? ? ? client.SendingMessage()

? ? ? ? err = p.SendMessage(client, msg)

? ? ? ? if err != nil {

? ? ? ? ? ? goto exit

? ? ? ? }

? ? ? ? flushed = false

? ? ? case msg := <-memoryMsgChan:

? ? ? ? if sampleRate > 0 && rand.Int31n(100) > sampleRate {

? ? ? ? ? ? continue

? ? ? ? }

? ? ? ? msg.Attempts++

? ? ? ? subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

? ? ? ? client.SendingMessage()

? ? ? ? err = p.SendMessage(client, msg)

? ? ? ? if err != nil {

? ? ? ? ? ? goto exit

? ? ? ? }

? ? ? ? flushed = false

? ? ? case <-client.ExitChan:

? ? ? ? goto exit

? ? ? }

? }

exit:

? p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)

? heartbeatTicker.Stop()

? outputBufferTicker.Stop()

? if err != nil {

? ? ? p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)

? }

}

12行subEventChan := client.SubEventChan更新消息泵的意思正是將客戶端訂閱的channel發(fā)送到subEventChan中去。

72-74行,在這里將從subEventChan中接收到的值保存到subChannel中。

54-56行,當(dāng)subChannel被賦值為客戶端訂閱的channel之后,使用subChannel內(nèi)存消息隊(duì)列和磁盤消息隊(duì)列對(duì)memoryMsgChan和backedMsgChan進(jìn)行賦值。

101-132行,監(jiān)聽(tīng)memoryMsgChan和backendMsgChan,當(dāng)其上有消息傳入時(shí)(向nsq發(fā)布消息,消息被寫(xiě)入到topic,進(jìn)而被寫(xiě)入到topic。

當(dāng)客戶端訂閱的topic的channel有消息時(shí),寫(xiě)入到topic下的channel的消息,通過(guò)p.SendMessage(client,msg)來(lái)將消息發(fā)送給客戶端。

客戶端的tcp發(fā)布PUB流程

首先每一個(gè)新連接的客戶端都是通過(guò)IOLoop進(jìn)行處理,啟動(dòng)(protocloV2)messagePump,在messagePump里因?yàn)槭莗roducer所以, memoryMsgChan = nil backendMsgChan = nil flusherChan = nil。

在IOLoop中p.Exec函數(shù)的PUB將消息寫(xiě)到topic中,根據(jù)寫(xiě)入的topic中。

客戶端http的PUB看nsq的http。

客戶端SUB訂閱

IOLoop中進(jìn)行處理,啟動(dòng)messagePump和Exec函數(shù)。在Exec函數(shù)中,更新訂閱channel的信息,更新client的信息。返回ok。

在messagePumn中當(dāng)接收到channel有消息時(shí)就

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容