memoryMsgChan = nil backendMsgChan = nil flusherChan = nil
nsqd的啟動(dòng)過(guò)程從nsq/apps/nsqd文件中main函數(shù)啟動(dòng),使用"github.com/judwhite/go-svc/svc"的svc包進(jìn)行init, run, stop。program結(jié)構(gòu)實(shí)現(xiàn)svc包中的service接口。
該接口定義了三個(gè)生命周期方法
Init(Environment)該方法在nsqd開(kāi)始工作之前被執(zhí)行,完成一些初始化工作
Start()該方法對(duì)應(yīng)nsqd啟動(dòng)過(guò)程,注釋要求該方法非阻塞的,
Stop()對(duì)應(yīng)nsqd的停止過(guò)程
啟動(dòng)的正式函數(shù)為nsqd.Main()啟動(dòng)和初始化。啟動(dòng)之后開(kāi)啟tcp和http服務(wù),開(kāi)啟lookupLoop和queueScanLoop服務(wù)。
Http服務(wù)有GET和POST請(qǐng)求,能夠執(zhí)行ping,info, pub, mpub, stats, topic/create, topic/delete, topic/pause, topic/unpause, channel/create,channel/delete, channel/empty, channel/pause ,channel/unpause/, config/opt還有一些debug請(qǐng)求。
tcp提供FIN, RDY, REQU, PUB, MPUB,DPUB,NOP, TOUCH, SUB, CLS, AUTH
以下為nsqd的啟動(dòng)流程,到數(shù)據(jù)處理之前。
func (p *program) Start() error {
? ? //獲取配置文件信息,默認(rèn)的配置信息
? opts := nsqd.NewOptions()
? ? //以下解析用戶輸入的命令行參數(shù),
? ? flagSet := nsqdFlagSet(opts)
? flagSet.Parse(os.Args[1:])
? rand.Seed(time.Now().UTC().UnixNano())
? if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
? ? ? fmt.Println(version.String("nsqd"))
? ? ? os.Exit(0)
? }
? var cfg config
? //以下解析配置文件
? configFile := flagSet.Lookup("config").Value.String()
? if configFile != "" {
? ? ? //toml這個(gè)包沒(méi)懂干嘛的,
? ? ? _, err := toml.DecodeFile(configFile, &cfg)
? ? ? if err != nil {
? ? ? ? log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
? ? ? }
? }
? cfg.Validate()
//通過(guò)options.Resolve方法將兩種方式的參數(shù)合并到opts
? options.Resolve(opts, flagSet, cfg)
? ? //創(chuàng)建nsqd實(shí)例
? ? nsqd := nsqd.New(opts)
? ? //加載以前的元數(shù)據(jù)
? err := nsqd.LoadMetadata()
? if err != nil {
? ? ? log.Fatalf("ERROR: %s", err.Error())
? }
? err = nsqd.PersistMetadata()
? if err != nil {
? ? ? log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
? }
? //nsqd啟動(dòng)的正式開(kāi)始函數(shù),nsq/nsqd/nsqd.go中
? nsqd.Main()
? p.nsqd = nsqd
? return nil
}
在program結(jié)構(gòu)體實(shí)現(xiàn)的Start方法中,對(duì)用戶輸入的命令行參數(shù)進(jìn)行解析,判斷用戶是否指定了自定義配置文件,有,對(duì)該文件進(jìn)行加載與校驗(yàn),nsqd參數(shù)設(shè)置通過(guò)兩種方式實(shí)現(xiàn),命令行和自定義配置文件。
創(chuàng)建nsqd實(shí)例
通過(guò)nsqio/nsq/nsqd/nsqd.go提供的工廠方法。主要完成了一下工作:
func New(opts *Options) *NSQD {
dataPath := opts.DataPath
? ? //數(shù)據(jù)持久化路徑,用戶未指定,將使用當(dāng)前文件夾
? ? //作為數(shù)據(jù)持久化的路徑
if opts.DataPath == "" {
cwd, _ := os.Getwd()
dataPath = cwd
}
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
//初始化一個(gè)NSQD實(shí)例,設(shè)置其中一些字段
n := &NSQD{
startTime:? ? ? ? ? ? time.Now(),
topicMap:? ? ? ? ? ? make(map[string]*Topic),
exitChan:? ? ? ? ? ? make(chan int),
notifyChan:? ? ? ? ? make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
dl:? ? ? ? ? ? ? ? ? dirlock.New(dataPath),
}
//創(chuàng)建一個(gè)http客戶端
httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
n.ci = clusterinfo.New(n.logf, httpcli)
//以下都是對(duì)配置一些字段進(jìn)行校驗(yàn)
n.swapOpts(opts)
n.errValue.Store(errStore{})
var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}
err = n.dl.Lock()
if err != nil {
n.logf(LOG_FATAL, "--data-path=%s in use (possibly by another instance of nsqd)", dataPath)
os.Exit(1)
}
if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
n.logf(LOG_FATAL, "--max-deflate-level must be [1,9]")
os.Exit(1)
}
if opts.ID < 0 || opts.ID >= 1024 {
n.logf(LOG_FATAL, "--node-id must be [0,1024)")
os.Exit(1)
}
if opts.StatsdPrefix != "" {
var port string
_, port, err = net.SplitHostPort(opts.HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "failed to parse HTTP address (%s) - %s", opts.HTTPAddress, err)
os.Exit(1)
}
statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
if prefixWithHost[len(prefixWithHost)-1] != '.' {
prefixWithHost += "."
}
opts.StatsdPrefix = prefixWithHost
}
if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
opts.TLSRequired = TLSRequired
}
tlsConfig, err := buildTLSConfig(opts)
if err != nil {
n.logf(LOG_FATAL, "failed to build TLS config - %s", err)
os.Exit(1)
}
if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
n.logf(LOG_FATAL, "cannot require TLS client connections without TLS key and cert")
os.Exit(1)
}
n.tlsConfig = tlsConfig
for _, v := range opts.E2EProcessingLatencyPercentiles {
if v <= 0 || v > 1 {
n.logf(LOG_FATAL, "Invalid percentile: %v", v)
os.Exit(1)
}
}
n.logf(LOG_INFO, version.String("nsqd"))
n.logf(LOG_INFO, "ID: %d", opts.ID)
return n
}
加載之前元數(shù)據(jù)nsqd運(yùn)行停止后會(huì)創(chuàng)建兩個(gè)文件:nsqd.dat和nsqd.(id號(hào)).dat,若之后啟動(dòng)nsqd仍指定相同的dataPath,nsqd會(huì)嘗試從兩個(gè)文件中加載之前保存元數(shù)據(jù),即topic和channel信息,如果兩個(gè)都不存在,該方法在19行返回,認(rèn)為是一次全新啟動(dòng),如果兩個(gè)文件存在但其中保存信息不一致,nsqd停止,并提示用戶刪除其中一個(gè)。
func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())
data, err := readOrEmpty(fn)
if err != nil {
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}
if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
}
var m meta
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}
for _, t := range m.Topics {
if !protocol.IsValidTopicName(t.Name) {
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
continue
}
topic := n.GetTopic(t.Name)
if t.Paused {
topic.Pause()
}
for _, c := range t.Channels {
if !protocol.IsValidChannelName(c.Name) {
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
continue
}
channel := topic.GetChannel(c.Name)
if c.Paused {
channel.Pause()
}
}
}
return nil
}
啟動(dòng)nsqd提供的服務(wù)&waitGroupWrapper
nsqd正式工作函數(shù)入口為nsqd.Main(),該Main函數(shù)位于,nsq/nsqd/nsqd.go中。完成了Tcp, Http, Https(如果需要的話)服務(wù)器啟動(dòng),之前的program實(shí)現(xiàn)的Start方法非阻塞,NSQD的Main方法在Start中被調(diào)用,這就要求Main方法必須非阻塞的。但無(wú)論Tcp服務(wù)器還是Http服務(wù)器,都需要在一個(gè)死循環(huán)中對(duì)新連接進(jìn)行業(yè)務(wù)處理,Main方法通過(guò)WaitGroupWrapper保證自身非阻塞。
func (n *NSQD) Main() {
? var err error
? //建立上下文機(jī)制,退出時(shí),等待子協(xié)程的結(jié)束
? ctx := &context{n}
? n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
? if err != nil {
? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
? ? ? os.Exit(1)
? }
? n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
? if err != nil {
? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
? ? ? os.Exit(1)
? }
? if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
? ? ? n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
? ? ? if err != nil {
? ? ? ? n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
? ? ? ? os.Exit(1)
? ? ? }
? }
? tcpServer := &tcpServer{ctx: ctx}
? //使用waiGroup等待所有協(xié)程退出,為了避免代碼冗余和美觀,封裝成一個(gè)類進(jìn)行。
? n.waitGroup.Wrap(func() {
? ? ? //處理tcp協(xié)議信息,通過(guò)實(shí)現(xiàn)接口使用,接口的實(shí)現(xiàn)類nsq/nsqd/tcp.go中
? ? ? protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
? })
? httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
? n.waitGroup.Wrap(func() {
? ? ? http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
? })
? if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
? ? ? httpsServer := newHTTPServer(ctx, true, true)
? ? ? n.waitGroup.Wrap(func() {
? ? ? ? http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
? ? ? })
? }
? n.waitGroup.Wrap(n.queueScanLoop)
? n.waitGroup.Wrap(n.lookupLoop)
? if n.getOpts().StatsdAddress != "" {
? ? ? n.waitGroup.Wrap(n.statsdLoop)
? }
}
以上,nsqd啟動(dòng)完畢。開(kāi)始接收tcp和http請(qǐng)求處理請(qǐng)求。tcp處理實(shí)現(xiàn)通過(guò)實(shí)現(xiàn)Protocol接口類,每一個(gè)請(qǐng)求都會(huì)在IOLoop中進(jìn)行處理,建立client類,處理請(qǐng)求返回response。
處理tcp請(qǐng)求時(shí),nsq/nsqd/tcp.go中的tcpServer結(jié)構(gòu)體實(shí)現(xiàn)了nsq/internal/protocol/tcp_server.go中的TCPHandler接口處理tcp請(qǐng)求。
在tcp.go中的Handler方法中,根據(jù)發(fā)送過(guò)來(lái)信息的前四個(gè)字節(jié)判斷是否是V2,如果是使用protocolV2類處理信息。否則直接返回錯(cuò)誤。
var prot protocol.Protocol
switch protocolMagic {
case "? V2":
? prot = &protocolV2{ctx: p.ctx}
default:
? protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
? clientConn.Close()
? p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
? ? ? clientConn.RemoteAddr(), protocolMagic)
? return
}
err = prot.IOLoop(clientConn)
if err != nil {
? p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
? return
}
信息處理轉(zhuǎn)到nsq/nsqd/protocol_v2.go中的 IOLoop函數(shù)執(zhí)行。
func (p *protocolV2) IOLoop(conn net.Conn) error {
? var err error
? var line []byte
? var zeroTime time.Time
? clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
? client := newClientV2(clientID, conn, p.ctx)
? p.ctx.nsqd.AddClient(client.ID, client)
? // synchronize the startup of messagePump in order
? // to guarantee that it gets a chance to initialize
? // goroutine local state derived from client attributes
? // and avoid a potential race with IDENTIFY (where a client
? // could have changed or disabled said attributes)
? //采用這種方式,是為了保證messagePump中的局部變量初始化成功,
? //messagePump對(duì)sub訂閱進(jìn)行處理
? ? messagePumpStartedChan := make(chan bool)
? go p.messagePump(client, messagePumpStartedChan)
? <-messagePumpStartedChan
? for {
? ? ? if client.HeartbeatInterval > 0 {
? ? ? ? client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
? ? ? } else {
? ? ? ? client.SetReadDeadline(zeroTime)
? ? ? }
? ? ? // ReadSlice does not allocate new space for the data each request
? ? ? // ie. the returned slice is only valid until the next call to it
? ? ? //一次讀取一行的數(shù)據(jù),以\n結(jié)束。
? ? ? line, err = client.Reader.ReadSlice('\n')
? ? ? if err != nil {
? ? ? ? if err == io.EOF {
? ? ? ? ? ? err = nil
? ? ? ? } else {
? ? ? ? ? ? err = fmt.Errorf("failed to read command - %s", err)
? ? ? ? }
? ? ? ? break
? ? ? }
? ? ? // trim the '\n'
? ? ? line = line[:len(line)-1]
? ? ? // optionally trim the '\r'
? ? ? if len(line) > 0 && line[len(line)-1] == '\r' {
? ? ? ? line = line[:len(line)-1]
? ? ? }
? ? ? //按照分隔符空格符分開(kāi)。通過(guò)查看consumer端代碼,發(fā)過(guò)來(lái)的格式,命令名字+" " +(param長(zhǎng)度 + param) + 加換行符。
? ? ? //param topic, channel以及發(fā)送信息
? ? ? params := bytes.Split(line, separatorBytes)
? ? ? p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
? ? ? var response []byte
? ? ? //根據(jù)解析的信息,判斷命令類型,轉(zhuǎn)到對(duì)應(yīng)的命令操作,進(jìn)行處理和回復(fù)。
? ? ? response, err = p.Exec(client, params)
? ? ? if err != nil {
? ? ? ? ctx := ""
? ? ? ? if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
? ? ? ? ? ? ctx = " - " + parentErr.Error()
? ? ? ? }
? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
? ? ? ? sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
? ? ? ? if sendErr != nil {
? ? ? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
? ? ? ? ? ? break
? ? ? ? }
? ? ? ? // errors of type FatalClientErr should forceably close the connection
? ? ? ? if _, ok := err.(*protocol.FatalClientErr); ok {
? ? ? ? ? ? break
? ? ? ? }
? ? ? ? continue
? ? ? }
? ? ? if response != nil {
? ? ? ? err = p.Send(client, frameTypeResponse, response)
? ? ? ? if err != nil {
? ? ? ? ? ? err = fmt.Errorf("failed to send response - %s", err)
? ? ? ? ? ? break
? ? ? ? }
? ? ? }
? }
? p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
? conn.Close()
? close(client.ExitChan)
? if client.Channel != nil {
? ? ? client.Channel.RemoveClient(client.ID)
? }
? p.ctx.nsqd.RemoveClient(client.ID)
? return err
}
在IOLoop函數(shù)處理中,在死循環(huán)中不斷讀取client連接中傳過(guò)來(lái)的信息,每一行一條信息,解析每條信息,p.Exec轉(zhuǎn)到對(duì)應(yīng)命令函數(shù)進(jìn)行處理。在執(zhí)行sub命中,牽扯到SubEventChan,會(huì)導(dǎo)致channel數(shù)據(jù)的分配發(fā)生重塑。
p.messagePump處理channel中的信息。
p.messagePump,(首先明白的一點(diǎn)tcp長(zhǎng)連接不發(fā)送釋放命令或者意外的情況下不斷開(kāi))根據(jù)連接的client發(fā)送過(guò)來(lái)的請(qǐng)求類型處理消息。
以sub命令為例。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
? var err error
? //memeoryMsgChan和backendMsgChan是訂閱channel的。
? var memoryMsgChan chan *Message
? var backendMsgChan chan []byte
? var subChannel *Channel
? // NOTE: `flusherChan` is used to bound message latency for
? // the pathological case of a channel on a low volume topic
? // with >1 clients having >1 RDY counts
? var flusherChan <-chan time.Time
? var sampleRate int32
? ? //判斷是否訂閱新通道
? subEventChan := client.SubEventChan
? identifyEventChan := client.IdentifyEventChan
? outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
? heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
? heartbeatChan := heartbeatTicker.C
? msgTimeout := client.MsgTimeout
? // v2 opportunistically buffers data to clients to reduce write system calls
? // we force flush in two cases:
? //? ? 1. when the client is not ready to receive messages
? //? ? 2. we're buffered and the channel has nothing left to send us
? //? ? ? (ie. we would block in this loop anyway)
? //
? flushed := true
? // signal to the goroutine that started the messagePump
? // that we've started up
? close(startedChan)
? for {
? ? ? if subChannel == nil || !client.IsReadyForMessages() {
? ? ? ? // the client is not ready to receive messages...
? ? ? ? memoryMsgChan = nil
? ? ? ? backendMsgChan = nil
? ? ? ? flusherChan = nil
? ? ? ? // force flush
? ? ? ? client.writeLock.Lock()
? ? ? ? err = client.Flush()
? ? ? ? client.writeLock.Unlock()
? ? ? ? if err != nil {
? ? ? ? ? ? goto exit
? ? ? ? }
? ? ? ? flushed = true
? ? ? } else if flushed {
? ? ? ? // last iteration we flushed...
? ? ? ? // do not select on the flusher ticker channel
? ? ? ? memoryMsgChan = subChannel.memoryMsgChan
? ? ? ? backendMsgChan = subChannel.backend.ReadChan()
? ? ? ? flusherChan = nil
? ? ? } else {
? ? ? ? // we're buffered (if there isn't any more data we should flush)...
? ? ? ? // select on the flusher ticker channel, too
? ? ? ? memoryMsgChan = subChannel.memoryMsgChan
? ? ? ? backendMsgChan = subChannel.backend.ReadChan()
? ? ? ? flusherChan = outputBufferTicker.C
? ? ? }
? ? ? select {
? ? ? case <-flusherChan:
? ? ? ? // if this case wins, we're either starved
? ? ? ? // or we won the race between other channels...
? ? ? ? // in either case, force flush
? ? ? ? client.writeLock.Lock()
? ? ? ? err = client.Flush()
? ? ? ? client.writeLock.Unlock()
? ? ? ? if err != nil {
? ? ? ? ? ? goto exit
? ? ? ? }
? ? ? ? flushed = true
? ? ? case <-client.ReadyStateChan:
? ? ? case subChannel = <-subEventChan:
? ? ? ? // you can't SUB anymore
? ? ? ? subEventChan = nil
? ? ? case identifyData := <-identifyEventChan:
? ? ? ? // you can't IDENTIFY anymore
? ? ? ? identifyEventChan = nil
? ? ? ? outputBufferTicker.Stop()
? ? ? ? if identifyData.OutputBufferTimeout > 0 {
? ? ? ? ? ? outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
? ? ? ? }
? ? ? ? heartbeatTicker.Stop()
? ? ? ? heartbeatChan = nil
? ? ? ? if identifyData.HeartbeatInterval > 0 {
? ? ? ? ? ? heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
? ? ? ? ? ? heartbeatChan = heartbeatTicker.C
? ? ? ? }
? ? ? ? if identifyData.SampleRate > 0 {
? ? ? ? ? ? sampleRate = identifyData.SampleRate
? ? ? ? }
? ? ? ? msgTimeout = identifyData.MsgTimeout
? ? ? case <-heartbeatChan:
? ? ? ? err = p.Send(client, frameTypeResponse, heartbeatBytes)
? ? ? ? if err != nil {
? ? ? ? ? ? goto exit
? ? ? ? }
? ? ? case b := <-backendMsgChan:
? ? ? ? if sampleRate > 0 && rand.Int31n(100) > sampleRate {
? ? ? ? ? ? continue
? ? ? ? }
? ? ? ? msg, err := decodeMessage(b)
? ? ? ? if err != nil {
? ? ? ? ? ? p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
? ? ? ? ? ? continue
? ? ? ? }
? ? ? ? msg.Attempts++
? ? ? ? subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
? ? ? ? client.SendingMessage()
? ? ? ? err = p.SendMessage(client, msg)
? ? ? ? if err != nil {
? ? ? ? ? ? goto exit
? ? ? ? }
? ? ? ? flushed = false
? ? ? case msg := <-memoryMsgChan:
? ? ? ? if sampleRate > 0 && rand.Int31n(100) > sampleRate {
? ? ? ? ? ? continue
? ? ? ? }
? ? ? ? msg.Attempts++
? ? ? ? subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
? ? ? ? client.SendingMessage()
? ? ? ? err = p.SendMessage(client, msg)
? ? ? ? if err != nil {
? ? ? ? ? ? goto exit
? ? ? ? }
? ? ? ? flushed = false
? ? ? case <-client.ExitChan:
? ? ? ? goto exit
? ? ? }
? }
exit:
? p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
? heartbeatTicker.Stop()
? outputBufferTicker.Stop()
? if err != nil {
? ? ? p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
? }
}
12行subEventChan := client.SubEventChan更新消息泵的意思正是將客戶端訂閱的channel發(fā)送到subEventChan中去。
72-74行,在這里將從subEventChan中接收到的值保存到subChannel中。
54-56行,當(dāng)subChannel被賦值為客戶端訂閱的channel之后,使用subChannel內(nèi)存消息隊(duì)列和磁盤消息隊(duì)列對(duì)memoryMsgChan和backedMsgChan進(jìn)行賦值。
101-132行,監(jiān)聽(tīng)memoryMsgChan和backendMsgChan,當(dāng)其上有消息傳入時(shí)(向nsq發(fā)布消息,消息被寫(xiě)入到topic,進(jìn)而被寫(xiě)入到topic。
當(dāng)客戶端訂閱的topic的channel有消息時(shí),寫(xiě)入到topic下的channel的消息,通過(guò)p.SendMessage(client,msg)來(lái)將消息發(fā)送給客戶端。
客戶端的tcp發(fā)布PUB流程
首先每一個(gè)新連接的客戶端都是通過(guò)IOLoop進(jìn)行處理,啟動(dòng)(protocloV2)messagePump,在messagePump里因?yàn)槭莗roducer所以, memoryMsgChan = nil backendMsgChan = nil flusherChan = nil。
在IOLoop中p.Exec函數(shù)的PUB將消息寫(xiě)到topic中,根據(jù)寫(xiě)入的topic中。
客戶端http的PUB看nsq的http。
客戶端SUB訂閱
IOLoop中進(jìn)行處理,啟動(dòng)messagePump和Exec函數(shù)。在Exec函數(shù)中,更新訂閱channel的信息,更新client的信息。返回ok。
在messagePumn中當(dāng)接收到channel有消息時(shí)就