一、背景###
由于我公司產(chǎn)品需求想實現(xiàn)行情實時刷新功能,大家都知道這時候NSTimer不能再滿足產(chǎn)品需求,所以我們的服務(wù)端采用了RabbitMQ來實現(xiàn)行情的實時刷新。但是我在集成IOS端,發(fā)現(xiàn)網(wǎng)上幾乎沒有關(guān)于IOS端集成RabbitMQ示范文檔,所以我想寫一篇分享給大家如何在IOS端集成RabbitMQ,希望能給大家一點幫助。
二、集成
集成IOS端RabbitMQ框架,我們采用RabbitMQ官方提供的RMQClient框架。集成前的具體準(zhǔn)備我不作贅述,官方文檔很詳細(xì),RMQClient加入工程之后,請參考RabbitMQ詳細(xì)集成指南。這篇指南分為"Hello World!"、 Work queues、Publish/Subscribe、Routing、Topics,詳細(xì)的介紹了RabbitMQ概念、應(yīng)用、各種模式、集成等。
我們公司采用的是RabbitMQ的Publish/Subscribe消息模式,具體代碼如下:
//MARK: - RMQClient
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()
@property (nonatomic,strong) RMQConnection *conn;
@end
@implementation ViewController
- (void)viewDidLoad {
[super viewDidLoad];
// Do any additional setup after loading the view, typically from a nib.
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];
}
#pragma mark - 系統(tǒng)的通知監(jiān)聽
- (void)activeNotification:(NSNotification *)notification{
if (_conn == nil) {
[self receiveLogs];
}
}
- (void)backgroundNotification:(NSNotification *)notification{
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
- (void)viewWillAppear:(BOOL)animated{
[super viewWillAppear:animated];
[self receiveRabbitServicerMessage];
}
- (void)viewWillDisappear:(BOOL)animated{
[super viewWillDisappear:animated];
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
- (void)receiveRabbitServicerMessage {
NSString *url5 = @"amqp://servicerName:servicerPassword@host:port/Vhost";
if (_conn == nil) {//[RMQConnectionDelegateLogger new]
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
}
[_conn start];
id<RMQChannel> ch = [_conn createChannel];
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
RMQQueue *q = [ch queue:@"" options:RMQQueueDeclareExclusive |RMQQueueDeclareAutoDelete];
[q bind:x];
NSLog(@"Waiting for logs.");
[q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
}
三、代碼解釋
(1)RMQConnection的聲明
// RMQConnection 的聲明采用如下方式:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
// 源代碼釋義:
/*!
* @brief Parses URI to obtain credentials and TLS enablement (which implies verifyPeer).
* @param uri The URI contains all connection information, including credentials.<br/>
For example, "amqps://user:pass@hostname:1234/myvhost".<br/>
Note: to use the default "/" vhost, omit the trailing slash (or else you must encode it as %2F).
* @param delegate Any object that conforms to the RMQConnectionDelegate protocol.
Use this to handle connection- and channel-level errors.
RMQConnectionDelegateLogger is useful for development purposes.
*/
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
delegate:(nullable id<RMQConnectionDelegate>)delegate;
參數(shù)uri的格式:"amqps://user:pass@hostname:1234/myvhost"
user:用戶名;
pass:用戶密碼;
hostName:域名;
1234:端口碼;
myvhost:vhost;
[參數(shù)更多參考格式](http://bit.ly/ks8MXK).
參數(shù)delegate:
// 如果使用RMQConnectionDelegate
// RMQConnection聲明如下:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:self];
// 遵守代理:
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()<RMQConnectionDelegate>
@property (nonatomic,strong) RMQConnection *conn;
@end
// 實現(xiàn)代理:監(jiān)控RabbitMQ日志
/// @brief Called when a socket cannot be opened, or when AMQP handshaking times out for some reason.
- (void) connection:(RMQConnection *)connection
failedToConnectWithError:(NSError *)error{
if (error) {
NSLog(@"%@",error);
NSLog(@"連接超時");
[self.conn close];
self.conn = nil;
}else{
}
}
/// @brief Called when a connection disconnects for any reason
- (void) connection:(RMQConnection *)connection
disconnectedWithError:(NSError *)error{
if (error) {
NSLog(@"%@",error);
[self.conn close];
self.conn = nil;
}else{
NSLog(@"連接成功");
}
}
/// @brief Called before the configured <a >automatic connection recovery</a> sleep.
- (void)willStartRecoveryWithConnection:(RMQConnection *)connection{
NSLog(@"222222");
}
/// @brief Called after the configured <a >automatic connection recovery</a> sleep.
- (void)startingRecoveryWithConnection:(RMQConnection *)connection{
NSLog(@"333333");
}
/*!
* @brief Called when <a >automatic connection recovery</a> has succeeded.
* @param connection the connection instance that was recovered.
*/
- (void)recoveredConnection:(RMQConnection *)connection{
NSLog(@"333333");
}
/// @brief Called with any channel-level AMQP exception.
- (void)channel:(id<RMQChannel>)channel
error:(NSError *)error{
if (error) {
NSLog(@"%@",error);
[self.conn close];
self.conn = nil;
}
}
如果使用:[RMQConnectionDelegateLogger new]當(dāng)作代理,不需要實現(xiàn)代理的各種方法,系統(tǒng)自動打印各種狀態(tài)的Log。我們只需要在監(jiān)控端監(jiān)控log日志就可以了。
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
(2)開始連接
[_conn start];// 開始連接, 主要是與AMQP服務(wù)器連接和握手。
/// @brief Connect and handshake with the remote AMQP server.
- (void)start;
(3)創(chuàng)建Channel(通道)
// Connection利用創(chuàng)建Channel,主要內(nèi)容是開始一些操作和發(fā)送RMQConnectionDelegate給Channel。
id<RMQChannel> ch = [_conn createChannel];
/*!
* @brief Create a new channel, using an internally allocated channel number.
* @return An RMQAllocatedChannel or RMQUnallocatedChannel. The latter sends errors to the RMQConnectionDelegate.
*/
- (nonnull id<RMQChannel>)createChannel;
(3)創(chuàng)建Exchange(交換機(jī))
// 這里我們采用的fanout方式。
// 注意交換機(jī)的名稱(exchangeName)和操作類型(options)一定要和你們后臺的格式保持一致。我這里采用的是永久類型的RMQExchangeDeclareDurable。
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
// options的類型
typedef NS_OPTIONS(NSUInteger, RMQExchangeDeclareOptions) {
RMQExchangeDeclareNoOptions = 0, // 沒有操作類型
/// @brief If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
RMQExchangeDeclarePassive = 1 << 0,
/// @brief If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
RMQExchangeDeclareDurable = 1 << 1, // 持久類型
/// @brief If set, the exchange is deleted when all queues have finished using it.
RMQExchangeDeclareAutoDelete = 1 << 2, // 自動刪除類型
/// @brief If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
RMQExchangeDeclareInternal = 1 << 3,
/// @brief
RMQExchangeDeclareNoWait = 1 << 4,
};
(4)創(chuàng)建Queue(隊列)
// 注意隊列的名稱(queueName)不用填寫。
// 因為服務(wù)端隊列的聲明是隨機(jī)的不需要指定名稱,由服務(wù)器默認(rèn)生成名稱。
// 操作類型(options)這個要按照你們后端要求去填寫,我們使用的是RMQQueueDeclareExclusive和RMQQueueDeclareExclusive。
RMQQueue *q = [ch queue:@"" options: RMQQueueDeclareExclusive | RMQQueueDeclareAutoDelete];
(5)Bind(綁定)
// Bind是Queue和Exchange的綁定,實質(zhì)是將隊列名稱與交換機(jī)名稱進(jìn)行綁定。
[q bind:x];
(6)Subscribe(訂閱)
// Subscribe方法內(nèi)部創(chuàng)建了Consumer消費(fèi)者,利用消費(fèi)者去訂閱服務(wù)端發(fā)來消息。在handler的我們可以接收到從服務(wù)器端發(fā)來的消息。
[q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
}];
四、集成錯誤
Error Domain=com.rabbitmq.rabbitmq-objc-client Code=406 "PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value" UserInfo={NSLocalizedDescription=PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value
錯誤原因:交換機(jī)的類型與后端交換機(jī)的類型(options)不一致。
Error Domain=com.rabbitmq.rabbitmq-objc-client Code=5 "Cannot use channel after it has been closed." UserInfo={NSLocalizedDescription=Cannot use channel after it has been closed.}
錯誤原因:由于我把交換機(jī)的類型與后端交換機(jī)的類型(options)不一致,所以服務(wù)器端拒絕連接,斷開了channel。
五、集成注意
- 一定要在viewDidLoad方法中添加如下代碼:
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];
#pragma mark - 系統(tǒng)的通知監(jiān)聽
- (void)activeNotification:(NSNotification *)notification{
// 當(dāng)程序從后臺切換到前臺后,程序處于活躍狀態(tài),創(chuàng)建連接,開始接收消息。
if (_conn == nil) {
[self receiveLogs];
}
}
- (void)backgroundNotification:(NSNotification *)notification{
// 當(dāng)程序從前臺切換到后臺,程序處于后臺模式,關(guān)閉連接,停止接收消息。
// 這樣做的目的是為了當(dāng)程序處理非活躍狀態(tài),斷開連接,有利于釋放程序段和服務(wù)端資源,減少資源浪費(fèi),減少服務(wù)器壓力,提高程序性能。
if (_conn) {
[self.conn close];
self.conn = nil;
}
}
#pragma mark - Memory Manage
- (void)dealloc {
if (_conn) {
[self.conn close];
self.conn = nil;
}
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
六、總結(jié)。
- 集成前,要去了解
RabbitMQ的概念、原理、應(yīng)用等。 - 集成中,要參考官方文檔和你與后臺約定的集成的規(guī)則。
- 集成后,有時間和精力的話,可以嘗試去閱讀
RMQClient框架的源代碼。