介紹
RabbitMQ是一種消息中間件。它的核心思想很簡單,接收并傳遞消息。你可以把RabbitMQ想象成郵局:當你把信扔進信箱后,你十分確信郵遞員會準確的把信交給收件人。在這個比喻里,RabbitMQ就是郵箱、郵局和郵遞員的集合。
RabbitMQ和郵局主要的區(qū)別在于,RabbitMQ處理的不是紙質郵件,而是二進制的數據(Messages)
接下來用較為專業(yè)的術語解釋RabbitMQ以及消息傳遞。
Producing指的是只做發(fā)送操作,其余什么也不干。自然而然,Producer就是只發(fā)消息的程序。我們用P指代Producer。

Queue等同于郵箱的意思,它存在于RabbitMQ當中。當消息穿過RabbitMQ到達你的應用程序期間,它們全部都保存在Queue當中。Queue的大小沒有限制,你想存多少就存多少-它基本等同于一個容量無限的緩存。大量Producer往里發(fā)送消息,大量Consumer從同一個隊列里取消息。我們用個圖來展示下。

Consuming與接收的含義非常接近。Consumer這類程序主要功能就是接收消息。我們畫個C。

注意Producer,Consumer以及Broker可能不在同一臺主機中;實際大多數情況下,它們都分布在不同的主機中。
"Hello World"
Using the Java Client
在這部分內容中,我們會寫兩個JAVA程序;
第一個如下:
Producer發(fā)送一條消息,Consumer接收消息并打印。讓我們暫時忽視JAVA API的實現細節(jié),集中注意力從簡單的調用開始,發(fā)一條“Hello World”的消息。
在下方的圖中,“P”代表Producer,“C”代表Consumer,中間的紅色方塊代表Queue(Rabbit為Consumer持有的消息緩存)

Java版RabbitMQ客戶端
RabbitMQ使用多種協議。在本部分示例中,采用的是AMQP 0-9-1協議,它是一種開放的、多功能的消息傳遞協議。同時,RabbitMQ客戶端的實現語言也種類繁多,在這里我們選用JAVA版本。
下載安裝包,檢查簽名,解壓到你指定的路徑 巴拉巴拉~~~
現在我們開始寫代碼。
Sending

我們稱消息發(fā)送者為Send,接收者為Recv。Send會連接RabbitMQ,發(fā)送一條簡短的消息,然后退出。
我們需要導入以下class文件
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
建立類文件并且命名隊列。
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后我們創(chuàng)建一個到服務器的連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Connection掩蓋了套接字實現的細節(jié),讓我們能專注于選擇協議版本和認證以及其他重要的事情上。我們連接本機的broker,下文中我們簡稱為localhost。我們只需要修改IP地址就能簡單的連上其他主機上的broker。
接下來我們創(chuàng)建一個channel,它包含大量我們常用的API。
為了發(fā)送消息,我們需要聲明一個隊列;然后我們向隊列中發(fā)送消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
聲明隊列是冪等的,只有當隊列不存在時,它才會被創(chuàng)建(吐槽,不就是單例嘛。。。)。消息內容是字節(jié)數組,你可以隨心所以編碼它。
最后,別忘記關掉連接。
channel.close();
connection.close();
以上就是整個Send.java的內容。
發(fā)送無效怎么辦?
如果你第一次使用RabbitMQ并且沒有看見發(fā)送的消息,你肯定會對這種不知所措的感覺印象深刻。也許是broker沒有足夠的磁盤空間(默認需要1G)導致拒絕接收消息。通過檢查broker的日志文件來判斷是否需要降低磁盤需求。 configuration file documentation會教你怎樣設置disk_free_limit。
Receving
接下來是reciver,它被RabbitMQ塞入消息。同時,reciver實現起來也比sender復雜,我們需要它監(jiān)聽消息,直到接收并打印出來。

像寫sender一樣,這里我們也需要引入很多依賴的代碼。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
引入的代碼中,DefaultConsumer是一個實現了Consumer接口的類,我們會用它來保存RabbitMQ推送的消息。
類似Sender一樣構建代碼;我們打開Connection和Channel,聲明將要消費的隊列。注意隊列名稱需要與發(fā)送的隊列相同。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意,我們在這里也許要聲明隊列,是因為我們也許在啟動sender前就啟動了reciver。我們需要確保在消費消息前,隊列已經存在。
接下來我們要告訴RabbitMQ,讓它把隊列中的消息發(fā)給我們。由于它通過異步的方式推送消息,我們在形式上先用一個變量保存消息直到我們實際使用它。這也是DefaultConsumer子類所做的工作。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是整個Recv.java的代碼。
Putting it all together
你可以將這兩段代碼同rabbitMQ客戶端代碼一起編譯。
$ javac -cp rabbitmq-client.jar Send.java Recv.java
為了運行他們,你需要rabbitmq-client.jar并且它依賴于classpath。在終端上,運行sender:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然后,運行receiver;
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在Windows環(huán)境下,在classpath中用分號代替冒號分離它們
當receiver從RabbitMQ中獲取消息后會將其打印出來。recevier會一直運行并等待新的消息,因此我們在另一個終端中啟動sender。
如果你想檢查隊列,嘗試使用 rabbitmqctl list_queues
hello
是時候看看Part2,構建一個簡單的工作隊列。
小秘密:
為了減少打字,你可以為classpath設置環(huán)境變量,例如
$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
$ java -cp $CP Send
windows環(huán)境下
> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
> java -cp %CP% Send