微服務之間通過RabbitMQ通信

微服務之間通過RabbitMQ通信

微服務之間是相互獨立的,不像單個工程一樣各個模塊之間可以直接通過方法調用實現(xiàn)通信,相互獨立的服務直接一般的通信方式是使用 HTTP協(xié)議、rpc協(xié)議或者使用消息中間件如RabbitMQ``Kafka

image

在這篇文章 使用Golang和MongoDB構建微服務 已經(jīng)實現(xiàn)了一個微服務的應用,在文章中已經(jīng)實現(xiàn)了各個服務直接的通信,是使用的 HTTP的形式 ,那各個服務之間如何通過 RabbitMQ進行消息通信呢,我們現(xiàn)在要實現(xiàn)一個功能,就是一個用戶預訂電影票的接口,需要服務 User Service(port 8000) 和 服務 Booking Service(port 8003)之間通信,用戶預訂之后,把預訂信息寫入到 booking的數(shù)據(jù)庫中

安裝 RabbitMQ

安裝 RabbitMQ 之前需要先安裝 Erlang 的環(huán)境 ,然后下載安裝RabbitMQ ,請選擇對應的版本,安裝完成之后,RabbitMQ在Windows上是作為一個服務在后臺運行,關于 RabbitMQ 的接口如何使用,請參考官網(wǎng)的 教程,有各個主流語言的實現(xiàn)我們使用的是Go版本,請下載對應的實現(xiàn)接口 go get github.com/streadway/amqp

RabbitMQ的接口做一下簡單的封裝

  • 定義一個接口

messaging/message.go

type IMessageClient interface {
    ConnectToBroker(connectionStr string) error
    PublishToQueue(data []byte, queueName string) error
    SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
    Close()
}

type MessageClient struct {
    conn *amqp.Connection
}
  • 連接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
    if connectionStr == "" {
        panic("the connection str mustnt be null")
    }
    var err error
    m.conn, err = amqp.Dial(connectionStr)
    return err
}
  • 發(fā)布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
    if m.conn == nil {
        panic("before publish you must connect the RabbitMQ first")
    }

    ch, err := m.conn.Channel()
    defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
        },
    )
    failOnError(err, "Failed to publish a message")

    return nil
}
  • 訂閱消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
    ch, err := m.conn.Channel()
    //defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to register a consumer")
    go consumeLoop(msgs, handlerFunc)
    return nil
}

實現(xiàn)通信

User Service中定義一個新的POST接口 /user/{name}/booking,實現(xiàn)用戶的預訂功能,預訂之后,通過RabbitMQ發(fā)布一個消息給
Booking Service,Booking Service接收到消息之后,做相應的處理(寫入數(shù)據(jù)庫)

User Service

  • 初始化 MessageClient

users/controllers/user.go

var client messaging.IMessageClient

func init() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("connect to rabbitmq error", err)
    }
}
  • 添加新的路由和實現(xiàn)

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {
    params := mux.Vars(r)
    user_name := params["name"]
    defer r.Body.Close()

    var bookings models.Booking
    body, _ := ioutil.ReadAll(r.Body)
    err := json.Unmarshal(body, &bookings)
    if err != nil {
        fmt.Println("the format body error ", err)
    }
    fmt.Println("user name:", user_name, bookings)
    go notifyMsg(body)
}
  • 用一個協(xié)程實現(xiàn)消息的發(fā)布
func notifyMsg(body []byte) {
    err := client.PublishToQueue(body, "new_booking")
    if err != nil {
        fmt.Println("Failed to publis message", err)
    }
}

Booking Service

  • 初始化MessageClient
var client messaging.IMessageClient

func initMessage() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("Failed to connect to RabbitMQ", err)
    }

    err = client.SubscribeToQueue("new_booking", getBooking)
    if err != nil {
        fmt.Println("Failed to comsuer the msg", err)
    }
}

在 web服務之前啟動

func main() {

    initMessage()

    r := routes.NewRouter()
    http.ListenAndServe(":8003", r)

}
  • 接收后的消息處理
func getBooking(delivery amqp.Delivery) {

  var booking models.Booking
    json.Unmarshal(delivery.Body, &booking)
  booking.Id = bson.NewObjectId().Hex()
    dao.Insert("Booking", "BookModel", booking)
    fmt.Println("the booking msg", booking)
}

驗證,需要啟動 User ServiceBooking Service
使用 Postman 發(fā)送對應的數(shù)據(jù)

post 127.0.0.1:8000/user/kevin_woo/booking

{
    "name":"kevin_woo",
    "books":[
        {
            "date":"20180727",
            "movies":["5b4c45d49d5e3e33c4a5b97a"]
        },
        {
            "date":"20180810",
            "movies":["5b4c45ea9d5e3e33c4a5b97b"]
        }
    ]
}

可以看到數(shù)據(jù)庫已經(jīng)有了一條新的預訂信息

說明,我這里POST的數(shù)據(jù)就是booking數(shù)據(jù)庫中的結構,實際情況需要對數(shù)據(jù)進行封裝處理,在POST數(shù)據(jù)時,沒有對數(shù)據(jù)進行驗證,
在實際開發(fā)過程中需要對各個數(shù)據(jù)做相應的驗證,這里主要是看一下 RabbitMQ的消息傳遞處理的過程

源碼 Github

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容