今天我們以ActiveMQ為例進行一系列的學習
Apache ActiveMQ是Apache軟件基金會所研發(fā)的開放源碼消息中間件;由于ActiveMQ是一個純Java框架,那么我們先要了解下java中的JMS。
JMS的全稱是Java Message Service,即Java消息服務(wù)。它主要用于在生產(chǎn)者和消費者之間進行消息傳遞,生產(chǎn)者負責產(chǎn)生消息,而消費者負責接收消息。把它應用到實際的業(yè)務(wù)需求中的話我們可以在特定的時候利用生產(chǎn)者生成一消息,并進行發(fā)送,對應的消費者在接收到對應的消息后去完成對應的業(yè)務(wù)邏輯。對于消息的傳遞有兩種類型,一種是點對點的,即一個生產(chǎn)者和一個消費者一一對應;另一種是發(fā)布/訂閱模式,即一個生產(chǎn)者產(chǎn)生消息并進行發(fā)送后,可以由多個消費者進行接收。
JMS基本構(gòu)成
1、連接工廠
連接工廠是客戶用來創(chuàng)建連接的對象,例如 ActiveMQ 提供的
ActiveMQConnectionFactory。
2、 連接
JMS Connection 封裝了客戶與 JMS 提供者之間的一個虛擬的連接。
3、會話
JMS Session 是生產(chǎn)和消費消息的一個單線程上下文。會話用于創(chuàng)建消息生
產(chǎn)者(producer)、消息消費者(consumer)和消息(message)等。會話提供
了一個事務(wù)性的上下文,在這個上下文中,一組發(fā)送和接收被組合到了一個原子
操作中。
4、 目的地
目的地是客戶用來指定它生產(chǎn)的消息的目標和它消費的消息的來源的對象。
JMS1.0.2 規(guī)范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發(fā)布/訂閱
消息傳遞域。
5、消息生產(chǎn)者
消息生產(chǎn)者是由會話創(chuàng)建的一個對象,用于把消息發(fā)送到一個目的地。
6、消息消費者
消息消費者是由會話創(chuàng)建的一個對象,它用于接收發(fā)送到目的地的消息。
7、消息
JMS 消息由以下三部分組成:
? 消息頭。每個消息頭字段都有相應的 getter 和 setter 方法。
? 消息屬性。如果需要除消息頭字段以外的值,那么可以使用消息屬性。
? 消息體。JMS 定義的消息類型有 TextMessage、MapMessage、BytesMessage、
StreamMessage 和 ObjectMessage。
來看下具體代碼:
生產(chǎn)者
// 1.初始化connection工廠
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKE_URL);
// 2.創(chuàng)建Connection
connection = factory.createConnection();
// 3.打開連接
connection.start();
// 4.創(chuàng)建session
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5.創(chuàng)建消息目的地
Destination destination = session.createQueue(DESTINATION);
//6.創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(destination);
//7.配置消息持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//8.發(fā)送消息
sendMessage(session, producer);
//9.事務(wù)提交
session.commit();
消費者
// 1.初始化connection工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 2.創(chuàng)建Connection
connection = connectionFactory.createConnection();
// 3.打開連接
connection.start();
// 4.創(chuàng)建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.創(chuàng)建消息目標
Destination destination = session.createTopic("YD");
//6.創(chuàng)建消費者
MessageConsumer consumer = session.createConsumer(destination);
//7.配置監(jiān)聽
consumer.setMessageListener(this);