目錄
Nats簡(jiǎn)介
一、安裝下載
二、啟動(dòng)服務(wù)器
三、發(fā)布/訂閱模式的驗(yàn)證
四、請(qǐng)求-回復(fù)go測(cè)試代碼
Nats簡(jiǎn)介
NATS是由CloudFoundry的架構(gòu)師Derek開(kāi)發(fā)的一個(gè)開(kāi)源的、輕量級(jí)、高性能的,支持發(fā)布、訂閱機(jī)制的分布式消息隊(duì)列系統(tǒng)。
nats三種工作模式
- pub/sub (1對(duì)多)
- request/reply(1對(duì)多 設(shè)置超時(shí),只要有一個(gè)回復(fù)就結(jié)束)
- queue(1對(duì)1)
優(yōu)缺點(diǎn):
優(yōu)點(diǎn):
- 使用簡(jiǎn)單,配置簡(jiǎn)單。
- 速度極快,性能良好。
- 多語(yǔ)言支持,不依賴于網(wǎng)絡(luò)位置,client端只需知道nats的節(jié)點(diǎn)和約定好的subject名稱即可。
缺點(diǎn):
- 對(duì)服務(wù)器穩(wěn)定性要求較高,機(jī)房出現(xiàn)故障,導(dǎo)致nats server端需要重連。可能需要重啟nats-server。
- 在消息timeout后,需要在reconnection里要重新初始化連接,不方便。
關(guān)于nats streaming
nats streaming已被棄用。關(guān)鍵錯(cuò)誤修復(fù)和安全修復(fù)將應(yīng)用到 2023 年 6 月。需要持久性的支持 NATS 的應(yīng)用程序應(yīng)使用JetStream。來(lái)自:https://hub.docker.com/_/nats-streaming/
一、安裝下載
官網(wǎng) https://nats.io/download/
nats java https://github.com/nats-io/nats.java
nats go https://github.com/nats-io/nats.go
1、linux服務(wù)器上下載nats服務(wù)端
wget https://github.com/nats-io/nats-server/releases/download/v2.7.2/nats-server-v2.7.2-linux-amd64.zip --no-check-certificate
#直接解壓即可
2、windows下載nats客戶端
#1、安裝go環(huán)境,直接下載安裝
安裝包下載地址為:https://golang.org/dl/。
如果打不開(kāi)可以使用這個(gè)地址:https://golang.google.cn/dl/。
#2、go設(shè)置代理(用來(lái)下載github.com里的組件)
$ go env -w GOPROXY=https://goproxy.cn
#3、安裝nats
$ go get github.com/nats-io/nats.go/
go: downloading github.com/nats-io/nats.go v1.13.0
go: downloading github.com/nats-io/nkeys v0.3.0
go: downloading github.com/nats-io/nuid v1.0.1
go: downloading golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
二、啟動(dòng)服務(wù)器
1、單個(gè)運(yùn)行方式
nats-server -D -p 4222
2、集群運(yùn)行模式
#A:
nats-server -D -p 4222 -cluster nats://localhost:6222
#B:
nats-server -D -p 4333 -cluster nats://localhost:6333 -routes nats://localhost:6222
#C:
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6222
#D:
nats-server -D -p 4555 -cluster nats://localhost:6555 -routes nats://localhost:6222,nats://localhost:6333
### -cluster nats://localhost:6222 表示開(kāi)啟集群,其它nats-server可以通過(guò)6222端口與之建立集群
### -routes nats://localhost:6222 連接到開(kāi)啟6222端口的nats-server與其建立集群
三、發(fā)布/訂閱模式的驗(yàn)證
使用兩個(gè)客戶端進(jìn)行驗(yàn)證。在遠(yuǎn)程(自己的筆記本) 主機(jī)上開(kāi)兩個(gè)終端命令行環(huán)境,均使用命令 “$ telnet xxx.xxx.xxx.xxx 4222” 連上 gnatsd 服務(wù)器。為了以示區(qū)別,這里命名為客戶端A 和 客戶端B,A表示發(fā)布者,B表示訂閱者。
1.首先運(yùn)行訂閱者B(先有訂閱的發(fā)布才會(huì)有意義)
訂閱者B使用通配符 foot.* 注冊(cè)主題 ID 為 90 的主題,訂閱成功,gnatsd 服務(wù)器返回 +OK 消息。
sub foo.* 90
+OK
2.發(fā)布者A 運(yùn)行
發(fā)布者A發(fā)布一條消息到主題 foo.bar,消息有效負(fù)載的長(zhǎng)度為 5,按下回車,輸入長(zhǎng)度在5范圍內(nèi)的內(nèi)容。消息發(fā)布成功,gnatsd 服務(wù)器返回 +OK 消息。
pub foo.bar 5
hello
+OK
3.訂閱者B 顯示反饋
sub foo.* 90
+OK
MSG foo.bar 90 5
hello
前兩行是之前的內(nèi)容,后兩行是新獲得的推送消息??梢?jiàn),發(fā)布/訂閱的消息通信成功。
4.發(fā)布者A 繼續(xù)發(fā)布
發(fā)布者A繼續(xù)執(zhí)行以下命令,消息發(fā)布成功,gnatsd服務(wù)器返回+OK消息。
pub foo.bar optional.reply.subject 5
hello
+OK
5.訂閱者B 繼續(xù)接收顯示
MSG foo.bar 90 5
hello
PING
MSG foo.bar 90 optional.replay.subject 5
hello
后面三行都是新增的消息內(nèi)容,其中PING是維持連接的消息。
6.訂閱者B 執(zhí)行
訂閱者B 執(zhí)行取消訂閱命令,命令消息發(fā)送成功,gnatsd 服務(wù)器返回 +OK 消息。
unsub 90
+OK
7.發(fā)布者A 執(zhí)行
發(fā)布者A再次發(fā)布一條消息到主題foo.bar,消息有效負(fù)載的長(zhǎng)度為5,按下回車。消息發(fā)布成功,gnatsd服務(wù)器返回+OK消息。
pub foo.bar 5
hell2
+OK
此時(shí)接收者B收不到消息,因?yàn)橛嗛喴呀?jīng)取消了。
客戶端發(fā)出心跳命令消息PING(用小寫ping也是同樣的),gnatsd 服務(wù)器返回 PONG 消息。
四、請(qǐng)求-回復(fù)go測(cè)試代碼
1、request.go
package main
import (
"log"
"runtime"
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect("192.168.2.246:4222")
if err != nil {
log.Fatal("connect error")
}
nc.Subscribe("tydf", func(m *nats.Msg) {
log.Println(string(m.Data), "from nats")
result, _ := json.Marshal(m)
log.Println("<<<=== ", string(result))
})
message, err := nc.Request("tydf", []byte("tydf"), 1*time.Second)
if err != nil {
log.Fatal("Request timeout error", err)
}
log.Println("<<<=== ", string(message.Data))
runtime.Goexit()
}
2、reply.go
package main
import (
"log"
"runtime"
"encoding/json"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect("192.168.2.246:4222")
if err != nil {
log.Fatal("connect error")
}
nc.Subscribe("tydf", func(m *nats.Msg) {
log.Println(string(m.Data), "from nats")
result, _ := json.Marshal(m)
log.Println("<<<=== ", string(result))
nc.Publish(m.Reply, []byte("Publish test info from reply."))
})
runtime.Goexit()
}
3、運(yùn)行測(cè)試
#先運(yùn)行reply.go
go run reply.go
#后運(yùn)行request.go
go run request.go