MQTT使用小記

MQTT全稱Message Queue Telemetry Transport,是一個(gè)針對輕量級的發(fā)布/訂閱式消息傳輸場景的協(xié)議,同時(shí)也是被推崇的物聯(lián)網(wǎng)傳輸協(xié)議。MQTT詳細(xì)的介紹文章可以從官方網(wǎng)站獲得,所以這里就不進(jìn)行詳細(xì)的展開了,而是針對這些天的使用經(jīng)歷與感受做一番紀(jì)錄。

MQTT開源的iOS客戶端有以下幾種:

MQTTKit Marquette Moscapsule Musqueteer MQTT-Client MqttSDK CocoaMQTT
Obj-C Obj-C Swift Obj-C Obj-C Obj-C Swift
Mosquitto Mosquitto Mosquitto Mosquitto native native native

以上開源庫我只看過部分MQTTKit、MQTT-Client、CocoaMQTT的開源代碼,總體來說MQTT-Client支持的功能更多全面一些。如果只是對協(xié)議本身進(jìn)行學(xué)習(xí)不考慮功能的話,可以閱讀CocoaMQTT,也可以閱讀我重寫的SwiftMQTT,因?yàn)榇a量相對前面兩個(gè)庫少了很多。

而MQTT的broker一般選擇Mosquitto,Mosquitto是一個(gè)由C編寫的集客戶端和服務(wù)端為一體的開源項(xiàng)目,所以相對來說風(fēng)格較為友好,可以無障礙地閱讀并調(diào)試源碼(開源地址)??梢钥吹?,以上客戶端開源庫中的前四種就是基于Mosquitto的一層封裝。

Mosquitto的安裝和使用

Mosquitto在Linux下的安裝相對比Mac-OS簡單很多,主要是因?yàn)閛penssl的一些路徑問題,后者需要多一些步驟。Mac-OS下可以通過兩種方法運(yùn)行Mosquitto,一種是通過brew命令安裝Mosquitto:

brew install mosquitto

安裝完成后就可以在mosquitto.conf文件中更改相應(yīng)的配置了。接著進(jìn)入根目錄(也可以指定$PATH到mosquitto可執(zhí)行文件的目錄),執(zhí)行以下命令運(yùn)行mosquitto:

// -c 讀取配置
// -d 后臺運(yùn)行
// -v 打印詳細(xì)日志
./sbin/mosquitto -c etc/mosquitto/mosquitto.conf -d -v

如果要重啟mosquitto服務(wù),可以先kill掉,再重啟:

tripleCC:1.4.8 songruiwang$ ps -A | grep mosquitto
55417 ??         0:00.05 ./sbin/mosquitto -c etc/mosquitto/mosquitto.conf -d -v
tripleCC:1.4.8 songruiwang$ kill -9 55417

現(xiàn)在要說明的是第二種方式,通過源碼編譯生成mosquitto可執(zhí)行文件(好處是可以通過lldb對mosquitto進(jìn)行調(diào)試,能更好地熟悉運(yùn)行機(jī)制)。

下載mosquitto源碼后進(jìn)入根目錄,然后執(zhí)行以下命令:

// 禁用TLS_PSK,并且聲稱Debug版本(后續(xù)lldb調(diào)試需要用到符號表)
// 如果openssl是通過brew進(jìn)行安裝,就需要手動指定OPENSSL_ROOT_DIR和OPENSSL_INCLUDE_DIR環(huán)境變量
// 但是后來發(fā)現(xiàn)即使指定了,在編譯時(shí)符號表中還是找不到TLS_PSK相關(guān)的函數(shù)
cmake -DWITH_TLS_PSK=OFF -DWITH_TLS=OFF -DCMAKE_BUILD_TYPE=Debug 
make install

終端會提示無法拷貝可執(zhí)行文件mosquitto,這個(gè)問題無傷大雅。可以手動拷貝到$PATH指定的目錄下,也可以直接進(jìn)入mosquitto所在目錄運(yùn)行:

tripleCC:src songruiwang$ lldb mosquitto
(lldb) target create "mosquitto"
Current executable set to 'mosquitto' (x86_64).
(lldb) b mqtt3_packet_handle
Breakpoint 1: where = mosquitto`mqtt3_packet_handle + 16 at read_handle.c:36, address = 0x0000000100018eb0
(lldb) r

這樣當(dāng)客戶端連接到broker時(shí),就可以對mosquitto進(jìn)行逐行調(diào)試了:

Process 57680 launched: '/Users/songruiwang/Desktop/mosquitto/src/mosquitto' (x86_64)
1463049645: mosquitto version 1.4.8 (build date 2016-05-12 18:36:15+0800) starting
1463049645: Using default config.
1463049645: Opening ipv4 listen socket on port 1883.
1463049645: Opening ipv6 listen socket on port 1883.
1463049659: New connection from 127.0.0.1 on port 1883.
Process 57680 stopped
* thread #1: tid = 0xba449, 0x0000000100018eb0 mosquitto`mqtt3_packet_handle(db=0x000000010002f4f0, context=0x0000000100201990) + 16 at read_handle.c:36, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
    frame #0: 0x0000000100018eb0 mosquitto`mqtt3_packet_handle(db=0x000000010002f4f0, context=0x0000000100201990) + 16 at read_handle.c:36
   33   
   34   int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context)
   35   {
-> 36       if(!context) return MOSQ_ERR_INVAL;
   37   
   38       switch((context->in_packet.command)&0xF0){
   39           case PINGREQ:
(lldb) p *context
(mosquitto) $0 = {
  sock = 6
  protocol = mosq_p_invalid
  address = 0x0000000100200db0 "127.0.0.1"
  id = 0x0000000000000000 <no value available>
  username = 0x0000000000000000 <no value available>
  password = 0x0000000000000000 <no value available>
  keepalive = 60
  last_mid = 0
  state = mosq_cs_new
  last_msg_in = 39584
  last_msg_out = 39584
  ......

這里安利一款代碼閱讀器Understand(和window下的SourceInsight很相似,都很強(qiáng)大?。?br>
lldb很多命令和gdb相似,具體更多命令可以在lldb中執(zhí)行help進(jìn)行查看。
更加詳細(xì)的使用教程可以參考Mosquitto簡要教程(安裝/使用/測試)

使用Wireshark抓取報(bào)文

測試時(shí)使用的host一般為lo0,即本地回環(huán)地址,所以選擇對應(yīng)的過濾器:

對端口進(jìn)行過濾(這里使用的是1883端口):

然后連接客戶端和服務(wù)端,就可以看見對應(yīng)的MQTT報(bào)文了:

在一些linux嵌入式環(huán)境下,無法通過Wireshark抓取報(bào)文,可以使用tcpdump抓取生成pcap文件,然后使用ftp等協(xié)議將文件傳回到電腦,再使用Wireshark打開:

// 這里還是用回環(huán)地址舉例
tcpdump -i lo0 'tcp port 1883' -s 65535 -w packet.pcap

MQTT協(xié)議的實(shí)踐

MQTT協(xié)議消息類型

為了能夠更好地熟悉協(xié)議,我用struct+protocol的方式重寫了CocoaMQTT的代碼(SwiftMQTT)。CocoaMQTT庫使用的是傳統(tǒng)的面相對象編程方式,所以閱讀起來并沒有什么障礙,只不過小小吐槽下代碼風(fēng)格。

MQTT協(xié)議總共有14種消息類型,使用枚舉表示如下:

public enum SwiftMQTTMessageType : UInt8 {
    case Connect        = 0x10
    case ConnAck        = 0x20
    case Publish        = 0x30
    case PubAck         = 0x40
    case PubRec         = 0x50
    case PubRel         = 0x60
    case PubComp        = 0x70
    case Subscribe      = 0x80
    case SubAck         = 0x90
    case Unsubscribe    = 0xA0
    case UnsubBack      = 0xB0
    case PingReq        = 0xC0
    case PingResp       = 0xD0
    case Disconnect     = 0xE0
}

以上消息可由"固定報(bào)頭"+"可變報(bào)頭"+"有效載荷"三部分組成。

固定報(bào)頭由"類型+標(biāo)志位"+"剩余長度"組成,可以使用protocol表示第一部分:

public protocol SwiftMQTTCommandProtocol {
    var command: UInt8 {get set}
    var messageType: SwiftMQTTMessageType {get set}
    var dupFlag: Bool {get set}
    var qosLevel: SwiftMQTTQosLevel {get set}
    var retain: Bool {get set}
}

extension SwiftMQTTCommandProtocol {
    /**
     * +---------------+----------+-----------+--------+
     * |    7 6 5 4    |     3    |    2 1    |   0    |
     * |  Message Type | DUP flag | QoS level | RETAIN |
     * +---------------+----------+-----------+--------+
     */
    public var messageType: SwiftMQTTMessageType {
        get { return SwiftMQTTMessageType(rawValue: command & 0xF0) ?? .Connect }
        set { command = newValue.rawValue | (command & 0x0F) }
    }
    public var dupFlag: Bool {
        get { return Bool((command >> 3) & 0x01) }
        set { command = (UInt8(newValue) << 3) | (command & 0xF7) }
    }
    public var qosLevel: SwiftMQTTQosLevel {
        get { return SwiftMQTTQosLevel(rawValue: (command >> 1) & 0x03) ?? .AtMostOnce }
        set { command = newValue.rawValue << 1 | (command & 0xF9 ) }
    }
    public var retain: Bool {
        get { return Bool(command & 0x01) }
        set { command = UInt8(newValue) | (command & 0xFE) }
    }
}

剩余長度等于"可變報(bào)頭"+"有效載荷"各自的長度相加,這兩者表示如下:

public protocol SwiftMQTTVariableHeaderProtocol {
     var variableHeader: NSData {get}
}

extension SwiftMQTTVariableHeaderProtocol {
    public var variableHeader: NSData { return NSData() }
}

public protocol SwiftMQTTPayloadProtocol {
    var payload: NSData {get}
}

extension SwiftMQTTPayloadProtocol {
    public var payload: NSData { return NSData() }
}

為了減少沒有這兩個(gè)部分的消息結(jié)構(gòu)體的代碼量,所以協(xié)議擴(kuò)展中先返回空數(shù)據(jù)。

然后就可以定義并實(shí)現(xiàn)一個(gè)固定報(bào)頭的總協(xié)議了:

public protocol SwiftMQTTFixedHeaderProtocol : SwiftMQTTCommandProtocol, SwiftMQTTVariableHeaderProtocol, SwiftMQTTPayloadProtocol {
    var remainingLength: UInt32 {get}
}

extension SwiftMQTTFixedHeaderProtocol {
    public var remainingLength: UInt32 {
        let remainingLength = variableHeader.length + payload.length
        guard remainingLength <= kSwiftMQTTMaxRemainingLength else {
            SMPrint("the size of remaining length field should be below \(kSwiftMQTTMaxRemainingLength).")
            return UInt32(kSwiftMQTTMaxRemainingLength)
        }
        return UInt32(remainingLength)
    }
}

有了所有發(fā)送消息的組成部分之后,就可以對數(shù)據(jù)進(jìn)行編碼了:

public protocol SwiftMQTTMessageProtocol : SwiftMQTTFixedHeaderProtocol {
    var data: NSData {get}
}

extension SwiftMQTTMessageProtocol {
    public var data: NSData {
        let data = NSMutableData()
        data.appendByte(command)
        data.appendData(remainingLength.data)
        data.appendData(variableHeader)
        data.appendData(payload)
        return data
    }
}

這里以Connect報(bào)文為例,結(jié)合以上協(xié)議,構(gòu)成一個(gè)有效的消息結(jié)構(gòu)體。

首先讓SwiftMQTTConnectMessage遵守SwiftMQTTMessageProtocol協(xié)議,以此獲得固定報(bào)頭解析以及編碼等能力:

public struct SwiftMQTTConnectMessage : SwiftMQTTMessageProtocol {
    public var command = UInt8(0x00)
    ...
}

由于command是固定報(bào)頭類型和標(biāo)志的必要載體,所以必須在結(jié)構(gòu)體中實(shí)現(xiàn)。那么問題來了,MQTT協(xié)議的消息有14種,于是就需要在14種結(jié)構(gòu)體種都實(shí)現(xiàn)一次這個(gè)成員變量,如果使用面向?qū)ο蟮姆绞?,在公共子類中呈現(xiàn)這個(gè)成員變量就行了。這里是第一個(gè)讓我感覺面向協(xié)議方式在實(shí)現(xiàn)MQTT不順手的地方。

Connect報(bào)文的可變報(bào)頭中分為四個(gè)部分:協(xié)議名,協(xié)議級別,連接標(biāo)志和保持連接。這幾個(gè)部分可以使用兩個(gè)協(xié)議來實(shí)現(xiàn):

public protocol SwiftMQTTConnectFlagProtocol {
    var connectFlag: UInt8 {get set}
    var usernameFlag: Bool {get set}
    var passwordFlag: Bool {get set}
    var willRetain: Bool {get set}
    var willQos: SwiftMQTTQosLevel {get set}
    var willFlag: Bool {get set}
    var cleanSession: Bool {get set}
}

extension SwiftMQTTConnectFlagProtocol {
    /**
     * +----------+----------+------------+---------+----------+--------------+----------+
     * |     7    |    6     |      5     |  4  3   |     2    |       1      |     0    |
     * | username | password | willretain | willqos | willflag | cleansession | reserved |
     * +----------+----------+------------+---------+----------+--------------+----------+
     */
    public var usernameFlag: Bool {
        get { return Bool((connectFlag & 0x80) >> 7) }
        set { connectFlag = (UInt8(newValue) << 7) | (connectFlag & 0x7F) }
    }
    public var passwordFlag: Bool {
        get { return Bool((connectFlag & 0x40) >> 6) }
        set { connectFlag = (UInt8(newValue) << 6) | (connectFlag & 0xBF) }
    }
    public var willRetain: Bool {
        get { return Bool((connectFlag & 0x20) >> 5) }
        set { connectFlag = (UInt8(newValue) << 5) | (connectFlag & 0xDF) }
    }
    public var willQos: SwiftMQTTQosLevel {
        get { return SwiftMQTTQosLevel(rawValue: (connectFlag & 0x18) >> 3) ?? .AtMostOnce }
        set { connectFlag = (UInt8(newValue.rawValue) << 3) | (connectFlag & 0xE7) }
    }
    public var willFlag: Bool {
        get { return Bool((connectFlag & 0x08) >> 2) }
        set { connectFlag = (UInt8(newValue) << 2) | (connectFlag & 0xFA) }
    }
    public var cleanSession: Bool {
        get { return Bool((connectFlag & 0x04) >> 1) }
        set { connectFlag = (UInt8(newValue) << 1) | (connectFlag & 0xFD) }
    }
}

protocol SwiftMQTTClientProtocol {
    var protocolName: String {get}
    var protocolLevel: UInt8 {get}
    var keepalive: UInt16 {get}
    var clientId: String {get}
    var account: SwiftMQTTAccount? {get}
    var will: SwiftMQTTWill? {get}
}

extension SwiftMQTTClientProtocol {
    var protocolName: String { return "MQTT" }
    var protocolLevel: UInt8 { return 0x04 }
    var keepalive: UInt16 { return 60 }
}

這樣Connect報(bào)文結(jié)構(gòu)體已經(jīng)有了所有需要的協(xié)議,接下來主要的工作就是實(shí)現(xiàn)真正的variableHeader和payload了:

public var variableHeader: NSData {
    let variableHeader = NSMutableData()
    variableHeader.appendMQTTString(protocolName)
    variableHeader.appendByte(protocolLevel)
    variableHeader.appendByte(connectFlag)
    variableHeader.appendUInt16(keepalive)
    return variableHeader
}
public var payload: NSData {
    let payload = NSMutableData()
    // 客戶端標(biāo)識符->遺囑主題->遺囑消息->用戶名->密碼
    payload.appendMQTTString(clientId)
    if let willTopic = will?.willTopic {
        payload.appendMQTTString(willTopic)
    }
    if let willMessage = will?.willMessage {
        payload.appendMQTTString(willMessage)
    }
    if let username = account?.username {
        payload.appendMQTTString(username)
    }
    if let password = account?.password {
        payload.appendMQTTString(password)
    }
    return payload
}

至此,Connect的主要部分都已經(jīng)構(gòu)建完成。接下來以ConAck報(bào)文為例,實(shí)現(xiàn)從broker中返回的報(bào)文。

由于需要解析從broker中返回的報(bào)文,所以定義一個(gè)返回消息類型協(xié)議:

public protocol SwiftMQTTAckMessageProtocol: SwiftMQTTCommandProtocol {
    init?(_ bytes: [UInt8], command: UInt8)
}

最終SwiftMQTTConnAckMessage結(jié)構(gòu)體如下:

public struct SwiftMQTTConnAckMessage : SwiftMQTTAckMessageProtocol {
    public var command = UInt8(0x00)
    public var sessionPresent: Bool
    public var connectReturnCode: SwiftMQTTConnectReturnCode
    public init?(_ bytes: [UInt8], command: UInt8) {
        guard bytes.count == 2 else { return nil }
        sessionPresent = Bool(bytes[0])
        connectReturnCode = SwiftMQTTConnectReturnCode(rawValue: bytes[1]) ?? .Accepted
        self.command = command
    }
}

這里又產(chǎn)生了第二個(gè)讓我不是很舒服的地方:在protocol extension中實(shí)現(xiàn)有效的init非常麻煩(暫且不論在protocol extension中實(shí)現(xiàn)init的必要性)。下面是一個(gè)不完全的實(shí)現(xiàn)方式:

protocol MessageProtocol {
    var messageId : UInt16 { get set }
    init()
    init?(_ bytes: [UInt8])
}

extension Message {
    init?(_ bytes: [UInt8]) {
        guard bytes.count == 2 else { return nil }
        messageId = UInt16(bytes[0]) << 8 + UInt16(bytes[1])
    }
}

struct Message: MessageProtocol {
    var messageId: UInt16
    init() {
        messageId = 0
    }
}

為了能在protocol extension實(shí)現(xiàn)一個(gè)默認(rèn)的init?(_ bytes: [UInt8])方法,就必須要多定義一個(gè)沒什么意義的init()方法。這讓我直接放棄了這個(gè)念頭,轉(zhuǎn)而直接在每個(gè)消息類型的struct中實(shí)現(xiàn)對應(yīng)的解析init方法,雖然這樣會讓部分代碼重復(fù)。

至此,MQTT協(xié)議的消息類型實(shí)現(xiàn)差不多完成了,因?yàn)楹罄m(xù)的12種消息和前面這2種大同小異。

MQTT協(xié)議消息解析

和CocoaMQTT一樣,SwiftMQTT也是使用GCDAsyncSocket來進(jìn)行socket通信。在調(diào)用GCDAsyncSocket實(shí)例的readData系列方法并讀取到數(shù)據(jù)后,便可以從以下代理方法中解析讀取到的數(shù)據(jù):

- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag

需要注意的是,如果使用的是按照緩存排列每次讀取固定子節(jié)的方法:

- (void)readDataToLength:(NSUInteger)length withTimeout:(NSTimeInterval)timeout tag:(long)tag;

那么只要有一次讀取錯(cuò)誤,就會影響到后續(xù)所有數(shù)據(jù)的讀取。

解析返回報(bào)文的主要方法如下:

mutating func unpackData(data: NSData, part: SwiftMQTTMessagePart, nextReader:SwiftMQTTMessageDecoderNextReader) {
    let bytes = data.bytesArray
    switch part {
    case .Header:
        messageHeader = unpackHeader(bytes)
        // 讀取一個(gè)字節(jié)的剩余長度
        nextReader(length: 1, part: .Length)
    case .Length:
        messageLengthBytes.appendContentsOf(bytes)
        // 如果最高位為0,則剩余長度已確定
        if Bool(bytes[0] & 0x80) {
            // 繼續(xù)讀取一個(gè)字節(jié)的剩余長度
            nextReader(length: 1, part: .Length)
        } else {
            // 獲取剩余長度
            let messageLength = unpackLength(messageLengthBytes)
            if messageLength > 0 {
                // 讀取可變報(bào)頭和payload
                nextReader(length: messageLength, part: .Content)
            } else {
                // 沒有可變報(bào)頭和payload,不需要再進(jìn)行讀取操作,直接解包
                unpackContent()
            }
            // 重置長度緩存
            messageLengthBytes.removeAll()
        }
    case .Content:
        // 解析可變報(bào)頭和payload
        unpackContent(bytes)
    }
}

報(bào)文分三個(gè)部分進(jìn)行讀取。需要注意的是讀取剩余長度時(shí),需要循環(huán)讀取一個(gè)字節(jié),以便確定剩余長度的最高字節(jié)。

小結(jié)

最后對比各個(gè)協(xié)議庫,如果需要使用到MQTT的大部分功能,那么閱讀Mosquitto源碼會是個(gè)不錯(cuò)的選擇,畢竟其實(shí)現(xiàn)的功能還是相對完善的。

而對于這次實(shí)踐,總感覺有些地方使用面向協(xié)議沒有面向?qū)ο髞淼母雍啙崳贿^這也是利弊的權(quán)衡吧,還是在可以接受的范圍。

參考鏈接

MosquittoDocumentation

MQTT中文文檔

MQTT英文文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容