RabbitMQ——第二篇:IOS版本RabbitMQ集成

一、背景###

由于我公司產(chǎn)品需求想實現(xiàn)行情實時刷新功能,大家都知道這時候NSTimer不能再滿足產(chǎn)品需求,所以我們的服務(wù)端采用了RabbitMQ來實現(xiàn)行情的實時刷新。但是我在集成IOS端,發(fā)現(xiàn)網(wǎng)上幾乎沒有關(guān)于IOS端集成RabbitMQ示范文檔,所以我想寫一篇分享給大家如何在IOS端集成RabbitMQ,希望能給大家一點幫助。

二、集成

集成IOSRabbitMQ框架,我們采用RabbitMQ官方提供的RMQClient框架。集成前的具體準(zhǔn)備我不作贅述,官方文檔很詳細(xì),RMQClient加入工程之后,請參考RabbitMQ詳細(xì)集成指南。這篇指南分為"Hello World!"、 Work queues、Publish/SubscribeRouting、Topics,詳細(xì)的介紹了RabbitMQ概念、應(yīng)用各種模式、集成等。

我們公司采用的是RabbitMQPublish/Subscribe消息模式,具體代碼如下:

//MARK: - RMQClient
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()
@property (nonatomic,strong) RMQConnection *conn;
@end
@implementation ViewController
- (void)viewDidLoad {
    [super viewDidLoad];
    // Do any additional setup after loading the view, typically from a nib.

    [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
    
    [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];

}

#pragma mark - 系統(tǒng)的通知監(jiān)聽
- (void)activeNotification:(NSNotification *)notification{
    if (_conn == nil) {
        [self receiveLogs];
    }
}
- (void)backgroundNotification:(NSNotification *)notification{
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}

- (void)viewWillAppear:(BOOL)animated{
    [super viewWillAppear:animated];
    [self receiveRabbitServicerMessage];
}

- (void)viewWillDisappear:(BOOL)animated{
    [super viewWillDisappear:animated];
    
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}

- (void)receiveRabbitServicerMessage {
    
    NSString *url5 = @"amqp://servicerName:servicerPassword@host:port/Vhost";

    if (_conn == nil) {//[RMQConnectionDelegateLogger new]
        _conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
    }

    [_conn start];
    
    id<RMQChannel> ch = [_conn createChannel];
    
    RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
    
    RMQQueue *q = [ch queue:@"" options:RMQQueueDeclareExclusive |RMQQueueDeclareAutoDelete];
    
    [q bind:x];
    
    NSLog(@"Waiting for logs.");
    
    [q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
        NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);
    }];
    
}

三、代碼解釋

(1)RMQConnection的聲明

// RMQConnection 的聲明采用如下方式:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];
// 源代碼釋義:

/*!
 * @brief Parses URI to obtain credentials and TLS enablement (which implies verifyPeer).
 * @param uri        The URI contains all connection information, including credentials.<br/>
                     For example, "amqps://user:pass@hostname:1234/myvhost".<br/>
                     Note: to use the default "/" vhost, omit the trailing slash (or else you must encode it as %2F).
 * @param delegate   Any object that conforms to the RMQConnectionDelegate protocol.
                     Use this to handle connection- and  channel-level errors.
                     RMQConnectionDelegateLogger is useful for development purposes.
 */
- (nonnull instancetype)initWithUri:(nonnull NSString *)uri
                           delegate:(nullable id<RMQConnectionDelegate>)delegate;
參數(shù)uri的格式:"amqps://user:pass@hostname:1234/myvhost"
user:用戶名;
pass:用戶密碼;
hostName:域名;
1234:端口碼;
myvhost:vhost;

[參數(shù)更多參考格式](http://bit.ly/ks8MXK).
參數(shù)delegate:
// 如果使用RMQConnectionDelegate
// RMQConnection聲明如下:
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:self];

//  遵守代理:
#import "ViewController.h"
//#import <RMQClient/RMQClient.h>
@import RMQClient;
@interface ViewController ()<RMQConnectionDelegate>
@property (nonatomic,strong) RMQConnection *conn;
@end

// 實現(xiàn)代理:監(jiān)控RabbitMQ日志
/// @brief Called when a socket cannot be opened, or when AMQP handshaking times out for some reason.
- (void)      connection:(RMQConnection *)connection
failedToConnectWithError:(NSError *)error{
    
    if (error) {
        NSLog(@"%@",error);
        NSLog(@"連接超時");
        [self.conn close];
        self.conn = nil;
    }else{
        
    }
}

/// @brief Called when a connection disconnects for any reason
- (void)   connection:(RMQConnection *)connection
disconnectedWithError:(NSError *)error{
    if (error) {
        NSLog(@"%@",error);
        [self.conn close];
        self.conn = nil;
    }else{
        NSLog(@"連接成功");
    }
}
/// @brief Called before the configured <a >automatic connection recovery</a> sleep.
- (void)willStartRecoveryWithConnection:(RMQConnection *)connection{
    NSLog(@"222222");
}
/// @brief Called after the configured <a >automatic connection recovery</a> sleep.
- (void)startingRecoveryWithConnection:(RMQConnection *)connection{
    NSLog(@"333333");
}
/*!
 * @brief Called when <a >automatic connection recovery</a> has succeeded.
 * @param connection the connection instance that was recovered.
 */
- (void)recoveredConnection:(RMQConnection *)connection{
    NSLog(@"333333");
}
/// @brief Called with any channel-level AMQP exception.
- (void)channel:(id<RMQChannel>)channel
          error:(NSError *)error{
    if (error) {
        NSLog(@"%@",error);
        [self.conn close];
        self.conn = nil;
    }
}


如果使用:[RMQConnectionDelegateLogger new]當(dāng)作代理,不需要實現(xiàn)代理的各種方法,系統(tǒng)自動打印各種狀態(tài)的Log。我們只需要在監(jiān)控端監(jiān)控log日志就可以了。
_conn = [[RMQConnection alloc] initWithUri:url5 delegate:[RMQConnectionDelegateLogger new]];

(2)開始連接

[_conn start];// 開始連接, 主要是與AMQP服務(wù)器連接和握手。
/// @brief Connect and handshake with the remote AMQP server.
- (void)start;

(3)創(chuàng)建Channel(通道)

// Connection利用創(chuàng)建Channel,主要內(nèi)容是開始一些操作和發(fā)送RMQConnectionDelegate給Channel。
 id<RMQChannel> ch = [_conn createChannel];

/*!
 * @brief Create a new channel, using an internally allocated channel number.
 * @return An RMQAllocatedChannel or RMQUnallocatedChannel. The latter sends errors to the RMQConnectionDelegate.
 */
- (nonnull id<RMQChannel>)createChannel;

(3)創(chuàng)建Exchange(交換機(jī))

// 這里我們采用的fanout方式。
// 注意交換機(jī)的名稱(exchangeName)和操作類型(options)一定要和你們后臺的格式保持一致。我這里采用的是永久類型的RMQExchangeDeclareDurable。
RMQExchange *x = [ch fanout:@"exchangeName" options:RMQExchangeDeclareDurable];
// options的類型
typedef NS_OPTIONS(NSUInteger, RMQExchangeDeclareOptions) {
    RMQExchangeDeclareNoOptions  = 0, // 沒有操作類型
    /// @brief If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.
    RMQExchangeDeclarePassive    = 1 << 0,
    /// @brief If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
    RMQExchangeDeclareDurable    = 1 << 1,  // 持久類型
    /// @brief If set, the exchange is deleted when all queues have finished using it.
    RMQExchangeDeclareAutoDelete = 1 << 2, // 自動刪除類型
    /// @brief If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
    RMQExchangeDeclareInternal   = 1 << 3,
    /// @brief
    RMQExchangeDeclareNoWait     = 1 << 4,
};

(4)創(chuàng)建Queue(隊列)

// 注意隊列的名稱(queueName)不用填寫。
// 因為服務(wù)端隊列的聲明是隨機(jī)的不需要指定名稱,由服務(wù)器默認(rèn)生成名稱。
// 操作類型(options)這個要按照你們后端要求去填寫,我們使用的是RMQQueueDeclareExclusive和RMQQueueDeclareExclusive。
 RMQQueue *q = [ch queue:@"" options: RMQQueueDeclareExclusive | RMQQueueDeclareAutoDelete];

(5)Bind(綁定)

// Bind是Queue和Exchange的綁定,實質(zhì)是將隊列名稱與交換機(jī)名稱進(jìn)行綁定。
[q bind:x];

(6)Subscribe(訂閱)

// Subscribe方法內(nèi)部創(chuàng)建了Consumer消費(fèi)者,利用消費(fèi)者去訂閱服務(wù)端發(fā)來消息。在handler的我們可以接收到從服務(wù)器端發(fā)來的消息。
 [q subscribe:RMQBasicConsumeNoOptions handler:^(RMQMessage * _Nonnull message) {
        NSLog(@"Received %@", [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding]);   
}];

四、集成錯誤

Error Domain=com.rabbitmq.rabbitmq-objc-client Code=406 "PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value" UserInfo={NSLocalizedDescription=PRECONDITION_FAILED - cannot redeclare exchange 'xxxx' in vhost 'xxxx' with different type, durable, internal or autodelete value

錯誤原因:交換機(jī)的類型與后端交換機(jī)的類型(options)不一致。

Error Domain=com.rabbitmq.rabbitmq-objc-client Code=5 "Cannot use channel after it has been closed." UserInfo={NSLocalizedDescription=Cannot use channel after it has been closed.}

錯誤原因:由于我把交換機(jī)的類型與后端交換機(jī)的類型(options)不一致,所以服務(wù)器端拒絕連接,斷開了channel。

五、集成注意

  • 一定要在viewDidLoad方法中添加如下代碼:
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(activeNotification:) name:UIApplicationDidBecomeActiveNotification object:nil];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(backgroundNotification:) name:UIApplicationDidEnterBackgroundNotification object:nil];

#pragma mark - 系統(tǒng)的通知監(jiān)聽
- (void)activeNotification:(NSNotification *)notification{
// 當(dāng)程序從后臺切換到前臺后,程序處于活躍狀態(tài),創(chuàng)建連接,開始接收消息。
    if (_conn == nil) {
        [self receiveLogs];
    }
}
- (void)backgroundNotification:(NSNotification *)notification{
// 當(dāng)程序從前臺切換到后臺,程序處于后臺模式,關(guān)閉連接,停止接收消息。
// 這樣做的目的是為了當(dāng)程序處理非活躍狀態(tài),斷開連接,有利于釋放程序段和服務(wù)端資源,減少資源浪費(fèi),減少服務(wù)器壓力,提高程序性能。
    if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
}
#pragma mark - Memory Manage
- (void)dealloc {

  if (_conn) {
        [self.conn close];
        self.conn = nil;
    }
    [[NSNotificationCenter  defaultCenter] removeObserver:self];
    
}

六、總結(jié)。

  • 集成前,要去了解RabbitMQ的概念、原理、應(yīng)用等。
  • 集成中,要參考官方文檔和你與后臺約定的集成的規(guī)則。
  • 集成后,有時間和精力的話,可以嘗試去閱讀RMQClient框架的源代碼。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,515評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,695評論 19 139
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件,它能夠在應(yīng)用之間提供可靠的消息傳輸。在易用性,擴(kuò)展性,高可用性...
    點融黑幫閱讀 3,134評論 3 41
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,419評論 0 1
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,240評論 3 51

友情鏈接更多精彩內(nèi)容