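I have recently been testing HCatalog. HCatalog itself is just a standalone JAR. It can run a service, but that service is really the metastore Thrift server. When writing an HCatalog-based MapReduce job, it is enough to add the HCatalog JAR and the corresponding hive-site.xml file to libjars and HADOOP_CLASSPATH. I still ran into a problem during testing, though: after running for a while, the Hive metastore server throws the following error: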
2013-06-19 10:35:51,718 ERROR server.TThreadPoolServer (TThreadPoolServer.java:run(182)) - Error occurred during processing of message.
javax.jdo.JDOFatalUserException: Persistence Manager has been closed
    at org.datanucleus.jdo.JDOPersistenceManager.assertIsOpen(JDOPersistenceManager.java:2124)
    at org.datanucleus.jdo.JDOPersistenceManager.currentTransaction(JDOPersistenceManager.java:315)
    at org.apache.hadoop.hive.metastore.ObjectStore.openTransaction(ObjectStore.java:294)
    at org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:732)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:111)
    at com.sun.proxy.$Proxy5.getTable(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:982)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5017)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5005)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)
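PersistenceManager manages a set of persistent objects, including creating and querying them. It is an instance variable of ObjectStore, and each ObjectStore owns one pm. RawStore is the interface through which the metastore logic layer talks to the underlying physical metadata database (Derby, for example), and ObjectStore is the default implementation of RawStore. When the Hive Metastore Server starts, it sets up a TProcessor that wraps an HMSHandler. HMSHandler holds a ThreadLocal<RawStore> instance variable named threadLocalMS, so each thread maintains its own RawStore: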
private final ThreadLocal<RawStore> threadLocalMS =
    new ThreadLocal<RawStore>() {
      @Override
      protected synchronized RawStore initialValue() {
        return null;
      }
    };
Every request from a Hive metastore client is handed to a WorkerProcess from the thread pool, and every method in HMSHandler calls getMS() to obtain the RawStore instance that does the actual work:
public RawStore getMS() throws MetaException {
  RawStore ms = threadLocalMS.get();
  if (ms == null) {
    ms = newRawStore();
    threadLocalMS.set(ms);
    ms = threadLocalMS.get();
  }
  return ms;
}
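As you can see, the RawStore is created lazily. Once initialized, it is bound to the thread-local variable so that later calls on the same thread can reuse it: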
private RawStore newRawStore() throws MetaException {
  LOG.info(addPrefix("Opening raw store with implemenation class:"
      + rawStoreClassName));
  Configuration conf = getConf();
  return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
}
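The per-thread lazy initialization described above can be illustrated in isolation. This is only a minimal sketch: LazyStoreDemo, Store, getStore and getStoreOnNewThread are hypothetical names made up for the example and are not Hive classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyStoreDemo {
    // Counts how many Store objects have been created across all threads.
    static final AtomicInteger created = new AtomicInteger();

    // Hypothetical stand-in for RawStore.
    static class Store {
        final int id = created.incrementAndGet();
    }

    private static final ThreadLocal<Store> threadLocalStore = new ThreadLocal<>();

    // Same shape as getMS(): create on first use, then reuse on this thread.
    static Store getStore() {
        Store s = threadLocalStore.get();
        if (s == null) {
            s = new Store();
            threadLocalStore.set(s);
        }
        return s;
    }

    // Helper for the demo: fetch the store as seen from a fresh thread.
    static Store getStoreOnNewThread() {
        Store[] holder = new Store[1];
        Thread t = new Thread(() -> holder[0] = getStore());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder[0];
    }

    public static void main(String[] args) {
        Store first = getStore();
        Store second = getStore();
        System.out.println("same thread reuses store: " + (first == second));
        System.out.println("new thread gets its own store: " + (getStoreOnNewThread() != first));
    }
}
```

The point to notice is that the cached object lives as long as the thread does, which matters in a thread pool where one worker serves many clients.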
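The RawStore is accessed through a dynamic proxy: RetryingRawStore implements the InvocationHandler interface and provides an invoke method, which runs the real logic via method.invoke(). The benefit is that extra logic can be added around method.invoke(). RetryingRawStore uses this to catch exceptions thrown during the call and retry. Because reflection is involved, the exception arrives wrapped in an InvocationTargetException. In Hive 0.9, however, this exception is simply rethrown after being caught instead of being retried, which is clearly wrong. I changed it to unwrap the target exception, check whether it is an instance of JDOException, and handle it accordingly: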
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  Object ret = null;
  boolean gotNewConnectUrl = false;
  boolean reloadConf = HiveConf.getBoolVar(hiveConf,
      HiveConf.ConfVars.METASTOREFORCERELOADCONF);
  boolean reloadConfOnJdoException = false;
  if (reloadConf) {
    updateConnectionURL(getConf(), null);
  }
  int retryCount = 0;
  Exception caughtException = null;
  while (true) {
    try {
      if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) {
        initMS();
      }
      ret = method.invoke(base, args);
      break;
    } catch (javax.jdo.JDOException e) {
      caughtException = (javax.jdo.JDOException) e.getCause();
    } catch (UndeclaredThrowableException e) {
      throw e.getCause();
    } catch (InvocationTargetException e) {
      Throwable t = e.getTargetException();
      if (t instanceof JDOException) {
        caughtException = (JDOException) e.getTargetException();
        reloadConfOnJdoException = true;
        LOG.error("rawstore jdoexception:" + caughtException.toString());
      } else {
        throw e.getCause();
      }
    }
    if (retryCount >= retryLimit) {
      throw caughtException;
    }
    assert (retryInterval >= 0);
    retryCount++;
    LOG.error(
        String.format(
            "JDO datastore error. Retrying metastore command " +
            "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
    Thread.sleep(retryInterval);
    // If we have a connection error, the JDO connection URL hook might
    // provide us with a new URL to access the datastore.
    String lastUrl = getConnectionURL(getConf());
    gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
  }
  return ret;
}
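To make the unwrap-and-retry idea easier to see without the Hive-specific details, here is a stripped-down sketch of a retrying dynamic proxy. Everything in it is an assumption for illustration: Store, FlakyStore, TransientStoreException (standing in for JDOException) and RetryingHandler are hypothetical, and there is no sleep or configuration reload between attempts.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class RetryProxyDemo {
    // Hypothetical interface standing in for RawStore.
    interface Store {
        String getTable(String name);
    }

    // Hypothetical exception standing in for javax.jdo.JDOException.
    static class TransientStoreException extends RuntimeException {
        TransientStoreException(String msg) { super(msg); }
    }

    // Fails a fixed number of times, then succeeds.
    static class FlakyStore implements Store {
        int remainingFailures;
        int calls = 0;
        FlakyStore(int failures) { this.remainingFailures = failures; }
        @Override
        public String getTable(String name) {
            calls++;
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new TransientStoreException("store unavailable");
            }
            return "table:" + name;
        }
    }

    static class RetryingHandler implements InvocationHandler {
        private final Object base;
        private final int retryLimit;
        RetryingHandler(Object base, int retryLimit) {
            this.base = base;
            this.retryLimit = retryLimit;
        }
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            int retryCount = 0;
            while (true) {
                try {
                    return method.invoke(base, args);
                } catch (InvocationTargetException e) {
                    // Reflection wraps the real failure; unwrap before deciding.
                    Throwable target = e.getTargetException();
                    boolean retryable = target instanceof TransientStoreException;
                    if (!retryable || retryCount >= retryLimit) {
                        throw target;
                    }
                    retryCount++;
                }
            }
        }
    }

    static Store wrap(Store base, int retryLimit) {
        return (Store) Proxy.newProxyInstance(
            Store.class.getClassLoader(),
            new Class<?>[] { Store.class },
            new RetryingHandler(base, retryLimit));
    }

    public static void main(String[] args) {
        FlakyStore flaky = new FlakyStore(2);
        Store store = wrap(flaky, 3);
        System.out.println(store.getTable("t1") + " after " + flaky.calls + " calls");
    }
}
```

If the handler rethrew the InvocationTargetException without inspecting its target, as the post describes for Hive 0.9, the retry branch would never be reached for failures raised inside the proxied method.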
A RawStore gets initialized in one of two ways. The first is in the RetryingRawStore constructor, which calls "this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);". Because ObjectStore implements Configurable, newInstance calls its setConf(conf) method, and that initializes the RawStore. The second is on retry after an exception has been caught, which also calls base.setConf(getConf()):
private void initMS() {
  base.setConf(getConf());
}
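Both initialization paths end up in setConf, which can be sketched as follows. This is a simplified illustration and not Hadoop's ReflectionUtils: the Configurable interface, the Store class and the newInstance helper here are hypothetical, and java.util.Properties stands in for Hadoop's Configuration.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

class ConfigurableNewInstanceDemo {
    // Hypothetical stand-in for Hadoop's Configurable interface.
    interface Configurable {
        void setConf(Properties conf);
    }

    // Hypothetical stand-in for ObjectStore: all setup happens in setConf.
    static class Store implements Configurable {
        boolean initialized = false;
        int initCount = 0;
        public Store() {}
        @Override
        public void setConf(Properties conf) {
            initialized = true;
            initCount++;
        }
    }

    // Create the object reflectively, then configure it if it is Configurable.
    static <T> T newInstance(Class<T> cls, Properties conf) {
        try {
            Constructor<T> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true);
            T obj = ctor.newInstance();
            if (obj instanceof Configurable) {
                ((Configurable) obj).setConf(conf);
            }
            return obj;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + cls.getName(), e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        Store store = newInstance(Store.class, conf);   // first path: construction
        store.setConf(conf);                            // second path: re-init on retry
        System.out.println("initialized " + store.initCount + " times");
    }
}
```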
ObjectStore's setConf method first takes the lock that protects the static PersistenceManagerFactory, closes the current pm and sets it to null, and then initializes a new pm:
public void setConf(Configuration conf) {
  // Although an instance of ObjectStore is accessed by one thread, there may
  // be many threads with ObjectStore instances. So the static variables
  // pmf and prop need to be protected with locks.
  pmfPropLock.lock();
  try {
    isInitialized = false;
    hiveConf = conf;
    Properties propsFromConf = getDataSourceProps(conf);
    boolean propsChanged = !propsFromConf.equals(prop);
    if (propsChanged) {
      pmf = null;
      prop = null;
    }
    assert(!isActiveTransaction());
    shutdown();
    // Always want to re-create pm as we don't know if it were created by the
    // most recent instance of the pmf
    pm = null;
    openTrasactionCalls = 0;
    currentTransaction = null;
    transactionStatus = TXN_STATUS.NO_STATE;
    initialize(propsFromConf);
    if (!isInitialized) {
      throw new RuntimeException(
          "Unable to create persistence manager. Check dss.log for details");
    } else {
      LOG.info("Initialized ObjectStore");
    }
  } finally {
    pmfPropLock.unlock();
  }
}
private void initialize(Properties dsProps) {
  LOG.info("ObjectStore, initialize called");
  prop = dsProps;
  pm = getPersistenceManager();
  isInitialized = pm != null;
  return;
}
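Back to the original error: how does the Persistence Manager end up closed? After careful investigation I found that HCatalog explicitly calls close() on the HiveMetaStoreClient once it is done with it, whereas Hive itself generally does not call this method internally.
HiveMetaStoreClient.java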
public void close() {
  isConnected = false;
  try {
    if (null != client) {
      client.shutdown();
    }
  } catch (TException e) {
    LOG.error("Unable to shutdown local metastore client", e);
  }
  // Transport would have got closed via client.shutdown(), so we dont need this, but
  // just in case, we make this call.
  if ((transport != null) && transport.isOpen()) {
    transport.close();
  }
}
On the server side this maps to the shutdown method in HMSHandler:
@Override
public void shutdown() {
  logInfo("Shutting down the object store...");
  RawStore ms = threadLocalMS.get();
  if (ms != null) {
    ms.shutdown();
    ms = null;
  }
  logInfo("Metastore shutdown complete.");
}
ObjectStore's shutdown method:
public void shutdown() {
  if (pm != null) {
    pm.close();
  }
}
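As we can see, HMSHandler.shutdown only fetches the current thread's ObjectStore and calls its shutdown method, which closes pm. Setting the local variable ms to null has no effect on the thread-local, so the ObjectStore is not discarded and stays in threadLocalMS. The next time this thread serves another request, getMS() returns the same ObjectStore, and because its pm has already been closed, the call is bound to throw. The correct approach is to add threadLocalMS.remove() or threadLocalMS.set(null) so that the entry is explicitly removed from the ThreadLocalMap.
The modified shutdown method: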
public void shutdown() {
  logInfo("Shutting down the object store...");
  RawStore ms = threadLocalMS.get();
  if (ms != null) {
    ms.shutdown();
    ms = null;
    threadLocalMS.remove();
  }
  logInfo("Metastore shutdown complete.");
}
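The failure and the fix can be reproduced outside Hive with a small model. The sketch below is only an illustration under stated assumptions: Store stands in for ObjectStore with its pm, Handler stands in for HMSHandler, and the removeOnShutdown flag is a hypothetical switch that toggles between the original and the modified shutdown behavior on a single thread.

```java
class StaleThreadLocalDemo {
    // Hypothetical stand-in for ObjectStore and its PersistenceManager.
    static class Store {
        private boolean open = true;
        void shutdown() {
            open = false;
        }
        String getTable(String name) {
            if (!open) {
                throw new IllegalStateException("Persistence Manager has been closed");
            }
            return "table:" + name;
        }
    }

    // Hypothetical stand-in for HMSHandler.
    static class Handler {
        private final ThreadLocal<Store> threadLocalStore = new ThreadLocal<>();
        private final boolean removeOnShutdown;

        Handler(boolean removeOnShutdown) {
            this.removeOnShutdown = removeOnShutdown;
        }

        Store getStore() {
            Store s = threadLocalStore.get();
            if (s == null) {
                s = new Store();
                threadLocalStore.set(s);
            }
            return s;
        }

        String getTable(String name) {
            return getStore().getTable(name);
        }

        void shutdown() {
            Store s = threadLocalStore.get();
            if (s != null) {
                s.shutdown();
                if (removeOnShutdown) {
                    // The fix: drop the closed store so the next call creates a new one.
                    threadLocalStore.remove();
                }
            }
        }
    }

    // One worker thread serves a client that closes, then serves another request.
    static boolean secondRequestSucceeds(boolean removeOnShutdown) {
        Handler handler = new Handler(removeOnShutdown);
        handler.getTable("t1");
        handler.shutdown();
        try {
            handler.getTable("t2");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + secondRequestSucceeds(false));
        System.out.println("with remove():    " + secondRequestSucceeds(true));
    }
}
```

Without the remove() call the second request reuses the closed store and fails. With it, getStore() sees null and lazily builds a fresh store.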

