欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

ActiveMQ消息持久化到數(shù)據(jù)庫(kù)

系統(tǒng) 2026 0
        本人實(shí)現(xiàn)的功能為activemq將消息持久化到數(shù)據(jù)庫(kù)的方法:
    

1:前言

???? 這一段給公司開發(fā)消息總線有機(jī)會(huì)研究ActiveMQ,今天撰文給大家介紹一下他的持久化消息。本文只介紹三種方式,分別是持久化為文件,MYSql,Oracle。下面逐一介紹。

A:持久化為文件

???? 這個(gè)你裝ActiveMQ時(shí)默認(rèn)就是這種,只要你設(shè)置消息為持久化就可以了。涉及到的配置和代碼有

< persistenceAdapter >
< kahaDB directory = " ${activemq.base}/data/kahadb " />
</ persistenceAdapter >

producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

B:持久化為MySql

???? 你首先需要把MySql的驅(qū)動(dòng)放到ActiveMQ的Lib目錄下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

???? 接下來(lái)你修改配置文件

< persistenceAdapter >
< jdbcPersistenceAdapter dataDirectory = " ${activemq.base}/data " dataSource = " #derby-ds " />
</ persistenceAdapter >

在配置文件中的broker節(jié)點(diǎn)外增加

< bean id = " derby-ds " class = " org.apache.commons.dbcp.BasicDataSource " destroy - method = " close " >
< property name = " driverClassName " value = " com.mysql.jdbc.Driver " />
< property name = " url " value = " jdbc:mysql://localhost/activemq?relaxAutoCommit=true " />
< property name = " username " value = " activemq " />
< property name = " password " value = " activemq " />
< property name = " maxActive " value = " 200 " />
< property name = " poolPreparedStatements " value = " true " />
</ bean >

從配置中可以看出數(shù)據(jù)庫(kù)的名稱是activemq,你需要手動(dòng)在MySql中增加這個(gè)庫(kù)。

然后重新啟動(dòng)消息隊(duì)列,你會(huì)發(fā)現(xiàn)多了3張表

1:activemq_acks

2:activemq_lock

3:activemq_msgs

C:持久化為Oracle

??? 和持久化為MySql一樣。這里我說(shuō)兩點(diǎn)

1;在ActiveMQ安裝文件夾里的Lib文件夾中增加Oracle的JDBC驅(qū)動(dòng)。驅(qū)動(dòng)文件位于Oracle客戶端安裝文件中的product\11.1.0\client_1\jdbc\lib文件夾下。

2:

< bean id = " derby-ds " class = " org.apache.commons.dbcp.BasicDataSource " destroy - method = " close " >
< property name = " driverClassName " value = " oracle.jdbc.driver.OracleDriver " />
< property name = " url " value = " jdbc:oracle:thin:@10.53.132.47:1521:test " />
< property name = " username " value = " qdcommu " />
< property name = " password " value = " qdcommu " />
< property name = " maxActive " value = " 200 " />
< property name = " poolPreparedStatements " value = " true " />
</ bean >

這里的jdbc:oracle:thin:@10.53.132.47:1521:test按照自己實(shí)際情況設(shè)置一下就可以了,特別注意的是test是SID即服務(wù)名稱而不是TNS中配置的節(jié)點(diǎn)名。各位同學(xué)只需要替換IP,端口和這個(gè)SID就可以了。

      
 
 
消息消費(fèi)者的事先代碼:
    
      package easyway.activemq.app;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/***
 * 消息持久化到數(shù)據(jù)庫(kù)
 *  @author longgangbai
 */
public class MessageCustomer {
	private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;
	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	  private static String QUEUENAME="ActiveMQ.QUEUE";
	  protected static final int messagesExpected = 10;
	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
			  url+"?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
	  
	  
	/***
	 * 創(chuàng)建Broker服務(wù)對(duì)象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
		BrokerService  broker=new BrokerService();
	    broker.addConnector(url);
		return broker;
	}

	/**
	 * 啟動(dòng)BrokerService進(jìn)程
	 * @throws Exception
	 */
	public void init() throws Exception{
		BrokerService brokerService=createBroker();
		brokerService.start();
	}
	/**
	 * 接收的信息
	 * @return
	 * @throws Exception
	 */
	public int receiveMessage() throws Exception{
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
		return receiveMessages(messagesExpected,session);
	}
	

	/**
	 * 接受信息的方法
	 * @param messagesExpected
	 * @param session
	 * @return
	 * @throws Exception
	 */
	protected int receiveMessages(int messagesExpected, Session session) throws Exception {
        int messagesReceived = 0;
        for (int i=0; i<messagesExpected; i++) {
            Destination destination = session.createQueue(QUEUENAME);
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = null;
            try {
            	logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);
                message = consumer.receive(2000);
                logger.info("Received : " + message);
                if (message != null) {
                    session.commit();
                    messagesReceived++;
                }
            } catch (Exception e) {
            	logger.debug("Caught exception " + e);
                session.rollback();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return messagesReceived;
    }


	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
}

    

?消息生產(chǎn)者的代碼:

      package easyway.activemq.app;

import java.io.File;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import easyway.activemq.app.utils.BrokenPersistenceAdapter;
/**
 * 消息持久化到數(shù)據(jù)庫(kù)
 * @author longgangbai
 *
 */
public class MessageProductor {
	  private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;
	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	  private static String queueName="ActiveMQ.QUEUE";
	  private BrokerService brokerService;
	  protected static final int messagesExpected = 10;
	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
	            "tcp://localhost:61617?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
	/***
	 * 創(chuàng)建Broker服務(wù)對(duì)象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
			BrokerService  broker=new BrokerService();
			BrokenPersistenceAdapter jdbc=createBrokenPersistenceAdapter();
			broker.setPersistenceAdapter(jdbc);
			jdbc.setDataDirectory(System.getProperty("user.dir")+File.separator+"data"+File.separator);
			jdbc.setAdapter(new MySqlJDBCAdapter());
			broker.setPersistent(true);
			broker.addConnector("tcp://localhost:61617");
			//broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
		return broker;
	}
	/**
	 * 創(chuàng)建Broken的持久化適配器
	 * @return
	 * @throws Exception
	 */
	public BrokenPersistenceAdapter createBrokenPersistenceAdapter() throws Exception{
		BrokenPersistenceAdapter jdbc=new BrokenPersistenceAdapter();
		DataSource datasource=createDataSource();
		jdbc.setDataSource(datasource);
		jdbc.setUseDatabaseLock(false);
		//jdbc.deleteAllMessages();
		return jdbc;
	}
	/**
	 * 創(chuàng)建數(shù)據(jù)源
	 * @return
	 * @throws Exception
	 */
	public DataSource createDataSource() throws Exception{
		Properties props=new Properties();
		props.put("driverClassName", "com.mysql.jdbc.Driver");
		props.put("url", "jdbc:mysql://localhost:3306/activemq");
		props.put("username", "root");
		props.put("password", "root");
		DataSource datasource=BasicDataSourceFactory.createDataSource(props);
		return datasource;
	}
	/**
	 * 啟動(dòng)BrokerService進(jìn)程
	 * @throws Exception
	 */
	public void init() throws Exception{
		createBrokerService();
		brokerService.start();
	}
	public BrokerService createBrokerService() throws Exception{
		if(brokerService==null){
			brokerService=createBroker();
		}
		return brokerService;
	}
	
	public void sendMessage() throws JMSException{
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    Destination destination = session.createQueue(queueName);        
	    MessageProducer producer = session.createProducer(destination);
	    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		for(int i=0;i<messagesExpected;i++){
			 logger.debug("Sending message " + (i+1) + " of " + messagesExpected);
	         producer.send(session.createTextMessage("test message " + (i+1)));
		}
		connection.close();
	}
	
	

	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
}

    

?

持久化適配器類

      package easyway.activemq.app.utils;


import java.io.IOException;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 
 * @author longgangbai
 *
 */
public class BrokenPersistenceAdapter extends JDBCPersistenceAdapter {

    private final Logger LOG = LoggerFactory.getLogger(BrokenPersistenceAdapter.class);

    private boolean shouldBreak = false;

    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        if ( shouldBreak ) {
            LOG.warn("Throwing exception on purpose");
            throw new IOException("Breaking on purpose");
        }
        LOG.debug("in commitTransaction");
        super.commitTransaction(context);
    }

    public void setShouldBreak(boolean shouldBreak) {
        this.shouldBreak = shouldBreak;
    }
}

    

?

測(cè)測(cè)試代碼如下:

?

?

      package easyway.activemq.app.test;

import easyway.activemq.app.MessageProductor;

public class MessageProductorTest {
	
	public static void main(String[] args) throws Exception {
		MessageProductor  productor =new MessageProductor();
		productor.init();
		productor.sendMessage();
		//productor.createBrokerService().stop();
	}

}

    

?

      package easyway.activemq.app.test;

import easyway.activemq.app.MessageCustomer;

public class MessageCustomerTest {
  public static void main(String[] args) throws Exception {
	  MessageCustomer  customer=new MessageCustomer();
	  //customer.init();  //當(dāng)兩臺(tái)機(jī)器在不同的服務(wù)器上啟動(dòng)客戶端的broker進(jìn)程
	  customer.receiveMessage();
	  
}
}

    

?

?

備注:運(yùn)行過(guò)程為:首先執(zhí)行MessageProductorTest,MessageCustomerTest。

??????? mysql數(shù)據(jù)庫(kù)activemq必須存在。關(guān)于消息持久化的表結(jié)構(gòu)如下:

? ActiveMQ消息持久化到數(shù)據(jù)庫(kù)

?

ActiveMQ消息持久化到數(shù)據(jù)庫(kù)

ActiveMQ消息持久化到數(shù)據(jù)庫(kù)


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦?。?!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 男女爽爽无遮挡午夜动态图 | 涩涩天堂| 亚洲午夜电影 | 亚洲激情视频在线观看 | 91tv最新永久在线地址 | 91看片儿| 欧美日韩中出 | 另类视频色综合 | 视频一区中文字幕 | 日本精品区 | 精品国产第一国产综合精品 | 欧美精品网站 | 欧美激情刺激爽免费视频观看 | 波多野结衣精品一区二区三区 | 84pao视频强力打造免费视频 | 国产精品久久久久久久免费 | 三级视频网站 | 亚洲日韩欧美一区二区在线 | 欧美一级α片毛片免费观看 | 天堂va在线高清一区 | 国产成人在线观看免费网站 | 亚洲成片在线观看12345ba | 亚洲伊人成色综合网 | 亚洲乱码在线卡一卡二卡新区 | 日本www.在线中文字幕 | 三级网址在线播放 | 亚州a| 91免费版在线观看 | 久久com| 啪啪免费网站 | 成人永久福利在线观看不卡 | 免费的黄色一级片 | 日韩毛片大全免费高清 | 国产精品吹潮在线观看中文 | 91精品观看91久久久久久 | 欧美日韩国产一区二区三区 | 91视频免费观看高清观看完整 | 九九色综合 | 欧美乱大交xxxx | 奇米影视在线 | 精品精品 |