1.1 ????? JMS 簡介
???????JMS 的全稱是 Java Message Service ,即 Java 消息服務。它主要用于在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,并進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。對于消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另一種是發布 / 訂閱模式,即一個生產者產生消息并進行發送后,可以由多個消費者進行接收。
1.2 ????? Spring 整合 JMS
??????? 對 JMS 做了一個簡要介紹之后,接下來就講一下 Spring 整合 JMS 的具體過程。 JMS 只是一個標準,真正在使用它的時候我們需要有它的具體實現,這里我們就使用 Apache 的 activeMQ 來作為它的實現。所使用的依賴利用 Maven 來進行管理,具體依賴如下:
?
- < dependencies > ??
- ???????? < dependency > ??
- ???????????? < groupId > junit </ groupId > ??
- ???????????? < artifactId > junit </ artifactId > ??
- ???????????? < version > 4.10 </ version > ??
- ???????????? < scope > test </ scope > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-context </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-jms </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-test </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > javax.annotation </ groupId > ??
- ???????????? < artifactId > jsr250-api </ artifactId > ??
- ???????????? < version > 1.0 </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.apache.activemq </ groupId > ??
- ???????????? < artifactId > activemq-core </ artifactId > ??
- ???????????? < version > 5.7.0 </ version > ??
- ???????? </ dependency > ??
- </ dependencies > ??
?
1.2.1??activeMQ準備
??????? 既然是使用的 apache 的 activeMQ 作為 JMS 的實現,那么首先我們應該到 apache 官網上下載 activeMQ ( http://activemq.apache.org/download.html ),進行解壓后運行其 bin 目錄下面的 activemq.bat 文件啟動 activeMQ 。
1.2.2配置ConnectionFactory
???????ConnectionFactory 是用于產生到 JMS 服務器的鏈接的, Spring 為我們提供了多個 ConnectionFactory ,有 SingleConnectionFactory 和 CachingConnectionFactory 。 SingleConnectionFactory 對于建立 JMS 服務器鏈接的請求會一直返回同一個鏈接,并且會忽略 Connection 的 close 方法調用。 CachingConnectionFactory 繼承了 SingleConnectionFactory ,所以它擁有 SingleConnectionFactory 的所有功能,同時它還新增了緩存功能,它可以緩存 Session 、 MessageProducer 和 MessageConsumer 。這里我們使用 SingleConnectionFactory 來作為示例。
- < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" /> ??
?
??????? 這樣就定義好產生 JMS 服務器鏈接的 ConnectionFactory 了嗎?答案是非也。 Spring 提供的 ConnectionFactory 只是 Spring 用于管理 ConnectionFactory 的,真正產生到 JMS 服務器鏈接的 ConnectionFactory 還得是由 JMS 服務廠商提供,并且需要把它注入到 Spring 提供的 ConnectionFactory 中。我們這里使用的是 ActiveMQ 實現的 JMS ,所以在我們這里真正的可以產生 Connection 的就應該是由 ActiveMQ 提供的 ConnectionFactory 。所以定義一個 ConnectionFactory 的完整代碼應該如下所示:
- <!--?真正可以產生Connection的ConnectionFactory,由對應的?JMS服務廠商提供--> ??
- < bean ? id = "targetConnectionFactory" ? class = "org.apache.activemq.ActiveMQConnectionFactory" > ??
- ???? < property ? name = "brokerURL" ? value = "tcp://localhost:61616" /> ??
- </ bean > ??
- ??
- <!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?--> ??
- < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" > ??
- ???? <!--?目標ConnectionFactory對應真實的可以產生JMS?Connection的ConnectionFactory?--> ??
- ???? < property ? name = "targetConnectionFactory" ? ref = "targetConnectionFactory" /> ??
- </ bean > ??
??
1.2.3配置生產者
配置好 ConnectionFactory 之后我們就需要配置生產者。生產者負責產生消息并發送到 JMS 服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用 Spring 為我們提供的 JmsTemplate 類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的 JmsTemplate 。對于消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義 JmsTemplate 的時候需要往里面注入一個 Spring 提供的 ConnectionFactory 對象。
- <!--?Spring提供的JMS工具類,它可以進行消息發送、接收等?--> ??
- < bean ? id = "jmsTemplate" ? class = "org.springframework.jms.core.JmsTemplate" > ??
- ???? <!--?這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象?--> ??
- ???? < property ? name = "connectionFactory" ? ref = "connectionFactory" /> ??
- </ bean > ??
?
??????? 在真正利用 JmsTemplate 進行消息發送的時候,我們需要知道消息發送的目的地,即 destination 。在 Jms 中有一個用來表示目的地的 Destination 接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用 JmsTemplate 進行消息發送時沒有指定 destination 的時候將使用默認的 Destination 。默認 Destination 可以通過在定義 jmsTemplate bean 對象時通過屬性 defaultDestination 或 defaultDestinationName 來進行注入, defaultDestinationName 對應的就是一個普通字符串。在 ActiveMQ 中實現了兩種類型的 Destination ,一個是點對點的 ActiveMQQueue ,另一個就是支持訂閱 / 發布模式的 ActiveMQTopic 。在定義這兩種類型的 Destination 時我們都可以通過一個 name 屬性來進行構造,如:
?
?
?
?
- <!--這個是隊列目的地,點對點的--> ??
- < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???? < constructor-arg > ??
- ???????? < value > queue </ value > ??
- ???? </ constructor-arg > ??
- </ bean > ??
- <!--這個是主題目的地,一對多的--> ??
- < bean ? id = "topicDestination" ? class = "org.apache.activemq.command.ActiveMQTopic" > ??
- ???? < constructor-arg ? value = "topic" /> ??
- </ bean > ??
?
?
?
??????? 假設我們定義了一個 ProducerService ,里面有一個向 Destination 發送純文本消息的方法 sendMessage ,那么我們的代碼就大概是這個樣子:
?
?
?
?
- package ?com.tiantian.springintejms.service.impl;??
- ???
- import ?javax.annotation.Resource;??
- import ?javax.jms.Destination;??
- import ?javax.jms.JMSException;??
- import ?javax.jms.Message;??
- import ?javax.jms.Session;??
- ???
- import ?org.springframework.jms.core.JmsTemplate;??
- import ?org.springframework.jms.core.MessageCreator;??
- import ?org.springframework.stereotype.Component;??
- ???
- import ?com.tiantian.springintejms.service.ProducerService;??
- ???
- @Component ??
- public ? class ?ProducerServiceImpl? implements ?ProducerService?{??
- ???
- ???? private ?JmsTemplate?jmsTemplate;??
- ??????
- ???? public ? void ?sendMessage(Destination?destination,? final ?String?message)?{??
- ????????System.out.println( "---------------生產者發送消息-----------------" );??
- ????????System.out.println( "---------------生產者發了一個消息:" ?+?message);??
- ????????jmsTemplate.send(destination,? new ?MessageCreator()?{??
- ???????????? public ?Message?createMessage(Session?session)? throws ?JMSException?{??
- ???????????????? return ?session.createTextMessage(message);??
- ????????????}??
- ????????});??
- ????}???
- ??
- ???? public ?JmsTemplate?getJmsTemplate()?{??
- ????????returnjmsTemplate;??
- ????}???
- ??
- ???? @Resource ??
- ???? public ? void ?setJmsTemplate(JmsTemplate?jmsTemplate)?{??
- ???????? this .jmsTemplate?=?jmsTemplate;??
- ????}??
- ???
- }??
?
?
?
??????? 我們可以看到在 sendMessage 方法體里面我們是通過 jmsTemplate 來發送消息到對應的 Destination 的。到此,我們生成一個簡單的文本消息并把它發送到指定目的地 Destination 的生產者就配置好了。
1.2.4配置消費者
生產者往指定目的地 Destination 發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地 Destination 了呢?這是通過 Spring 為我們封裝的消息監聽容器 MessageListenerContainer 實現的,它負責接收信息,并把接收到的信息分發給真正的 MessageListener 進行處理。每個消費者對應每個目的地都需要有對應的 MessageListenerContainer 。對于消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個 JMS 服務器,這是通過在配置 MessageConnectionFactory 的時候往里面注入一個 ConnectionFactory 來實現的。所以我們 在配置一個 MessageListenerContainer 的時候有三個屬性必須指定,一個是表示從哪里監聽的 ConnectionFactory ;一個是表示監聽什么的 Destination ;一個是接收到消息以后進行消息處理的 MessageListener 。 Spring 一共為我們提供了兩種類型的 MessageListenerContainer , SimpleMessageListenerContainer 和 DefaultMessageListenerContainer 。
SimpleMessageListenerContainer 會在一開始的時候就創建一個會話 session 和消費者 Consumer ,并且會使用標準的 JMS MessageConsumer.setMessageListener() 方法注冊監聽器讓 JMS 提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近于獨立的 JMS 規范,但一般不兼容 Java EE 的 JMS 限制。
大多數情況下我們還是使用的 DefaultMessageListenerContainer ,跟 SimpleMessageListenerContainer 相比, DefaultMessageListenerContainer 會動態的適應運行時需要,并且能夠參與外部的事務管理。它很好的平衡了對 JMS 提供者要求低、先進功能如事務參與和兼容 Java EE 環境。
定義處理消息的 MessageListener
??????? 要定義處理消息的 MessageListener 我們只需要實現 JMS 規范中的 MessageListener 接口就可以了。 MessageListener 接口中只有一個方法 onMessage 方法,當接收到消息的時候會自動調用該方法。
?
?
?
?
- package ?com.tiantian.springintejms.listener;??
- ???
- import ?javax.jms.JMSException;??
- import ?javax.jms.Message;??
- import ?javax.jms.MessageListener;??
- import ?javax.jms.TextMessage;??
- ???
- public ? class ?ConsumerMessageListener? implements ?MessageListener?{??
- ???
- ???? public ? void ?onMessage(Message?message)?{??
- ???????? //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage ??
- ????????TextMessage?textMsg?=?(TextMessage)?message;??
- ????????System.out.println( "接收到一個純文本消息。" );??
- ???????? try ?{??
- ????????????System.out.println( "消息內容是:" ?+?textMsg.getText());??
- ????????}? catch ?(JMSException?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- ???
- }??
?
??
?
??????? 有了 MessageListener 之后我們就可以在 Spring 的配置文件中配置一個消息監聽容器了。
- <!--這個是隊列目的地--> ??
- < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???? < constructor-arg > ??
- ???????? < value > queue </ value > ??
- ???? </ constructor-arg > ??
- </ bean > ??
- <!--?消息監聽器?--> ??
- < bean ? id = "consumerMessageListener" ? class = "com.tiantian.springintejms.listener.ConsumerMessageListener" /> ??????
- ??
- <!--?消息監聽容器?--> ??
- < bean ? id = "jmsContainer" ???????? class = "org.springframework.jms.listener.DefaultMessageListenerContainer" > ??
- ???? < property ? name = "connectionFactory" ? ref = "connectionFactory" ? /> ??
- ???? < property ? name = "destination" ? ref = "queueDestination" ? /> ??
- ???? < property ? name = "messageListener" ? ref = "consumerMessageListener" ? /> ??
- </ bean > ??
?
?
??????? 我們可以看到我們定義了一個名叫 queue 的 ActiveMQQueue 目的地,我們的監聽器就是監聽了發送到這個目的地的消息。
??????? 至此我們的生成者和消費者都配置完成了,這也就意味著我們的整合已經完成了。這個時候完整的 Spring 的配置文件應該是這樣的:
- <? xml ? version = "1.0" ? encoding = "UTF-8" ?> ??
- < beans ? xmlns = "http://www.springframework.org/schema/beans" ??
- ???? xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" ? xmlns:context = "http://www.springframework.org/schema/context" ??
- ???? xmlns:jms = "http://www.springframework.org/schema/jms" ??
- ???? xsi:schemaLocation ="http://www.springframework.org/schema/beans??
- ?????http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
- ?????http://www.springframework.org/schema/context??
- ?????http://www.springframework.org/schema/context/spring-context-3.0.xsd??
- ????http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
- ????http://www.springframework.org/schema/jms?http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" > ??
- ???
- ???? < context:component-scan ? base-package = "com.tiantian" ? /> ??
- ???
- ???? <!--?Spring提供的JMS工具類,它可以進行消息發送、接收等?--> ??
- ???? < bean ? id = "jmsTemplate" ? class = "org.springframework.jms.core.JmsTemplate" > ??
- ???????? <!--?這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象?--> ??
- ???????? < property ? name = "connectionFactory" ? ref = "connectionFactory" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--?真正可以產生Connection的ConnectionFactory,由對應的?JMS服務廠商提供--> ??
- ???? < bean ? id = "targetConnectionFactory" ? class = "org.apache.activemq.ActiveMQConnectionFactory" > ??
- ???????? < property ? name = "brokerURL" ? value = "tcp://localhost:61616" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?--> ??
- ???? < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" > ??
- ???????? <!--?目標ConnectionFactory對應真實的可以產生JMS?Connection的ConnectionFactory?--> ??
- ???????? < property ? name = "targetConnectionFactory" ? ref = "targetConnectionFactory" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--這個是隊列目的地--> ??
- ???? < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???????? < constructor-arg > ??
- ???????????? < value > queue </ value > ??
- ???????? </ constructor-arg > ??
- ???? </ bean > ??
- ???? <!--?消息監聽器?--> ??
- ???? < bean ? id = "consumerMessageListener" ? class = "com.tiantian.springintejms.listener.ConsumerMessageListener" /> ??
- ???? <!--?消息監聽容器?--> ??
- ???? < bean ? id = "jmsContainer" ??
- ???????? class = "org.springframework.jms.listener.DefaultMessageListenerContainer" > ??
- ???????? < property ? name = "connectionFactory" ? ref = "connectionFactory" ? /> ??
- ???????? < property ? name = "destination" ? ref = "queueDestination" ? /> ??
- ???????? < property ? name = "messageListener" ? ref = "consumerMessageListener" ? /> ??
- ???? </ bean > ??
- </ beans > ??
?
?
??????? 接著我們來測試一下,看看我們的整合是否真的成功了,測試代碼如下:
?
?
?
?
- package ?com.tiantian.springintejms.test;??
- ???
- import ?javax.jms.Destination;??
- ???
- import ?org.junit.Test;??
- import ?org.junit.runner.RunWith;??
- import ?org.springframework.beans.factory.annotation.Autowired;??
- import ?org.springframework.beans.factory.annotation.Qualifier;??
- import ?org.springframework.test.context.ContextConfiguration;??
- import ?org.springframework.test.context.junit4.SpringJUnit4ClassRunner;??
- import ?com.tiantian.springintejms.service.ProducerService;??
- ???
- @RunWith (SpringJUnit4ClassRunner. class )??
- @ContextConfiguration ( "/applicationContext.xml" )??
- public ? class ?ProducerConsumerTest?{??
- ???
- ???? @Autowired ??
- ???? private ?ProducerService?producerService;??
- ???? @Autowired ??
- ???? @Qualifier ( "queueDestination" )??
- ???? private ?Destination?destination;??
- ??????
- ???? @Test ??
- ???? public ? void ?testSend()?{??
- ???????? for ?( int ?i= 0 ;?i< 2 ;?i++)?{??
- ????????????producerService.sendMessage(destination,? "你好,生產者!這是消息:" ?+?(i+ 1 ));??
- ????????}??
- ????}??
- ??????
- }??
?
?
?
??????? 在上面的測試代碼中我們利用生產者發送了兩個消息,正常來說,消費者應該可以接收到這兩個消息。運行測試代碼后控制臺輸出如下:
??????? 看,控制臺已經進行了正確的輸出,這說明我們的整合確實是已經成功了。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
