欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

java(多線程)實現高性能數據同步

系統 2101 0

?需要將生產環境上Infoxmix里的數據原封不動的Copy到另一臺 Oracle數據庫服務器上,然后對Copy后的數據作些漂白處理。為了將人為干預的因素降到最低,在系統設計時采用Java代碼對數據作Copy,思路 如圖:?



??? 首 先在代碼與生產庫間建立一個Connection,將讀取到的數據放在ResultSet對象,然后再與開發庫建立一個Connection。從 ResultSet取出數據后通過TestConnection插入到開發庫,以此來實現Copy。代碼寫完后運行程序,速度太慢了,一秒鐘只能Copy 一千條數據,生產庫上有上億條數據,按照這個速度同步完要到猴年馬月呀,用PreparedStatement批處理速度也沒有提交多少。我想能不能用多 線程處理,多個人干活總比一個人干活速度要快。
??? 假設生產庫有1萬條數據,我開5個線程,每個線程分2000條數據,同時向開發庫里插數據,Oracle支持高并發這樣的話速度至少會提高好多倍,按照這 個思路重新進行了編碼,批處理設置為1萬條一提交,統計插入數量的變量使用 java.util.concurrent.atomic.AtomicLong,程序一運行,傳輸速度飛快CPU利用率在70%~90%,現在一秒鐘可 以拷貝50萬條記錄,沒過幾分鐘上億條數據一條不落地全部Copy到目標庫。

在查詢的時候我用了如下語句

?

    String queryStr = "SELECT * FROM xx";
ResultSet coreRs = PreparedStatement.executeQuery(queryStr);

  

?

?

實習生問如果xx表里有上千萬條記錄,你全部查詢出來放到ResultSet, 那內存不溢出了么?Java在設計的時候已經考慮到這個問題了,并沒有查詢出所有的數據,而是只查詢了一部分數據放到ResultSet,數據“用完”它 會自動查詢下一批數據,你可以用setFetchSize(int rows)方法設置一個建議值給ResultSet,告訴它每次從數據庫Fetch多少條數據。但我不贊成,因為JDBC驅動會根據實際情況自動調整 Fetch的數量。另外性能也與網線的帶寬有直接的關系。
相關代碼

?

?

    package com.dlbank.domain;  
  
import java.sql.Connection;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.Statement;  
import java.util.List;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.apache.log4j.Logger;  
  
/** 
 *<p>title: 數據同步類 </p>   
 *<p>Description: 該類用于將生產核心庫數據同步到開發庫</p>   
 *@author Tank Zhang  
 */  
public class CoreDataSyncImpl implements CoreDataSync {  
      
    private List<String> coreTBNames; //要同步的核心庫表名  
    private ConnectionFactory connectionFactory;  
    private Logger log = Logger.getLogger(getClass());  
      
    private AtomicLong currentSynCount = new AtomicLong(0L); //當前已同步的條數  
      
    private int syncThreadNum;  //同步的線程數  
  
    @Override  
    public void syncData(int businessType) throws Exception {  
          
        for (String tmpTBName : coreTBNames) {  
            log.info("開始同步核心庫" + tmpTBName + "表數據");  
            // 獲得核心庫連接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            //為每個線程分配結果集  
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
            coreRs.next();  
            //總共處理的數量  
            long totalNum = coreRs.getLong(1);  
            //每個線程處理的數量  
            long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
            log.info("共需要同步的數據量:"+totalNum);  
            log.info("同步線程數量:"+syncThreadNum);  
            log.info("每個線程可處理的數量:"+ownerRecordNum);  
            // 開啟五個線程向目標庫同步數據  
            for(int i=0; i < syncThreadNum; i ++){  
                StringBuilder sqlBuilder = new StringBuilder();  
                //拼裝后SQL示例  
                //Select * From dms_core_ds Where id between 1 And 657398  
                //Select * From dms_core_ds Where id between 657399 And 1314796  
                //Select * From dms_core_ds Where id between 1314797 And 1972194  
                //Select * From dms_core_ds Where id between 1972195 And 2629592  
                //Select * From dms_core_ds Where id between 2629593 And 3286990  
                //..  
                sqlBuilder.append("Select * From ").append(tmpTBName)  
                        .append(" Where id between " ).append(i * ownerRecordNum +1)  
                        .append( " And ")  
                        .append((i * ownerRecordNum + ownerRecordNum));  
                Thread workThread = new Thread(  
                        new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                workThread.setName("SyncThread-"+i);  
                workThread.start();  
            }  
            while (currentSynCount.get() < totalNum);  
            //休眠一會兒讓數據庫有機會commit剩余的批處理(只針對JUnit單元測試,
			//因為單元測試完成后會關閉虛擬器,使線程里的代碼沒有機會作提交操作);  
            //Thread.sleep(1000 * 3);  
            log.info( "核心庫"+tmpTBName+"表數據同步完成,共同步了" + currentSynCount.get() + "條數據");  
        }  
    }// end for loop  
      
    public void setCoreTBNames(List<String> coreTBNames) {  
        this.coreTBNames = coreTBNames;  
    }  
  
    public void setConnectionFactory(ConnectionFactory connectionFactory) {  
        this.connectionFactory = connectionFactory;  
    }  
      
    public void setSyncThreadNum(int syncThreadNum) {  
        this.syncThreadNum = syncThreadNum;  
    }  
      
    //數據同步線程  
    final class WorkerHandler implements Runnable {  
        ResultSet coreRs;  
        String queryStr;  
        int businessType;  
        String targetTBName;  
        public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
            this.queryStr = queryStr;  
            this.businessType = businessType;  
            this.targetTBName = targetTBName;  
        }  
        @Override  
        public void run() {  
            try {  
                //開始同步  
                launchSyncData();  
            } catch(Exception e){  
                log.error(e);  
                e.printStackTrace();  
            }  
        }  
        //同步數據方法  
        void launchSyncData() throws Exception{  
            // 獲得核心庫連接  
            Connection coreConnection = connectionFactory.getDMSConnection(4);  
            Statement coreStmt = coreConnection.createStatement();  
            // 獲得目標庫連接  
            Connection targetConn = connectionFactory.getDMSConnection(businessType);  
            targetConn.setAutoCommit(false);// 設置手動提交  
            PreparedStatement targetPstmt =
			 targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
            ResultSet coreRs = coreStmt.executeQuery(queryStr);  
            log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
            int batchCounter = 0; //累加的批處理數量  
            while (coreRs.next()) {  
                targetPstmt.setString(1, coreRs.getString(2));  
                targetPstmt.setString(2, coreRs.getString(3));  
                targetPstmt.setString(3, coreRs.getString(4));  
                targetPstmt.setString(4, coreRs.getString(5));  
                targetPstmt.setString(5, coreRs.getString(6));  
                targetPstmt.addBatch();  
                batchCounter++;  
                currentSynCount.incrementAndGet();//遞增  
                if (batchCounter % 10000 == 0) { //1萬條數據一提交  
                    targetPstmt.executeBatch();  
                    targetPstmt.clearBatch();  
                    targetConn.commit();  
                }  
            }  
            //提交剩余的批處理  
            targetPstmt.executeBatch();  
            targetPstmt.clearBatch();  
            targetConn.commit();  
            //釋放連接   
            connectionFactory.release(targetConn, targetPstmt,coreRs);  
        }  
    }  
}  
  

?

java(多線程)實現高性能數據同步


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 免费网站看v片在线a | 成人av网站免费观看 | 欧美精品一区二区三区在线播放 | 性夜影院爽黄a爽免费看网站 | 久久久国产这里有的是精品 | sm高h视频 | 久久精品国产精品亚洲综合 | 两性免费视频 | 国产精品久久久久久亚洲色 | 凛子小姐想试试 | a三级毛片 | 精品久久久久久久人人人人传媒 | 亚瑟天堂久久一区二区影院 | 成年网站视频在线观看 | 欧美一级二级在线观看 | 免费成人在线网站 | 国产第一页浮力 | 亚洲综合久久久久久888 | 四虎综合 | 欧美激情视频网站 | 欧美精品一区二区免费 | 欧美日韩一区二区三 | 狠狠激情| 91视频会员| 日本午夜免费无码片三汲大片 | 男进女内免费视频无遮挡 | 在线天堂中文在线资源网 | 亚洲精品乱码久久久久久花季 | 亚洲图片欧洲电影 | 91免费公开视频 | 欧美18一12sex性处hd | 中文字幕 国产精品 | 欧美极品在线 | 91茄子国产线观看免费 | 欧美午夜一艳片欧美精品 | 無码一区中文字幕少妇熟女H | 91精品视频在线播放 | 亚洲国产精品一区二区久久 | 青娱乐在线免费观看视频 | 亚洲免费网 | 色诱成人免费观看视频 |