MQTT全稱Message Queue Telemetry Transport,是一個(gè)針對輕量級的發(fā)布/訂閱式消息傳輸場景的協(xié)議,同時(shí)也是被推崇的物聯(lián)網(wǎng)傳輸協(xié)議。MQTT詳細(xì)的介紹文章可以從官方網(wǎng)站獲得,所以這里就不進(jìn)行詳細(xì)的展開了,而是針對這些天的使用經(jīng)歷與感受做一番紀(jì)錄。
MQTT開源的iOS客戶端有以下幾種:
| MQTTKit | Marquette | Moscapsule | Musqueteer | MQTT-Client | MqttSDK | CocoaMQTT |
|---|---|---|---|---|---|---|
| Obj-C | Obj-C | Swift | Obj-C | Obj-C | Obj-C | Swift |
| Mosquitto | Mosquitto | Mosquitto | Mosquitto | native | native | native |
以上開源庫我只看過部分MQTTKit、MQTT-Client、CocoaMQTT的開源代碼,總體來說MQTT-Client支持的功能更多全面一些。如果只是對協(xié)議本身進(jìn)行學(xué)習(xí)不考慮功能的話,可以閱讀CocoaMQTT,也可以閱讀我重寫的SwiftMQTT,因?yàn)榇a量相對前面兩個(gè)庫少了很多。
而MQTT的broker一般選擇Mosquitto,Mosquitto是一個(gè)由C編寫的集客戶端和服務(wù)端為一體的開源項(xiàng)目,所以相對來說風(fēng)格較為友好,可以無障礙地閱讀并調(diào)試源碼(開源地址)??梢钥吹?,以上客戶端開源庫中的前四種就是基于Mosquitto的一層封裝。
Mosquitto的安裝和使用
Mosquitto在Linux下的安裝相對比Mac-OS簡單很多,主要是因?yàn)閛penssl的一些路徑問題,后者需要多一些步驟。Mac-OS下可以通過兩種方法運(yùn)行Mosquitto,一種是通過brew命令安裝Mosquitto:
brew install mosquitto
安裝完成后就可以在mosquitto.conf文件中更改相應(yīng)的配置了。接著進(jìn)入根目錄(也可以指定$PATH到mosquitto可執(zhí)行文件的目錄),執(zhí)行以下命令運(yùn)行mosquitto:
// -c 讀取配置
// -d 后臺運(yùn)行
// -v 打印詳細(xì)日志
./sbin/mosquitto -c etc/mosquitto/mosquitto.conf -d -v
如果要重啟mosquitto服務(wù),可以先kill掉,再重啟:
tripleCC:1.4.8 songruiwang$ ps -A | grep mosquitto
55417 ?? 0:00.05 ./sbin/mosquitto -c etc/mosquitto/mosquitto.conf -d -v
tripleCC:1.4.8 songruiwang$ kill -9 55417
現(xiàn)在要說明的是第二種方式,通過源碼編譯生成mosquitto可執(zhí)行文件(好處是可以通過lldb對mosquitto進(jìn)行調(diào)試,能更好地熟悉運(yùn)行機(jī)制)。
下載mosquitto源碼后進(jìn)入根目錄,然后執(zhí)行以下命令:
// 禁用TLS_PSK,并且聲稱Debug版本(后續(xù)lldb調(diào)試需要用到符號表)
// 如果openssl是通過brew進(jìn)行安裝,就需要手動指定OPENSSL_ROOT_DIR和OPENSSL_INCLUDE_DIR環(huán)境變量
// 但是后來發(fā)現(xiàn)即使指定了,在編譯時(shí)符號表中還是找不到TLS_PSK相關(guān)的函數(shù)
cmake -DWITH_TLS_PSK=OFF -DWITH_TLS=OFF -DCMAKE_BUILD_TYPE=Debug
make install
終端會提示無法拷貝可執(zhí)行文件mosquitto,這個(gè)問題無傷大雅。可以手動拷貝到$PATH指定的目錄下,也可以直接進(jìn)入mosquitto所在目錄運(yùn)行:
tripleCC:src songruiwang$ lldb mosquitto
(lldb) target create "mosquitto"
Current executable set to 'mosquitto' (x86_64).
(lldb) b mqtt3_packet_handle
Breakpoint 1: where = mosquitto`mqtt3_packet_handle + 16 at read_handle.c:36, address = 0x0000000100018eb0
(lldb) r
這樣當(dāng)客戶端連接到broker時(shí),就可以對mosquitto進(jìn)行逐行調(diào)試了:
Process 57680 launched: '/Users/songruiwang/Desktop/mosquitto/src/mosquitto' (x86_64)
1463049645: mosquitto version 1.4.8 (build date 2016-05-12 18:36:15+0800) starting
1463049645: Using default config.
1463049645: Opening ipv4 listen socket on port 1883.
1463049645: Opening ipv6 listen socket on port 1883.
1463049659: New connection from 127.0.0.1 on port 1883.
Process 57680 stopped
* thread #1: tid = 0xba449, 0x0000000100018eb0 mosquitto`mqtt3_packet_handle(db=0x000000010002f4f0, context=0x0000000100201990) + 16 at read_handle.c:36, queue = 'com.apple.main-thread', stop reason = breakpoint 1.1
frame #0: 0x0000000100018eb0 mosquitto`mqtt3_packet_handle(db=0x000000010002f4f0, context=0x0000000100201990) + 16 at read_handle.c:36
33
34 int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context)
35 {
-> 36 if(!context) return MOSQ_ERR_INVAL;
37
38 switch((context->in_packet.command)&0xF0){
39 case PINGREQ:
(lldb) p *context
(mosquitto) $0 = {
sock = 6
protocol = mosq_p_invalid
address = 0x0000000100200db0 "127.0.0.1"
id = 0x0000000000000000 <no value available>
username = 0x0000000000000000 <no value available>
password = 0x0000000000000000 <no value available>
keepalive = 60
last_mid = 0
state = mosq_cs_new
last_msg_in = 39584
last_msg_out = 39584
......
這里安利一款代碼閱讀器Understand(和window下的SourceInsight很相似,都很強(qiáng)大?。?br>
lldb很多命令和gdb相似,具體更多命令可以在lldb中執(zhí)行help進(jìn)行查看。
更加詳細(xì)的使用教程可以參考Mosquitto簡要教程(安裝/使用/測試)
使用Wireshark抓取報(bào)文
測試時(shí)使用的host一般為lo0,即本地回環(huán)地址,所以選擇對應(yīng)的過濾器:

對端口進(jìn)行過濾(這里使用的是1883端口):

然后連接客戶端和服務(wù)端,就可以看見對應(yīng)的MQTT報(bào)文了:

在一些linux嵌入式環(huán)境下,無法通過Wireshark抓取報(bào)文,可以使用tcpdump抓取生成pcap文件,然后使用ftp等協(xié)議將文件傳回到電腦,再使用Wireshark打開:
// 這里還是用回環(huán)地址舉例
tcpdump -i lo0 'tcp port 1883' -s 65535 -w packet.pcap
MQTT協(xié)議的實(shí)踐
MQTT協(xié)議消息類型
為了能夠更好地熟悉協(xié)議,我用struct+protocol的方式重寫了CocoaMQTT的代碼(SwiftMQTT)。CocoaMQTT庫使用的是傳統(tǒng)的面相對象編程方式,所以閱讀起來并沒有什么障礙,只不過小小吐槽下代碼風(fēng)格。
MQTT協(xié)議總共有14種消息類型,使用枚舉表示如下:
public enum SwiftMQTTMessageType : UInt8 {
case Connect = 0x10
case ConnAck = 0x20
case Publish = 0x30
case PubAck = 0x40
case PubRec = 0x50
case PubRel = 0x60
case PubComp = 0x70
case Subscribe = 0x80
case SubAck = 0x90
case Unsubscribe = 0xA0
case UnsubBack = 0xB0
case PingReq = 0xC0
case PingResp = 0xD0
case Disconnect = 0xE0
}
以上消息可由"固定報(bào)頭"+"可變報(bào)頭"+"有效載荷"三部分組成。
固定報(bào)頭由"類型+標(biāo)志位"+"剩余長度"組成,可以使用protocol表示第一部分:
public protocol SwiftMQTTCommandProtocol {
var command: UInt8 {get set}
var messageType: SwiftMQTTMessageType {get set}
var dupFlag: Bool {get set}
var qosLevel: SwiftMQTTQosLevel {get set}
var retain: Bool {get set}
}
extension SwiftMQTTCommandProtocol {
/**
* +---------------+----------+-----------+--------+
* | 7 6 5 4 | 3 | 2 1 | 0 |
* | Message Type | DUP flag | QoS level | RETAIN |
* +---------------+----------+-----------+--------+
*/
public var messageType: SwiftMQTTMessageType {
get { return SwiftMQTTMessageType(rawValue: command & 0xF0) ?? .Connect }
set { command = newValue.rawValue | (command & 0x0F) }
}
public var dupFlag: Bool {
get { return Bool((command >> 3) & 0x01) }
set { command = (UInt8(newValue) << 3) | (command & 0xF7) }
}
public var qosLevel: SwiftMQTTQosLevel {
get { return SwiftMQTTQosLevel(rawValue: (command >> 1) & 0x03) ?? .AtMostOnce }
set { command = newValue.rawValue << 1 | (command & 0xF9 ) }
}
public var retain: Bool {
get { return Bool(command & 0x01) }
set { command = UInt8(newValue) | (command & 0xFE) }
}
}
剩余長度等于"可變報(bào)頭"+"有效載荷"各自的長度相加,這兩者表示如下:
public protocol SwiftMQTTVariableHeaderProtocol {
var variableHeader: NSData {get}
}
extension SwiftMQTTVariableHeaderProtocol {
public var variableHeader: NSData { return NSData() }
}
public protocol SwiftMQTTPayloadProtocol {
var payload: NSData {get}
}
extension SwiftMQTTPayloadProtocol {
public var payload: NSData { return NSData() }
}
為了減少沒有這兩個(gè)部分的消息結(jié)構(gòu)體的代碼量,所以協(xié)議擴(kuò)展中先返回空數(shù)據(jù)。
然后就可以定義并實(shí)現(xiàn)一個(gè)固定報(bào)頭的總協(xié)議了:
public protocol SwiftMQTTFixedHeaderProtocol : SwiftMQTTCommandProtocol, SwiftMQTTVariableHeaderProtocol, SwiftMQTTPayloadProtocol {
var remainingLength: UInt32 {get}
}
extension SwiftMQTTFixedHeaderProtocol {
public var remainingLength: UInt32 {
let remainingLength = variableHeader.length + payload.length
guard remainingLength <= kSwiftMQTTMaxRemainingLength else {
SMPrint("the size of remaining length field should be below \(kSwiftMQTTMaxRemainingLength).")
return UInt32(kSwiftMQTTMaxRemainingLength)
}
return UInt32(remainingLength)
}
}
有了所有發(fā)送消息的組成部分之后,就可以對數(shù)據(jù)進(jìn)行編碼了:
public protocol SwiftMQTTMessageProtocol : SwiftMQTTFixedHeaderProtocol {
var data: NSData {get}
}
extension SwiftMQTTMessageProtocol {
public var data: NSData {
let data = NSMutableData()
data.appendByte(command)
data.appendData(remainingLength.data)
data.appendData(variableHeader)
data.appendData(payload)
return data
}
}
這里以Connect報(bào)文為例,結(jié)合以上協(xié)議,構(gòu)成一個(gè)有效的消息結(jié)構(gòu)體。
首先讓SwiftMQTTConnectMessage遵守SwiftMQTTMessageProtocol協(xié)議,以此獲得固定報(bào)頭解析以及編碼等能力:
public struct SwiftMQTTConnectMessage : SwiftMQTTMessageProtocol {
public var command = UInt8(0x00)
...
}
由于command是固定報(bào)頭類型和標(biāo)志的必要載體,所以必須在結(jié)構(gòu)體中實(shí)現(xiàn)。那么問題來了,MQTT協(xié)議的消息有14種,于是就需要在14種結(jié)構(gòu)體種都實(shí)現(xiàn)一次這個(gè)成員變量,如果使用面向?qū)ο蟮姆绞?,在公共子類中呈現(xiàn)這個(gè)成員變量就行了。這里是第一個(gè)讓我感覺面向協(xié)議方式在實(shí)現(xiàn)MQTT不順手的地方。
Connect報(bào)文的可變報(bào)頭中分為四個(gè)部分:協(xié)議名,協(xié)議級別,連接標(biāo)志和保持連接。這幾個(gè)部分可以使用兩個(gè)協(xié)議來實(shí)現(xiàn):
public protocol SwiftMQTTConnectFlagProtocol {
var connectFlag: UInt8 {get set}
var usernameFlag: Bool {get set}
var passwordFlag: Bool {get set}
var willRetain: Bool {get set}
var willQos: SwiftMQTTQosLevel {get set}
var willFlag: Bool {get set}
var cleanSession: Bool {get set}
}
extension SwiftMQTTConnectFlagProtocol {
/**
* +----------+----------+------------+---------+----------+--------------+----------+
* | 7 | 6 | 5 | 4 3 | 2 | 1 | 0 |
* | username | password | willretain | willqos | willflag | cleansession | reserved |
* +----------+----------+------------+---------+----------+--------------+----------+
*/
public var usernameFlag: Bool {
get { return Bool((connectFlag & 0x80) >> 7) }
set { connectFlag = (UInt8(newValue) << 7) | (connectFlag & 0x7F) }
}
public var passwordFlag: Bool {
get { return Bool((connectFlag & 0x40) >> 6) }
set { connectFlag = (UInt8(newValue) << 6) | (connectFlag & 0xBF) }
}
public var willRetain: Bool {
get { return Bool((connectFlag & 0x20) >> 5) }
set { connectFlag = (UInt8(newValue) << 5) | (connectFlag & 0xDF) }
}
public var willQos: SwiftMQTTQosLevel {
get { return SwiftMQTTQosLevel(rawValue: (connectFlag & 0x18) >> 3) ?? .AtMostOnce }
set { connectFlag = (UInt8(newValue.rawValue) << 3) | (connectFlag & 0xE7) }
}
public var willFlag: Bool {
get { return Bool((connectFlag & 0x08) >> 2) }
set { connectFlag = (UInt8(newValue) << 2) | (connectFlag & 0xFA) }
}
public var cleanSession: Bool {
get { return Bool((connectFlag & 0x04) >> 1) }
set { connectFlag = (UInt8(newValue) << 1) | (connectFlag & 0xFD) }
}
}
protocol SwiftMQTTClientProtocol {
var protocolName: String {get}
var protocolLevel: UInt8 {get}
var keepalive: UInt16 {get}
var clientId: String {get}
var account: SwiftMQTTAccount? {get}
var will: SwiftMQTTWill? {get}
}
extension SwiftMQTTClientProtocol {
var protocolName: String { return "MQTT" }
var protocolLevel: UInt8 { return 0x04 }
var keepalive: UInt16 { return 60 }
}
這樣Connect報(bào)文結(jié)構(gòu)體已經(jīng)有了所有需要的協(xié)議,接下來主要的工作就是實(shí)現(xiàn)真正的variableHeader和payload了:
public var variableHeader: NSData {
let variableHeader = NSMutableData()
variableHeader.appendMQTTString(protocolName)
variableHeader.appendByte(protocolLevel)
variableHeader.appendByte(connectFlag)
variableHeader.appendUInt16(keepalive)
return variableHeader
}
public var payload: NSData {
let payload = NSMutableData()
// 客戶端標(biāo)識符->遺囑主題->遺囑消息->用戶名->密碼
payload.appendMQTTString(clientId)
if let willTopic = will?.willTopic {
payload.appendMQTTString(willTopic)
}
if let willMessage = will?.willMessage {
payload.appendMQTTString(willMessage)
}
if let username = account?.username {
payload.appendMQTTString(username)
}
if let password = account?.password {
payload.appendMQTTString(password)
}
return payload
}
至此,Connect的主要部分都已經(jīng)構(gòu)建完成。接下來以ConAck報(bào)文為例,實(shí)現(xiàn)從broker中返回的報(bào)文。
由于需要解析從broker中返回的報(bào)文,所以定義一個(gè)返回消息類型協(xié)議:
public protocol SwiftMQTTAckMessageProtocol: SwiftMQTTCommandProtocol {
init?(_ bytes: [UInt8], command: UInt8)
}
最終SwiftMQTTConnAckMessage結(jié)構(gòu)體如下:
public struct SwiftMQTTConnAckMessage : SwiftMQTTAckMessageProtocol {
public var command = UInt8(0x00)
public var sessionPresent: Bool
public var connectReturnCode: SwiftMQTTConnectReturnCode
public init?(_ bytes: [UInt8], command: UInt8) {
guard bytes.count == 2 else { return nil }
sessionPresent = Bool(bytes[0])
connectReturnCode = SwiftMQTTConnectReturnCode(rawValue: bytes[1]) ?? .Accepted
self.command = command
}
}
這里又產(chǎn)生了第二個(gè)讓我不是很舒服的地方:在protocol extension中實(shí)現(xiàn)有效的init非常麻煩(暫且不論在protocol extension中實(shí)現(xiàn)init的必要性)。下面是一個(gè)不完全的實(shí)現(xiàn)方式:
protocol MessageProtocol {
var messageId : UInt16 { get set }
init()
init?(_ bytes: [UInt8])
}
extension Message {
init?(_ bytes: [UInt8]) {
guard bytes.count == 2 else { return nil }
messageId = UInt16(bytes[0]) << 8 + UInt16(bytes[1])
}
}
struct Message: MessageProtocol {
var messageId: UInt16
init() {
messageId = 0
}
}
為了能在protocol extension實(shí)現(xiàn)一個(gè)默認(rèn)的init?(_ bytes: [UInt8])方法,就必須要多定義一個(gè)沒什么意義的init()方法。這讓我直接放棄了這個(gè)念頭,轉(zhuǎn)而直接在每個(gè)消息類型的struct中實(shí)現(xiàn)對應(yīng)的解析init方法,雖然這樣會讓部分代碼重復(fù)。
至此,MQTT協(xié)議的消息類型實(shí)現(xiàn)差不多完成了,因?yàn)楹罄m(xù)的12種消息和前面這2種大同小異。
MQTT協(xié)議消息解析
和CocoaMQTT一樣,SwiftMQTT也是使用GCDAsyncSocket來進(jìn)行socket通信。在調(diào)用GCDAsyncSocket實(shí)例的readData系列方法并讀取到數(shù)據(jù)后,便可以從以下代理方法中解析讀取到的數(shù)據(jù):
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag
需要注意的是,如果使用的是按照緩存排列每次讀取固定子節(jié)的方法:
- (void)readDataToLength:(NSUInteger)length withTimeout:(NSTimeInterval)timeout tag:(long)tag;
那么只要有一次讀取錯(cuò)誤,就會影響到后續(xù)所有數(shù)據(jù)的讀取。
解析返回報(bào)文的主要方法如下:
mutating func unpackData(data: NSData, part: SwiftMQTTMessagePart, nextReader:SwiftMQTTMessageDecoderNextReader) {
let bytes = data.bytesArray
switch part {
case .Header:
messageHeader = unpackHeader(bytes)
// 讀取一個(gè)字節(jié)的剩余長度
nextReader(length: 1, part: .Length)
case .Length:
messageLengthBytes.appendContentsOf(bytes)
// 如果最高位為0,則剩余長度已確定
if Bool(bytes[0] & 0x80) {
// 繼續(xù)讀取一個(gè)字節(jié)的剩余長度
nextReader(length: 1, part: .Length)
} else {
// 獲取剩余長度
let messageLength = unpackLength(messageLengthBytes)
if messageLength > 0 {
// 讀取可變報(bào)頭和payload
nextReader(length: messageLength, part: .Content)
} else {
// 沒有可變報(bào)頭和payload,不需要再進(jìn)行讀取操作,直接解包
unpackContent()
}
// 重置長度緩存
messageLengthBytes.removeAll()
}
case .Content:
// 解析可變報(bào)頭和payload
unpackContent(bytes)
}
}
報(bào)文分三個(gè)部分進(jìn)行讀取。需要注意的是讀取剩余長度時(shí),需要循環(huán)讀取一個(gè)字節(jié),以便確定剩余長度的最高字節(jié)。
小結(jié)
最后對比各個(gè)協(xié)議庫,如果需要使用到MQTT的大部分功能,那么閱讀Mosquitto源碼會是個(gè)不錯(cuò)的選擇,畢竟其實(shí)現(xiàn)的功能還是相對完善的。
而對于這次實(shí)踐,總感覺有些地方使用面向協(xié)議沒有面向?qū)ο髞淼母雍啙崳贿^這也是利弊的權(quán)衡吧,還是在可以接受的范圍。