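I needed to copy the data in the production Informix database, unchanged, to another Oracle database server, and then apply some masking ("bleaching") to the copied data. To keep manual intervention to a minimum, the system was designed to do the copy in Java code. The approach is shown in the figure: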
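First, the code opens a Connection to the production database and holds the rows it reads in a ResultSet; then it opens a second Connection to the development database. Each row taken from the ResultSet is inserted into the development database through TestConnection, and that is how the copy works. When I ran the finished program it was far too slow: only about 1,000 rows per second. Production holds more than a hundred million rows, and at that rate 100 million rows alone would take about 100,000 seconds, well over a day. Batching with PreparedStatement did not improve the speed much either. So I wondered whether multithreading would help: several people working together are faster than one.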
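Suppose the production table has 10,000 rows and I start 5 threads, each taking 2,000 rows and inserting them into the development database at the same time. Since Oracle handles highly concurrent writes well, this should be many times faster. I rewrote the code along these lines, committing every 10,000 rows per batch and counting inserted rows with a java.util.concurrent.atomic.AtomicLong. Once the program was running, the transfer was very fast, with CPU utilization at 70% to 90%: it now copies 500,000 rows per second, and within a few minutes more than a hundred million rows had been copied to the target database without losing a single one.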
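The divide-and-conquer idea above can be sketched without a database. This is a minimal sketch, assuming the 10,000-row / 5-thread numbers from the text; `copySlice` is a hypothetical stand-in for reading one slice from production and batch-inserting it into the development database. Instead of polling the AtomicLong in a loop, it waits on each worker's Future, so the main thread knows when every slice is done and sees any worker's exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelCopySketch {
    // Shared, thread-safe count of rows handled by all workers
    static final AtomicLong copied = new AtomicLong();

    // Hypothetical stand-in for copying rows [from, to] (inclusive); returns the rows handled
    static long copySlice(long from, long to) {
        long n = 0;
        for (long id = from; id <= to; id++) {
            n++; // a real worker would set parameters and addBatch() here
        }
        copied.addAndGet(n); // publish the slice's count once it is done
        return n;
    }

    public static long run(long totalRows, int threads) throws Exception {
        long perThread = (totalRows + threads - 1) / threads; // round up so no row is dropped
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                long from = i * perThread + 1;
                long to = Math.min(from + perThread - 1, totalRows);
                results.add(pool.submit(() -> copySlice(from, to)));
            }
            long sum = 0;
            for (Future<Long> f : results) {
                sum += f.get(); // blocks until that worker finishes; rethrows its failure
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long total = run(10_000, 5); // 5 slices of 2,000 rows each
        System.out.println(total + " rows, counter = " + copied.get());
    }
}
```

Running `main` prints `10000 rows, counter = 10000`. A fixed pool also caps the number of simultaneous database connections at the thread count.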
For the query I used the following statement:
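String queryStr = "SELECT * FROM xx";
// connection is the Connection to the production database.
// executeQuery(String) must be called on a plain Statement, not a PreparedStatement.
Statement stmt = connection.createStatement();
ResultSet coreRs = stmt.executeQuery(queryStr);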
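An intern asked: if table xx has tens of millions of rows and you load them all into a ResultSet, won't you run out of memory? JDBC was designed with this in mind. The query does not pull in every row at once: the ResultSet holds only part of the data, and when those rows are "used up" it automatically fetches the next batch from the database. You can pass a hint to the ResultSet with setFetchSize(int rows), telling it how many rows to fetch from the database at a time. I don't recommend it, though, because the JDBC driver adjusts the fetch size according to actual conditions (how rows are buffered, and the default fetch size, vary from driver to driver). Performance also depends directly on network bandwidth.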
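The fetch-in-batches behavior described above can be illustrated with a small simulation. This is not how any particular JDBC driver is implemented; `PagedCursor` and its `fetchPage` callback are hypothetical names. The cursor keeps at most `fetchSize` rows in memory and asks the "server" for the next page only when the current one is used up, which is the behavior `setFetchSize(int rows)` tunes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

public class PagedCursor<T> implements Iterator<T> {
    private final BiFunction<Integer, Integer, List<T>> fetchPage; // (offset, limit) -> rows
    private final int fetchSize;
    private List<T> buffer = new ArrayList<>();
    private int pos = 0;         // position inside the current buffer
    private int offset = 0;      // rows fetched from the "server" so far
    private boolean exhausted = false;
    private int roundTrips = 0;  // number of fetches, for illustration

    public PagedCursor(BiFunction<Integer, Integer, List<T>> fetchPage, int fetchSize) {
        this.fetchPage = fetchPage;
        this.fetchSize = fetchSize;
    }

    @Override
    public boolean hasNext() {
        if (pos < buffer.size()) return true;
        if (exhausted) return false;
        buffer = fetchPage.apply(offset, fetchSize); // fetch the next page only when needed
        roundTrips++;
        offset += buffer.size();
        pos = 0;
        if (buffer.size() < fetchSize) exhausted = true; // a short page means no more rows
        return !buffer.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return buffer.get(pos++);
    }

    public int roundTrips() { return roundTrips; }

    public static void main(String[] args) {
        // A fake 25-row "table"; the server hands out at most `limit` rows per call
        List<Integer> table = new ArrayList<>();
        for (int i = 1; i <= 25; i++) table.add(i);
        PagedCursor<Integer> rs = new PagedCursor<>(
                (off, lim) -> new ArrayList<>(table.subList(off, Math.min(off + lim, table.size()))),
                10);
        int count = 0;
        while (rs.hasNext()) { rs.next(); count++; }
        System.out.println(count + " rows in " + rs.roundTrips() + " fetches");
    }
}
```

With a fetch size of 10, `main` prints `25 rows in 3 fetches`: memory holds at most 10 rows at a time, at the cost of one round trip per page.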
The relevant code:
package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;

/**
 * <p>Title: data synchronization class</p>
 * <p>Description: copies data from the production core database to the development database</p>
 * @author Tank Zhang
 */
public class CoreDataSyncImpl implements CoreDataSync {
    private static final int BATCH_SIZE = 10000;  // commit every 10,000 rows
    private List<String> coreTBNames;             // core-database tables to copy
    private ConnectionFactory connectionFactory;
    private Logger log = Logger.getLogger(getClass());
    private AtomicLong currentSynCount = new AtomicLong(0L); // rows committed so far for the current table
    private int syncThreadNum;                    // number of copy threads

    @Override
    public void syncData(int businessType) throws Exception {
        if (syncThreadNum < 1) {
            throw new IllegalStateException("syncThreadNum must be at least 1");
        }
        for (String tmpTBName : coreTBNames) {
            log.info("Start copying core table " + tmpTBName);
            currentSynCount.set(0L); // the counter is per table, so reset it
            long totalNum;
            long minId;
            long maxId;
            // Connection to the core (production) database; 4 is the author's id for it
            try (Connection coreConnection = connectionFactory.getDMSConnection(4);
                 Statement coreStmt = coreConnection.createStatement();
                 ResultSet coreRs = coreStmt.executeQuery(
                         "SELECT COUNT(*), MIN(id), MAX(id) FROM " + tmpTBName)) {
                coreRs.next();
                totalNum = coreRs.getLong(1); // total rows to copy
                minId = coreRs.getLong(2);
                maxId = coreRs.getLong(3);
            }
            if (totalNum == 0) {
                log.info("Core table " + tmpTBName + " is empty, skipped");
                continue;
            }
            // Ids per thread, rounded up with integer arithmetic. Splitting MIN(id)..MAX(id)
            // instead of 1..COUNT(*) keeps every row inside some range even if ids have gaps.
            long idSpan = maxId - minId + 1;
            long ownerRecordNum = (idSpan + syncThreadNum - 1) / syncThreadNum;
            log.info("Rows to copy: " + totalNum);
            log.info("Copy threads: " + syncThreadNum);
            log.info("Ids per thread: " + ownerRecordNum);
            // Start syncThreadNum threads that write to the target database
            Thread[] workers = new Thread[syncThreadNum];
            for (int i = 0; i < syncThreadNum; i++) {
                long fromId = minId + i * ownerRecordNum;
                long toId = Math.min(fromId + ownerRecordNum - 1, maxId);
                // Sample generated SQL:
                // Select * From dms_core_ds Where id between 1 And 657398
                // Select * From dms_core_ds Where id between 657399 And 1314796
                // Select * From dms_core_ds Where id between 1314797 And 1972194
                // Select * From dms_core_ds Where id between 1972195 And 2629592
                // Select * From dms_core_ds Where id between 2629593 And 3286990
                // ...
                String sql = "Select * From " + tmpTBName
                        + " Where id between " + fromId + " And " + toId;
                workers[i] = new Thread(new WorkerHandler(sql, businessType, tmpTBName));
                workers[i].setName("SyncThread-" + i);
                workers[i].start();
            }
            // Waiting with join() instead of spinning on the counter avoids burning a CPU
            // core, cannot hang if a worker dies, and guarantees each worker's last batch
            // is committed before the log below (so no Thread.sleep is needed under JUnit).
            for (Thread worker : workers) {
                worker.join();
            }
            long copied = currentSynCount.get();
            if (copied != totalNum) {
                log.warn("Core table " + tmpTBName + ": copied " + copied + " of " + totalNum
                        + " rows; see the worker errors above");
            }
            log.info("Finished copying core table " + tmpTBName + ", " + copied + " rows copied");
        } // end for loop over tables
    }

    public void setCoreTBNames(List<String> coreTBNames) {
        this.coreTBNames = coreTBNames;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setSyncThreadNum(int syncThreadNum) {
        this.syncThreadNum = syncThreadNum;
    }

    // Copy thread: copies one id range of a table
    final class WorkerHandler implements Runnable {
        final String queryStr;
        final int businessType;
        final String targetTBName;

        public WorkerHandler(String queryStr, int businessType, String targetTBName) {
            this.queryStr = queryStr;
            this.businessType = businessType;
            this.targetTBName = targetTBName;
        }

        @Override
        public void run() {
            try {
                launchSyncData(); // start copying
            } catch (Exception e) {
                log.error(Thread.currentThread().getName() + " failed on: " + queryStr, e);
            }
        }

        // Copies the rows selected by queryStr. try-with-resources closes every connection,
        // statement and result set, including the core-database ones (closing a pooled
        // connection normally returns it to the pool).
        void launchSyncData() throws Exception {
            try (Connection coreConnection = connectionFactory.getDMSConnection(4);
                 Statement coreStmt = coreConnection.createStatement();
                 Connection targetConn = connectionFactory.getDMSConnection(businessType);
                 PreparedStatement targetPstmt = targetConn.prepareStatement(
                         "INSERT INTO " + targetTBName + " VALUES (?,?,?,?,?)")) {
                targetConn.setAutoCommit(false); // commit manually, once per batch
                log.info(Thread.currentThread().getName() + "'s Query SQL: " + queryStr);
                try (ResultSet coreRs = coreStmt.executeQuery(queryStr)) {
                    int batchCounter = 0; // rows in the current, uncommitted batch
                    while (coreRs.next()) {
                        // Source columns 2-6 go into the five target columns (column 1, id, is not copied)
                        for (int col = 1; col <= 5; col++) {
                            targetPstmt.setString(col, coreRs.getString(col + 1));
                        }
                        targetPstmt.addBatch();
                        if (++batchCounter == BATCH_SIZE) {
                            targetPstmt.executeBatch();
                            targetPstmt.clearBatch();
                            targetConn.commit();
                            currentSynCount.addAndGet(batchCounter); // count only committed rows
                            batchCounter = 0;
                        }
                    }
                    if (batchCounter > 0) { // commit the remaining rows
                        targetPstmt.executeBatch();
                        targetPstmt.clearBatch();
                        targetConn.commit();
                        currentSynCount.addAndGet(batchCounter);
                    }
                } catch (Exception e) {
                    // Roll back the uncommitted batch; some drivers (Oracle's, for one)
                    // commit pending work when a connection is closed.
                    targetConn.rollback();
                    throw e;
                }
            }
        }
    }
}
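The range split behind the `Where id between ... And ...` queries comes down to a little arithmetic, sketched here in isolation. `splitRanges` is a hypothetical helper, not part of the code above. A common pitfall is computing the per-thread share as `(long) Math.ceil(total / threads)` with long operands: the division truncates before `Math.ceil` runs, so when the total is not a multiple of the thread count the last rows fall outside every range. The integer ceiling `(n + k - 1) / k` avoids that, and splitting over the actual `MIN(id)` to `MAX(id)` range keeps rows reachable when ids have gaps or do not start at 1.

```java
import java.util.ArrayList;
import java.util.List;

public class IdRangeSplit {
    // Splits the inclusive id range [minId, maxId] into at most `threads` contiguous slices.
    // Each slice is {from, to}, suitable for "WHERE id BETWEEN from AND to".
    public static List<long[]> splitRanges(long minId, long maxId, int threads) {
        if (threads < 1) throw new IllegalArgumentException("threads must be >= 1");
        List<long[]> ranges = new ArrayList<>();
        long span = maxId - minId + 1;
        long perThread = (span + threads - 1) / threads; // integer ceiling division
        for (long from = minId; from <= maxId; from += perThread) {
            ranges.add(new long[] {from, Math.min(from + perThread - 1, maxId)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 ids, 3 threads: truncating division would give 3 per thread and miss id 10
        for (long[] r : splitRanges(1, 10, 3)) {
            System.out.println("id between " + r[0] + " And " + r[1]);
        }
    }
}
```

`main` prints the ranges 1 to 4, 5 to 8 and 9 to 10. For ids 1 to 3,286,990 and five threads, the helper yields the same five ranges as the sample SQL in the code comments, from 1 to 657398 up to 2629593 to 3286990.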