黄色网页视频 I 影音先锋日日狠狠久久 I 秋霞午夜毛片 I 秋霞一二三区 I 国产成人片无码视频 I 国产 精品 自在自线 I av免费观看网站 I 日本精品久久久久中文字幕5 I 91看视频 I 看全色黄大色黄女片18 I 精品不卡一区 I 亚洲最新精品 I 欧美 激情 在线 I 人妻少妇精品久久 I 国产99视频精品免费专区 I 欧美影院 I 欧美精品在欧美一区二区少妇 I av大片网站 I 国产精品黄色片 I 888久久 I 狠狠干最新 I 看看黄色一级片 I 黄色精品久久 I 三级av在线 I 69色综合 I 国产日韩欧美91 I 亚洲精品偷拍 I 激情小说亚洲图片 I 久久国产视频精品 I 国产综合精品一区二区三区 I 色婷婷国产 I 最新成人av在线 I 国产私拍精品 I 日韩成人影音 I 日日夜夜天天综合

JAVA線程池代碼淺析

系統(tǒng) 2236 0

1. ?????? ExecutorService
JAVA線程池代碼淺析
?
Java 1.5 開始正式提供了并發(fā)包 , 而這個并發(fā)包里面除了原子變量 ,synchronizer, 并發(fā)容器 , 另外一個非常重要的特性就是線程池 . 對于線程池的意義 , 我們這邊不再多說 .

上圖是線程池的主體類圖 ,ThreadPoolExecutor 是應(yīng)用最為廣泛的一個線程池實現(xiàn) ( 我也將在接下來的文字中詳細(xì)描述我對這個類的理解和執(zhí)行機制 ),ScheduledThreadPoolExecutor 則在 ThreadPoolExecutor 上提供了定時執(zhí)行的等附加功能 , 這個可以從 ScheduledExecutorService 接口的定義中看出來 .Executors 則類似工廠方法 , 提供了幾個非常常用的線程池初始化方法 .

ThreadPoolExecutor

這個類繼承了 AbstractExecutorService 抽象類 , AbstractExecutorService 主要的職責(zé)有 2 部分 , 一部分定義和實現(xiàn)提交任務(wù)的方法 (3 submit 方法的實現(xiàn) ) , 實例化 FutureTask 并且交給子類執(zhí)行 , 另外一部分實現(xiàn) invokeAny,invokeAll 方法 . 留給子類的方法為 execute 方法 , 也就是 Executor 接口定義的方法 .

// 實例化一個FutureTask,交給子類的execute方法執(zhí)行.這種設(shè)計能夠保證callable和runnable的執(zhí)行接口方法的一致性(FutureTask包裝了這個差別)
public ? < T > ?Future < T > ?submit(Runnable?task,?T?result)? {
????
if ?(task? == ? null )? throw ? new ?NullPointerException();
????RunnableFuture
< T > ?ftask? = ?newTaskFor(task,?result);
????execute(ftask);
????
return ?ftask;
}


protected ? < T > ?RunnableFuture < T > ?newTaskFor(Runnable?runnable,?T?value)? {
????
return ? new ?FutureTask < T > (runnable,?value);
}

關(guān)于 FutureTask 這個類的實現(xiàn) , 我在前面的 JAVA LOCK 代碼淺析有講過其實現(xiàn)原理 , 主要的思想就是關(guān)注任務(wù)完成與未完成的狀態(tài) , 任務(wù)提交線程 get() 結(jié)果時被 park , 等待任務(wù)執(zhí)行完成被喚醒 , 任務(wù)執(zhí)行線程在任務(wù)執(zhí)行完畢后設(shè)置結(jié)果 , 并且 unpark 對應(yīng)線程并且讓其得到執(zhí)行結(jié)果 .

回到 ThreadPoolExecutor .ThreadPoolExecutor 需要實現(xiàn)除了我們剛才說的 execute(Runnable command) 方法外 , 還得實現(xiàn) ExecutorService 接口定義的部分方法 . ThreadPoolExecutor 所提供的不光是這些 , 以下根據(jù)我的理解來列一下它所具有的特性
1. ?????? execute 流程
2. ??????
3. ?????? 工作隊列
4. ?????? 飽和拒絕策略
5. ?????? 線程工廠
6. ?????? beforeExecute afterExecute 擴(kuò)展

execute 方法的實現(xiàn)有個機制非常重要 , 當(dāng)當(dāng)前線程池線程數(shù)量小于 corePoolSize, 那么生成一個新的 worker 并把提交的任務(wù)置為這個工作線程的頭一個執(zhí)行任務(wù) , 如果大于 corePoolSize, 那么會試著將提交的任務(wù)塞到 workQueue 里面供線程池里面的worker稍后執(zhí)行 , 并不是直接再起一個 worker, 但是當(dāng) workQueue 也滿 , 并且當(dāng)前線程池小于 maxPoolSize, 那么起一個新的 worker 并將該任務(wù)設(shè)為該 worker 執(zhí)行的第一個任務(wù)執(zhí)行 , 大于 maxPoolSize,workQueue 也滿負(fù)荷 , 那么調(diào)用飽和策略里面的行為 .

worker 線程在執(zhí)行完一個任務(wù)之后并不會立刻關(guān)閉 , 而是嘗試著去 workQueue 里面取任務(wù) , 如果取不到 , 根據(jù)策略關(guān)閉或者保持空閑狀態(tài) . 所以 submit 任務(wù)的時候 , 提交的順序為 核心線程池 ------ 工作隊列 ------ 擴(kuò)展線程池 .

池包括核心池
, 擴(kuò)展池 (2 者的線程在同一個 hashset 中,這里只是為了方便才這么稱呼,并不是分離的 ), 核心池在池內(nèi) worker 沒有用完的情況下 , 只要有任務(wù)提交都會創(chuàng)建新的線程 , 其代表線程池正常處理任務(wù)的能力 . 擴(kuò)展池 , 是在核心線程池用完 , 并且工作隊列也已排滿任務(wù)的情況下才會開始初始化線程 , 其代表的是線程池超出正常負(fù)載時的解決方案 , 一旦任務(wù)完成 , 并且試圖從 workQueue 取不到任務(wù) , 那么會比較當(dāng)前線程池與核心線程池的大小 , 大于核心線程池數(shù)的 worker 將被銷毀 .

Runnable?getTask()? {
????
for ?(;;)? {
????????
try ? {
????????????
int ?state? = ?runState;
????????????
// >SHUTDOWN就是STOP或者TERMINATED
????????????
// 直接返回
???????????? if ?(state? > ?SHUTDOWN)
????????????????
return ? null ;
????????????Runnable?r;
????????????
// 如果是SHUTDOWN狀態(tài),那么取任務(wù),如果有
??????????????
// 將剩余任務(wù)執(zhí)行完畢,否則就結(jié)束了
???????????? if ?(state? == ?SHUTDOWN)?? // ?Help?drain?queue
????????????????r? = ?workQueue.poll();
????????????
// 如果不是以上狀態(tài)的(也就是RUNNING狀態(tài)的),那么如果當(dāng)前池大于核心池數(shù)量,
????????????
// 或者允許核心線程池取任務(wù)超時就可以關(guān)閉,那么從任務(wù)隊列取任務(wù),
????????????
// 如果超出keepAliveTime,那么就返回null了,也就意味著這個worker結(jié)束了
???????????? else ? if ?(poolSize? > ?corePoolSize? || ?allowCoreThreadTimeOut)
????????????????r?
= ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);
????????????
// 如果當(dāng)前池小于核心池,并且不允許核心線程池取任務(wù)超時就關(guān)閉,那么take(),直到拿到任務(wù)或者被interrupt
???????????? else
????????????????r?
= ?workQueue.take();
????????????
// 如果經(jīng)過以上判定,任務(wù)不為空,那么返回任務(wù)
???????????? if ?(r? != ? null )
????????????????
return ?r;
????????????
// 如果取到任務(wù)為空,那么判定是否可以退出
???????????? if ?(workerCanExit())? {
????????????????
// 如果整個線程池狀態(tài)變?yōu)镾HUTDOWN或者TERMINATED,那么將所有worker?interrupt?(如果正在執(zhí)行,那繼續(xù)讓其執(zhí)行)
???????????????? if ?(runState? >= ?SHUTDOWN)? // ?Wake?up?others
????????????????????interruptIdleWorkers();
????????????????
return ? null ;
????????????}

????????????
// ?Else?retry
????????}
? catch ?(InterruptedException?ie)? {
????????????
// ?On?interruption,?re-check?runState
????????}

}

????}


// worker從workQueue中取不到數(shù)據(jù)的時候調(diào)用此方法,以決定自己是否跳出取任務(wù)的無限循環(huán),從而結(jié)束此worker的運行
private ? boolean ?workerCanExit()? {
????
final ?ReentrantLock?mainLock? = ? this .mainLock;
????mainLock.lock();
????
boolean ?canExit;
????
try ? {
????????
/**/ /*
????????*線程池狀態(tài)為stop或者terminated,
????????*或者任務(wù)隊列里面任務(wù)已經(jīng)為空,
????????*或者允許線程池線程空閑超時(實現(xiàn)方式是從工作隊列拿最多keepAliveTime的任務(wù),超過這個時間就返回null了)并且
?????????*當(dāng)前線程池大于corePoolSize(>1)
????????*那么允許線程結(jié)束
????????*static?final?int?RUNNING????=?0;
????????*static?final?int?SHUTDOWN???=?1;
????????*static?final?int?STOP???????=?2;
????????*static?final?int?TERMINATED?=?3;
????????
*/

????????canExit?
= ?runState? >= ?STOP? ||
????????workQueue.isEmpty()?
||
???????(allowCoreThreadTimeOut?
&&
????????poolSize?
> ?Math.max( 1 ,corePoolSize));
????}
? finally ? {
????????mainLock.unlock();
????}

????
return ?canExit;
}


當(dāng)提交任務(wù)是 , 線程池都已滿 , 并且工作隊列也無空閑位置的情況下 ,ThreadPoolExecutor 會執(zhí)行 reject 操作 ,JDK 提供了四種 reject 策略 , 包括 AbortPolicy( 直接拋 RejectedException Exception),CallerRunsPolicy( 提交任務(wù)線程自己執(zhí)行 , 當(dāng)然這時剩余任務(wù)也將無法提交 ),DiscardOldestPolicy( 將線程池的 workQueue 任務(wù)隊列里面最老的任務(wù)剔除 , 將新任務(wù)丟入 ),DiscardPolicy( 無視 , 忽略此任務(wù) , 并且立即返回 ). 實例化 ThreadPoolExecutor , 如果不指定任何飽和策略 , 默認(rèn)將使用 AbortPolicy.

個人認(rèn)為這些飽和策略并不十分理想
, 特別是在應(yīng)用既要保證快速 , 又要高可用的情況下 , 我的想法是能夠加入超時等待策略 , 也就是提交線程時線程池滿 , 能夠 park 住提交任務(wù)的線程 , 一旦有空閑 , 能在第一時間通知到等待線程 . 這個實際上和主線程執(zhí)行相似 , 但是主線程執(zhí)行期間即使線程池有大量空閑也不會立即可以提交任務(wù) , 效率上后者可能會比較低 , 特別是執(zhí)行慢速任務(wù) .

實例化 Worker 的時候會調(diào)用 ThreadFactory addThread(Runnable r) 方法返回一個 Thread, 這個線程工廠是可以在 ThreadPoolExecutor 實例化的時候指定的 , 如果不指定 , 那么將會使用 DefaultThreadFactory, 這個也就是提供給使用者命名線程 , 線程歸組 , 是否是 demon 等線程相關(guān)屬性設(shè)置的機會 .

beforeExecute afterExecute 是提供給使用者擴(kuò)展的 , 這兩個方法會在 worker runTask 之前和 run 完畢之后分別調(diào)用 .JDK 注釋里 Doug Lea(concurrent 包作者 ) 展示了 beforeExecute 一個很有趣的示例 . 代碼如下 .

class ?PausableThreadPoolExecutor? extends ?ThreadPoolExecutor? {
????
private ? boolean ?isPaused;
????
private ?ReentrantLock?pauseLock? = ? new ?ReentrantLock();
????
private ?Condition?unpaused? = ?pauseLock.newCondition();
?
public ?PausableThreadPoolExecutor( )? {? super ( );?}

protected ? void ?beforeExecute(Thread?t,?Runnable?r)? {
????
super .beforeExecute(t,?r);
????pauseLock.lock();
????
try ? {
????????
while ?(isPaused)?unpaused.await();
????}
? catch ?(InterruptedException?ie)? {
????????t.interrupt();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

?
public ? void ?pause()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? true ;
????}
? finally ? {
????????pauseLock.unlock();
????}

}


public ? void ?resume()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? false ;
????????unpaused.signalAll();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

??}

使用這個線程池 , 用戶可以隨時調(diào)用 pause 中止剩余任務(wù)執(zhí)行 , 當(dāng)然也可以使用 resume 重新開始執(zhí)行剩余任務(wù) .

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor
是一個很實用的類 , 它的實現(xiàn)核心是基于 DelayedWorkQueue. ScheduledThreadPoolExecutor 的繼承結(jié)構(gòu)上來看 , 各位應(yīng)該能夠看出些端倪來 , 就是 ScheduledThreadPoolExecutor ThreadPoolExecutor 中的任務(wù)隊列設(shè)置成了 DelayedWorkQueue, 這也就是說 , 線程池 Worker 從任務(wù)隊列中取的一個任務(wù) , 需要等待這個隊列中最短超時任務(wù)的超時 , 也就是實現(xiàn)定時的效果 . 所以 ScheduledThreadPoolExecutor 所做的工作其實是比較少的 . 主要就是實現(xiàn)任務(wù)的實例化并加入工作隊列 , 以及支持 scheduleAtFixedRate scheduleAtFixedDelay 這種周期性任務(wù)執(zhí)行 .

public ?ScheduledThreadPoolExecutor( int ?corePoolSize,ThreadFactory?threadFactory)? {
???????????
super (corePoolSize,?Integer.MAX_VALUE,? 0 ,?TimeUnit.NANOSECONDS, new ?DelayedWorkQueue(),?threadFactory);
}

對于 scheduleAfFixedRate scheduleAtFiexedDelay 這種周期性任務(wù)支持 , 是由 ScheduledThreadPoolExecutor 內(nèi)部封裝任務(wù)的 ScheduledFutureTask 來實現(xiàn)的 . 這個類在執(zhí)行任務(wù)后 , 對于周期性任務(wù) , 它會處理周期時間 , 并將自己再次丟入線程池的工作隊列 , 從而達(dá)到周期執(zhí)行的目的 .
private ? void ?runPeriodic()? {
?????????
boolean ?ok? = ?ScheduledFutureTask. super .runAndReset();
???????? ?
boolean ?down? = ?isShutdown();
?????????
// ?Reschedule?if?not?cancelled?and?not?shutdown?or?policy?allows
????? if ?(ok? && ?( ! down? || (getContinueExistingPeriodicTasksAfterShutdownPolicy()? && ? ! isStopped())))? {
???????????????
long ?p? = ?period;
???????????????
if ?(p? > ? 0 )
??????????? ????????? time?
+= ?p;
???????????????
else
??????????? ????????? time?
= ?triggerTime( - p);
?????
??????????????? ScheduledThreadPoolExecutor.
super .getQueue().add( this );
?????????}

????????
// ?This?might?have?been?the?final?executed?delayed
???????
// ?task.??Wake?up?threads?to?check.
??????? else ? if ?(down)
???????????? ?interruptIdleWorkers();
}

?

2. ?????? CompletionService

JAVA線程池代碼淺析
ExecutorCompletionService

CompletionService 定義了線程池執(zhí)行任務(wù)集 , 可以依次拿到任務(wù)執(zhí)行完畢的 Future,ExecutorCompletionService 是其實現(xiàn)類 , 先舉個例子 , 如下代碼 , 這個例子中 , 需要注意 ThreadPoolExecutor 核心池一定保證能夠讓任務(wù)提交并且馬上執(zhí)行 , 而不是放到等待隊列中去 , 那樣次序?qū)o法控制 ,CompletionService 也將失去效果 ( 其實核心池中的任務(wù)完成順序還是準(zhǔn)確的 ).

public ? static ? void ?main(String[]?args)? throws ?InterruptedException,?ExecutionException {
????ThreadPoolExecutor?es
= new ?ThreadPoolExecutor( 10 ,? 15 ,? 2000 ,?TimeUnit.MILLISECONDS,? new ?ArrayBlockingQueue < Runnable > ( 10 ), new ?ThreadPoolExecutor.AbortPolicy());
????CompletionService
< String > ?cs = new ?ExecutorCompletionService < String > (es);????
????cs.submit(
new ?Callable < String > ()? {
?????@Override
?????
public ?String?call()? throws ?Exception? Codehi
分享到:
評論

JAVA線程池代碼淺析


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論