RabbitMQ 入門

1:windows 下rabbitMQ 的安裝

  1:[下載Erlang] (http://www.erlang.org/downloads/19.0),可以選擇版本下載安裝
 
  2:[下載RabbitMQ](http://www.rabbitmq.com/releases/rabbitmq-server/)
  
  3:安裝完成后 在cmd 下進(jìn)入 rabbitMQ 安裝文件下的sbin 運(yùn)行 rabbitmq-service start  ,停止運(yùn)行 rabbitmq-service stop

2:介紹

  RabbitMQ 是信息傳輸?shù)闹虚g者,本質(zhì)上, 它從生產(chǎn)者(producers)接收消息 ,
然后 轉(zhuǎn)發(fā)這些消息給消費(fèi)者(consumers).換句話說(shuō),他能夠按根據(jù)你指定的規(guī)則進(jìn)行消息轉(zhuǎn)發(fā)、緩沖、和持久化。

3:RabbitMQ 常見(jiàn)術(shù)語(yǔ)

Producing意味著無(wú)非是發(fā)送。一個(gè)發(fā)送消息的程序是一個(gè)producer(生產(chǎn)者)。一般用下圖表示Producer:
20140709205554810.png
Queue(隊(duì)列)類似郵箱。依存于RabbitMQ內(nèi)部。雖然消息通過(guò)RabbitMQ在你的應(yīng)用中傳遞,但是它們只能存儲(chǔ)在queue中。隊(duì)列不受任何限制,
可以存儲(chǔ)任何數(shù)量的消息—本質(zhì)上是一個(gè)無(wú)限制的緩存。很多producers可以通過(guò)同一個(gè)隊(duì)列發(fā)送消息,相同的很多consumers可以從同一個(gè)隊(duì)列上接收消息。
一般用下圖表示隊(duì)列:
Consuming(消費(fèi))類似于接收。consumer是基本屬于等待接收消息的程序。一般使用下圖表示Consumer:
注意:producer(生產(chǎn)者),consumer(消費(fèi)者),broker(RabbitMQ服務(wù))并不需要部署在同一臺(tái)機(jī)器上,實(shí)際上在大多數(shù)實(shí)際的應(yīng)用中,也不會(huì)部署在同一臺(tái)機(jī)器上。

3:java 入門實(shí)例

一個(gè)producer發(fā)送消息,一個(gè)接收者接收消息,并在控制臺(tái)打印出來(lái)。如下圖:


注意:需要下載 amqp-client jar 包,demo 使用的maven 下載

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.4.1</version>
</dependency>

發(fā)送端:Send.java 連接到RabbitMQ(此時(shí)服務(wù)需要啟動(dòng)),發(fā)送一條數(shù)據(jù),然后退出。

  package com.rabbitMQ.test.rabbitMQ;


import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    //隊(duì)列名稱
    private final static String QUEUEZ_NAME ="hello test";
    
    public static void main(String[] args) throws IOException {
        //創(chuàng)建連接,連接到rabbitMQ;
         ConnectionFactory factory = new ConnectionFactory();
         
         //設(shè)置rabbitMQ 所在主機(jī)IP 或者主機(jī)名
          factory.setHost("localhost");
          //指定用戶名,密碼
           factory.setUsername("admin123");
           factory.setPassword("admin123");
           //指定端口
            factory.setPort(AMQP.PROTOCOL.PORT);
            
          //創(chuàng)建一個(gè)鏈接
         Connection connection=   factory.newConnection();
         
         //創(chuàng)建一個(gè)頻道
         Channel channel = connection.createChannel();
        //指定一個(gè)隊(duì)列
          channel.queueDeclare(QUEUEZ_NAME, false, false, false, null);
          
          //發(fā)送的消息
          String message ="hello rabbitMQ";
          //往隊(duì)列中發(fā)送一條消息
           channel.basicPublish("", QUEUEZ_NAME, null, message.getBytes());
           
          //關(guān)閉頻道和鏈接
           channel.close();
           connection.close();
    }
    
}

值得注意的是隊(duì)列只會(huì)在它不存在的時(shí)候創(chuàng)建,多次聲明并不會(huì)重復(fù)創(chuàng)建。信息的內(nèi)容是字節(jié)數(shù)組,也就意味著你可以傳遞任何數(shù)據(jù)。

接收端:Recv.java 不斷等待服務(wù)器推送消息,然后在控制臺(tái)輸出。

package com.rabbitMQ.test.rabbitMQ;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

    
    public final static String QUEUE_NAME="hello test";
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        //創(chuàng)建連接
         ConnectionFactory factory = new ConnectionFactory();
         //設(shè)置rabbitMQ 所在的ip 或者主機(jī)名
         factory.setHost("localhost");
         //指定用戶名,密碼
          factory.setUsername("admin123");
          factory.setPassword("admin123");
          //指定端口號(hào)
           factory.setPort(AMQP.PROTOCOL.PORT);
         //創(chuàng)建一個(gè)鏈接
         Connection connection =  factory.newConnection();
         //創(chuàng)建一個(gè)頻道
         Channel channel =  connection.createChannel();
        
         //聲明隊(duì)列, 主要為了防止消息接收者先運(yùn)行此程序,隊(duì)列還不存在時(shí)候創(chuàng)建隊(duì)列
          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          System.out.println("waiting for message to exit press");
          //創(chuàng)建消費(fèi)者
           QueueingConsumer consumer = new QueueingConsumer(channel);
           //指定消費(fèi)隊(duì)列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                // nextDelivery   是一個(gè)阻塞方法( 內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take 方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" received------" +message);
                
            }
    }
}

分別運(yùn)行Send.java和Recv.java 順序無(wú)所謂。前提RabbitMQ服務(wù)開(kāi)啟。
運(yùn)行結(jié)果:

waiting for message to exit press
 received------hello rabbitMQ

附 : demo 下載

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容