?
Spring 提供了一個 JmsTransactionManager 用于對 JMS ConnectionFactory 做事務管理。這將允許 JMS 應用利用 Spring 的事務管理特性。 JmsTransactionManager 在執行本地資源事務管理時將從指定的 ConnectionFactory 綁定一個 ConnectionFactory/Session 這樣的配對到線程中。 JmsTemplate 會自動檢測這樣的事務資源,并對它們進行相應操作。
在 Java EE 環境中, ConnectionFactory 會池化 Connection 和 Session ,這樣這些資源將會在整個事務中被有效地重復利用。在一個獨立的環境中,使用 Spring 的 SingleConnectionFactory 時所有的事務將公用一個 Connection ,但是每個事務將保留自己獨立的 Session 。
JmsTemplate 可以利用 JtaTransactionManager 和能夠進行分布式的 JMS ConnectionFactory 處理分布式事務。
?????? 在 Spring 整合 JMS 的應用中,如果我們要進行本地的事務管理的話非常簡單,只需要在定義對應的消息監聽容器時指定其 sessionTransacted 屬性為 true ,如:
?
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="sessionTransacted" value="true"/> </bean>
?
?????? 該屬性值默認為 false ,這樣 JMS 在進行消息監聽的時候就會進行事務控制,當在接收消息時監聽器執行失敗時 JMS 就會對接收到的消息進行回滾,對于 SessionAwareMessageListener 在接收到消息后發送一個返回消息時也處于同一事務下,但是對于其他操作如數據庫訪問等將不屬于該事務控制。
這里我們可以來做一個這樣的測試:我們如上配置監聽在 queueDestination 的消息監聽容器的 sessionTransacted 屬性為 true ,然后把我們前面提到的消息監聽器 ConsumerMessageListener 改成這樣:
?
public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一個純文本消息。"); try { System.out.println("消息內容是:" + textMsg.getText()); if (1 == 1) { throw new RuntimeException("Error"); } } catch (JMSException e) { e.printStackTrace(); } } }
????????
我們可以看到在上述代碼中我們的 ConsumerMessageListener 在進行消息接收的時候拋出了一個 RuntimeException ,根據我們上面說的,因為我們已經在對應的監聽容器上定義了其 sessionTransacted 屬性為 true ,所以當這里拋出異常的時候 JMS 將對接收到的消息進行回滾,即下次進行消息接收的時候該消息仍然能夠被接收到。為了驗證這一點,我們先執行一遍測試代碼,往 queueDestination 發送一個文本消息,這個時候 ConsumerMessageListener 在進行接收的時候將會拋出一個 RuntimeException ,已經接收到的純文本消息將進行回滾;接著我們去掉上面代碼中拋出異常的語句,即 ConsumerMessageListener 能夠正常的進行消息接收,這個時候我們再運行一次測試代碼,往 ConsumerMessageListener 監聽的 queueDestination 發送一條消息。如果之前在接手時拋出了異常的那條消息已經回滾了的話,那么這個時候將能夠接收到兩條消息,控制臺將輸出接收到的兩條消息的內容。具體結果有興趣的朋友可以自己驗證一下。
?????? 如果想接收消息和數據庫訪問處于同一事務中,那么我們就可以配置一個外部的事務管理同時配置一個支持外部事務管理的消息監聽容器(如 DefaultMessageListenerContainer )。要配置這樣一個參與分布式事務管理的消息監聽容器,我們可以配置一個 JtaTransactionManager ,當然底層的 JMS ConnectionFactory 需要能夠支持分布式事務管理,并正確地注冊我們的 JtaTransactionManager 。這樣消息監聽器進行消息接收和對應的數據庫訪問就會處于同一數據庫控制下,當消息接收失敗或數據庫訪問失敗都會進行事務回滾操作。
?
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="transactionManager" ref="jtaTransactionManager"/> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
????????
當給消息監聽容器指定了 transactionManager 時,消息監聽容器將忽略 sessionTransacted 的值。 ?
?????? 關于使用 JtaTransactionManager 來管理上述分布式事務,我們這里也可以來做一個試驗。
?????? 首先:往 Spring 配置文件 applicationContext.xml 中添加如下配置:
???
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="dataSource"/> </bean> <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
??
?????? 我們可以看到,在這里我們引入了一個 jndi 數據源,定義了一個 JtaTransactionManager ,定義了 Spring 基于注解的聲明式事務管理,定義了一個 Spring 提供的進行 Jdbc 操作的工具類 jdbcTemplate 。
?
?????? 接下來把我們的 ConsumerMessageListener 改為如下形式:
public class ConsumerMessageListener implements MessageListener { @Autowired private TestDao testDao; private int count = 0; public void onMessage(Message message) { //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println(new Date().toLocaleString() + "接收到一個純文本消息。"); try { String text = textMsg.getText(); System.out.println("消息內容是:" + text); System.out.println("當前count的值是:" + count); testDao.insert(text + count); if (count == 0) { count ++; throw new RuntimeException("Error! 出錯啦!"); } } catch (JMSException e) { e.printStackTrace(); } } }
?
?????? 我們可以看到,在 ConsumerMessageListener 中我們定義了一個實例變量 count ,其初始值為 0 ;在 onMessage 里面,我們可以看到我們把接收到的消息內容作為參數調用了 testDao 的 insert 方法;當 count 值為 0 ,也就是進行第一次消息接收的時候會將 count 的值加 1 ,同時拋出一個運行時異常。那么我們這里要測試的就是進行第一次接收的時候 testDao 已經把相關內容插入數據庫了,接著在 onMessage 里面拋出了一個異常同時 count 加 1 ,我們預期的結果應該是此時數據庫進行回滾,同時 JMS 也回滾,這樣 JMS 將繼續嘗試接收該消息,此時同樣會調用 testDao 的 insert 方法將內容插入數據庫,再接著 count 已經不為 0 了,所以此時將不再拋出異常, JMS 成功進行消息的接收, testDao 也成功的將消息內容插入到了數據庫。要證明這個預期我們除了看數據庫中插入的數據外,還可以看控制臺的輸出,正常情況控制臺將輸出兩次消息接收的內容,且第一次時 count 為 0 ,第二次 count 為 1 。
?????? TestDao 是一個接口,其 TestDaoImpl 對 insert 的方法實現如下: ?
?
@Transactional(readOnly=false) public void insert(final String name) { jdbcTemplate.update("insert into test(name) values(?)", name); }
???????
這里我們使用支持 JtaTransactionManager 的 Weblogic 來進行測試,因為是 Web 容器,所以我們這里定義了一個 Controller 來進行消息的發送,具體代碼如下:
?
@Controller @RequestMapping("test") public class TestController { @Autowired @Qualifier("queueDestination") private Destination destination; @Autowired private ProducerService producerService; @RequestMapping("first") public String first() { producerService.sendMessage(destination, "你好,現在是:" + new Date().toLocaleString()); return "/test/first"; } }
??????
接下來就是啟用 Weblogic 服務器,進入其控制臺,定義一個名叫“ jdbc/mysql ”的 JNDI 數據源,然后把該項目部署到 Weblogic 服務器上并進行啟動。接下來我們就可以訪問 /test/first.do 訪問到上述 first 方法。之后控制臺會輸出如下信息:
???????
我們可以看到當
count
為
0
時接收了一次,并隨后拋出了異常,之后
count
為
1
又接收了一次,這說明在
count
為
0
時拋出異常后我們的
JMS
進行回滾了,那么我們的數據庫是否有進行回滾呢?接著我們來看數據庫中的內容:
???????
我們可以看到數據庫表中只有一條記錄,而且最后一位表示
count
的值的為
1
,這說明在
JMS
進行消息接收拋出異常時我們的數據庫也回滾了。關于使用
JtaTransactionManager
進行分布式事務管理的問題就說到這里了,有興趣的朋友可以自己試驗一下。
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
