注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊(duì)列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類型
第四篇Routing介紹direct類型轉(zhuǎn)換器
第五篇Topics介紹topic類型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用
在上一篇指導(dǎo)教程中,我們創(chuàng)建了一個(gè)日志系統(tǒng),可以把日志消息廣播給很多接受者。
在這篇指導(dǎo)教程中,我們需要添加一個(gè)功能:可以訂閱消息的一部分。例如:我們會(huì)直接將嚴(yán)重的錯(cuò)誤信息生成日志文件(保存在空余的磁盤上),但是依然會(huì)把所有的日志信息顯示在控制臺(tái)。
綁定(Bindings)
在上篇指導(dǎo)教程的例子中,我們已經(jīng)創(chuàng)建過綁定的實(shí)例,你可能會(huì)覺得跟下面的代碼類似:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定的含義是轉(zhuǎn)換器和隊(duì)列之間的一種關(guān)聯(lián),通俗來說就是一個(gè)隊(duì)列對這個(gè)轉(zhuǎn)換器中的消息感興趣。
綁定可以帶有一個(gè)參數(shù):routingKey。為了避免和basic_publish中的參數(shù)產(chǎn)生困惑,我們將這個(gè)參數(shù)叫著binding key(綁定鑰匙),下面是我們創(chuàng)建一個(gè)帶有鑰匙的綁定。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
這個(gè)綁定鑰匙的意思取決于轉(zhuǎn)換器的類型,如果是我們之前使用的fanout類型轉(zhuǎn)換器,那么會(huì)忽略綁定鑰匙的意義。
直接轉(zhuǎn)換器(Direct exchange)
在上篇指導(dǎo)教程中,我們的日志系統(tǒng)會(huì)廣播消息給所有綁定轉(zhuǎn)換器的消費(fèi)者?,F(xiàn)在我們擴(kuò)展一下:根據(jù)消息的級別來過濾消息。舉例來說,我們想一個(gè)應(yīng)用只接受嚴(yán)重級別的消息并且寫入到磁盤里,就不用浪費(fèi)磁盤空間去保存警告或者信息日志的消息。
如果使用fanout轉(zhuǎn)換器,那樣就沒有什么靈活性,不停的愚蠢的廣播。
可以使用direct轉(zhuǎn)換器,它的路由選擇的算法是容易理解,一個(gè)消息之所以到這個(gè)隊(duì)列中去,是因?yàn)殛?duì)列的binding Key和發(fā)出消息的routingkey相匹配。
為了說明這個(gè)問題,看下下面的結(jié)構(gòu):

這張結(jié)構(gòu)圖中,可以看到有兩個(gè)隊(duì)列綁定著類型為direct的轉(zhuǎn)換器,第一個(gè)隊(duì)列綁定鑰匙為orange,第二個(gè)綁定鑰匙有兩個(gè):一個(gè)是black另一個(gè)是green。
在上面的結(jié)構(gòu)圖中,一個(gè)帶有routingkey為orange的消息發(fā)送給轉(zhuǎn)換器將會(huì)被發(fā)送到隊(duì)列Q1中,帶有routing Key為black和green將會(huì)被發(fā)送給到隊(duì)列Q2中,其他所有的消息將會(huì)被清除。
多重綁定(Multiple bindings)

多個(gè)隊(duì)列擁有相同的binding key是完全合規(guī)的,上圖中我們可以在轉(zhuǎn)換器x和帶有bindingkey為black的隊(duì)列Q1建立綁定關(guān)系。在這種情況下,direct類型的轉(zhuǎn)換器具有fanout類型的一樣特性,可以廣播給所有匹配的隊(duì)列消息。一個(gè)routingkey為black的消息將會(huì)被發(fā)送到Q1和Q2兩個(gè)隊(duì)列中。
發(fā)送消息(Emitting logs)
我們使用這種模型應(yīng)用到日志系統(tǒng)上,發(fā)送給消息給direct而不是fanout類型的轉(zhuǎn)換器。將以日志嚴(yán)重等級作為routing key。按照那種方式,消費(fèi)者應(yīng)用將會(huì)選擇接受日志的嚴(yán)重等級的消息。首先我們先發(fā)送消息。
總是一樣的,先聲明一個(gè)轉(zhuǎn)換器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
準(zhǔn)備好發(fā)送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了方面,我們假設(shè)等級分為三種:info,warning,error。
訂閱(Subscribing)
只要像上篇指導(dǎo)教程中接受消息就可以,有一個(gè)不同的地方就是:我們可以去創(chuàng)建任何一個(gè)等級的綁定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
綜合

下面是EmitLogDirect.java類,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv); //獲取日志等級
String message = getMessage(argv); //獲取消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
//..
}
下面是ReceiveLogsDirect.java類,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
跟以前一樣編譯,運(yùn)行的時(shí)候?yàn)榱朔矫?,我們使用環(huán)境便來個(gè)$CP作為路徑配置:
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果你想把warning和error(而不是info)類型的日消息保存到文件中,只需要打開一個(gè)控制臺(tái)和記錄:
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果你想在你的屏幕上看到所有的日志消息,新打開一個(gè)終端和查看就可以:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C
舉個(gè)例子,發(fā)送一個(gè)error類型的日志消息:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
第四節(jié)的內(nèi)容大致翻譯完了,這里是原文鏈接。接著進(jìn)入下一節(jié):Topics。
終篇是我對RabbitMQ使用理解的總結(jié)文章,歡迎討教。
--謝謝--