dotnet core 開(kāi)發(fā)中使用RabbitMQ做消息中間件

前言碎語(yǔ)

上次發(fā)文到現(xiàn)在已經(jīng)三個(gè)月了, 本來(lái)計(jì)劃每個(gè)月2篇的美夢(mèng)就這樣成了泡影, 個(gè)中緣由不想詳述, 或者說(shuō)不想找借口. 如此看來(lái), 時(shí)光匆匆, 需珍惜, 切記切記.


時(shí)間你等等我

RabbitMQ是啥?

最早接觸RabbitMQ還是在3年前, 當(dāng)時(shí)接手一個(gè)系統(tǒng)遷移到中國(guó)的落地任務(wù). 系統(tǒng)很大, 架構(gòu)很復(fù)雜, 功能也很強(qiáng)大. 過(guò)程中填了無(wú)數(shù)的坑, 但是也學(xué)到很多.
現(xiàn)在回想起來(lái), 令我印象最深的就是RabbitMQ. RabbitMQ是什么? 簡(jiǎn)單的一句話總結(jié) "RabbitMQ是基于AMQP協(xié)議的隊(duì)列服務(wù)". 由于它是Erlang這個(gè)天生分布式的語(yǔ)言所開(kāi)發(fā), 所以分布式, 集群, 高可用這些武功它統(tǒng)統(tǒng)都會(huì).

我讀書(shū)少, 不騙人

有童鞋忍不住要問(wèn)了, 這么高大上的東西, 我用它作甚? 截取一段CSDN中大神anzhsoft的描述:

對(duì)于一個(gè)大型的軟件系統(tǒng)來(lái)說(shuō),它會(huì)有很多的組件或者說(shuō)模塊或者說(shuō)子系統(tǒng)或者(subsystem or Component or submodule)。那么這些模塊的如何通信?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的,模塊耦合性很大,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上,但是還是有很多問(wèn)題需要解決。比如:
1)信息的發(fā)送者和接收者如何維持這個(gè)連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失?
2)如何降低發(fā)送者和接收者的耦合度?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance?有效均衡接收者的負(fù)載?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說(shuō)將接收者subscribe 不同的數(shù)據(jù),如何做有效的filter。
6)如何做到可擴(kuò)展,甚至將這個(gè)通信模塊發(fā)到cluster上?
7)如何保證接收者接收到了完整,正確的數(shù)據(jù)?

是的, 對(duì)于這些問(wèn)題RabbitMQ都能給你一個(gè)答案.

動(dòng)手實(shí)踐

閑話不多說(shuō), 讓我們操練起來(lái).
首先安裝RabbitMQ的服務(wù), 下載地址在這里
提示一下, 安裝后把服務(wù)啟動(dòng)起來(lái)通過(guò)執(zhí)行下面的命令把管理插件打開(kāi)你會(huì)感覺(jué)格外的舒爽.

rabbitmq-plugins enable rabbitmq_management

打開(kāi)管理插件后在瀏覽器輸入http://localhost:15672 即可打開(kāi)管理頁(yè)面, 視圖如下

然后需要用Nuget導(dǎo)入RabbitMQ客戶端. 我使用的開(kāi)發(fā)工具是vs code, 安裝包相對(duì)來(lái)說(shuō)會(huì)比較復(fù)雜, 如果你用Visual Studio會(huì)簡(jiǎn)單很多.
好了, 我說(shuō)下vs code 中具體的操作步驟. 首先在擴(kuò)展中搜索nuget, 可以看到一個(gè)叫.net core project manager的擴(kuò)展, 安裝后快捷鍵Ctrl+Shift+P 打開(kāi)命令工具輸入nuget即可看到一個(gè)nuget: add new package的提示, 選中后搜索rabbitmq, 會(huì)看到列出的幾個(gè)版本號(hào), 我選的4.1.1

簡(jiǎn)單碼一段代碼看看效果.

  1. 配置, 創(chuàng)建Client
    ConnectionFactory factory = new ConnectionFactory();
    factory.UserName = "test";
    factory.Password = "test";
    factory.VirtualHost = "test";
    factory.HostName = "127.0.0.1";

     IConnection conn = factory.CreateConnection();
     IModel channel = conn.CreateModel();
     channel.ExchangeDeclare("test", "topic");
     channel.QueueDeclare("test",true,true,false,null);
     channel.QueueBind("test", "test", "test", null);
    
  2. 編寫(xiě)生產(chǎn)者的代碼
    Timer t =new Timer((a)=>{
    var i=0;
    while (i++<100)
    {
    try
    {
    channel.BasicPublish("test","test",true,null, messageBodyBytes);
    }
    catch (System.Exception ex)
    {
    System.Console.WriteLine(ex.Message);
    }
    }
    },null,0,1000);
    用了一個(gè)Timer, 可以定時(shí)發(fā)送數(shù)據(jù).

  3. 編寫(xiě)消費(fèi)者代碼
    EventingBasicConsumer c = new EventingBasicConsumer(channel);
    c.Received += (ch, ea) =>
    {
    System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
    Thread.Sleep(1000);
    channel.BasicAck(ea.DeliveryTag, false);
    };
    string consumerTag = channel.BasicConsume("test", false, c);
    RabbitMQ的消費(fèi)者有幾種, 我這里用的是EventingBasicConsumer 它是被事件觸發(fā)的, 不需要主動(dòng)輪詢(xún)?nèi)ハM(fèi), 而且也是目前RabbitMQ官方推薦的.
    需要注意這一句
    string consumerTag = channel.BasicConsume("test", false, c);
    第二個(gè)參數(shù)我設(shè)置為false, 意思是我要主動(dòng)向server確認(rèn)信息的接收. 這樣做的好處有兩點(diǎn). 第一, 消費(fèi)者的入口容易控制, 不會(huì)把消費(fèi)者壓死. 第二, 可以通過(guò)RabbitMQ的控制臺(tái)直觀的了解到消費(fèi)者的處理能力. 看了后面的內(nèi)容我相信你會(huì)理解我的Point.

代碼碼好了, 來(lái)run一下瞧瞧.

由于我是單線程消費(fèi), 并且每次消費(fèi)過(guò)程中Sleep一秒, 這樣我每秒的處理速度高達(dá) 1

進(jìn)階一點(diǎn)點(diǎn)

看到這有人會(huì)說(shuō)了, 你寫(xiě)個(gè)這么爛的代碼也好意思出來(lái)曬? 好吧, 我是要循序漸進(jìn)的!


廢話不多說(shuō), 上代碼!

    Task.Factory.StartNew(()=>{   
        System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(ea.Body));
        Thread.Sleep(1000);
        channel.BasicAck(ea.DeliveryTag, false);
    });

貼了兩張圖, 看官們可以看到, 處理速度有一個(gè)爬坡的過(guò)程, 而且運(yùn)行到一段時(shí)間以后基本平穩(wěn)在了100個(gè)/秒上下. 川酷不滿意的是, 此時(shí)的cpu才僅僅占用了10%不到. 對(duì)于我這種抓到蛤蟆攥出尿來(lái)的人, 不壓榨壓榨實(shí)在不甘心. 對(duì)消費(fèi)者進(jìn)行了一下改造, 且看.

        public static ConcurrentQueue<BasicDeliverEventArgs> Queue1 = new ConcurrentQueue<BasicDeliverEventArgs>();
        EventingBasicConsumer c = new EventingBasicConsumer(channel);
        c.Received += (ch, ea) =>
        {
              Queue1.Enqueue(ea);
        };
        string consumerTag = channel.BasicConsume("test", false, c);

        var j=0;
        while (j++<150)
        {
            Task.Factory.StartNew(()=>{
                BasicDeliverEventArgs bdea=null;
                while (true)
                {
                    if(Queue1.TryDequeue(out bdea)){
                        System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(bdea.Body));
                        Thread.Sleep(1000);
                        channel.BasicAck(bdea.DeliveryTag, false);
                    }
                }
            });
        }

哈哈, 沒(méi)錯(cuò), 起了150個(gè)線程去同步處理, 速度就達(dá)到了150, 看證據(jù)


其實(shí)這時(shí)候的cpu占用率也不高. 只想做一個(gè)拋磚引玉, 當(dāng)我們直接調(diào)用.net為我們提供的傻瓜式接口時(shí), 有時(shí)候會(huì)遇到結(jié)果并不那么如意的情況, 逼著我們開(kāi)動(dòng)腦筋. 看官當(dāng)中肯定會(huì)有人問(wèn), 如果起10000個(gè)線程, 豈不是速度能達(dá)到10000個(gè)/秒? 自己動(dòng)手試試唄.

如果您覺(jué)得這篇文章對(duì)您有那么一丁點(diǎn)益處, 或者從某個(gè)角度觸動(dòng)到了您, 請(qǐng)給川酷一些鼓勵(lì), 打賞, 點(diǎn)贊, 關(guān)注, 哪怕評(píng)論區(qū)罵我兩句, 鄙人都感激涕零.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,695評(píng)論 19 139
  • 關(guān)于消息隊(duì)列,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了,但一直沒(méi)騰出空,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,690評(píng)論 51 787
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒(méi)有及時(shí)更新。 術(shù)語(yǔ)對(duì)...
    joyenlee閱讀 7,810評(píng)論 0 3
  • 摘要:RabbitMQ發(fā)送消息時(shí),都是先把消息發(fā)送給ExChange(交換機(jī)),然后再分發(fā)給有相應(yīng)RoutingK...
    請(qǐng)叫wo小爺閱讀 1,412評(píng)論 0 2
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,515評(píng)論 2 34

友情鏈接更多精彩內(nèi)容