新聞中心
前言
線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,合理的使用線程池對線程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控,有以下好處:
1、降低資源消耗;
2、提高響應(yīng)速度;
3、提高線程的可管理性。
Java1.5中引入的Executor框架把任務(wù)的提交和執(zhí)行進(jìn)行解耦,只需要定義好任務(wù),然后提交給線程池,而不用關(guān)心該任務(wù)是如何執(zhí)行、被哪個(gè)線程執(zhí)行,以及什么時(shí)候執(zhí)行。
demo
1、Executors.newFixedThreadPool(10)初始化一個(gè)包含10個(gè)線程的線程池executor;
2、通過executor.execute方法提交20個(gè)任務(wù),每個(gè)任務(wù)打印當(dāng)前的線程名;
3、負(fù)責(zé)執(zhí)行任務(wù)的線程的生命周期都由Executor框架進(jìn)行管理;
ThreadPoolExecutor
Executors是java線程池的工廠類,通過它可以快速初始化一個(gè)符合業(yè)務(wù)需求的線程池,如Executors.newFixedThreadPool方法可以生成一個(gè)擁有固定線程數(shù)的線程池。
其本質(zhì)是通過不同的參數(shù)初始化一個(gè)ThreadPoolExecutor對象,具體參數(shù)描述如下:
corePoolSize
線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize
線程池中允許的大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;
keepAliveTime
線程空閑時(shí)的存活時(shí)間,即當(dāng)線程沒有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間;默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時(shí)才有用;
unit
keepAliveTime的單位;
workQueue
用來保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:
1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);
2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個(gè)不存儲元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優(yōu)先級的×××阻塞隊(duì)列;
threadFactory
創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識別度的線程名。
handler
線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
1、AbortPolicy:直接拋出異常,默認(rèn)策略;
2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。
Exectors
Exectors工廠類提供了線程池的初始化接口,主要有如下幾種:
newFixedThreadPool
初始化一個(gè)指定線程數(shù)的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊(duì)列,不過當(dāng)線程池沒有可執(zhí)行任務(wù)時(shí),也不會釋放線程。
newCachedThreadPool
1、初始化一個(gè)可以緩存線程的線程池,默認(rèn)緩存60s,線程池的線程數(shù)可達(dá)到Integer.MAX_VALUE,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊(duì)列;
2、和newFixedThreadPool創(chuàng)建的線程池不同,newCachedThreadPool在沒有任務(wù)執(zhí)行時(shí),當(dāng)線程的空閑時(shí)間超過keepAliveTime,會自動釋放線程資源,當(dāng)提交新任務(wù)時(shí),如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務(wù),會導(dǎo)致一定的系統(tǒng)開銷;
所以,使用該線程池時(shí),一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題。
newSingleThreadExecutor
初始化的線程池中只有一個(gè)線程,如果該線程異常結(jié)束,會重新創(chuàng)建一個(gè)新的線程繼續(xù)執(zhí)行任務(wù),唯一的線程可以保證所提交任務(wù)的順序執(zhí)行,內(nèi)部使用LinkedBlockingQueue作為阻塞隊(duì)列。
newScheduledThreadPool
初始化的線程池可以在指定的時(shí)間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實(shí)際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)。
實(shí)現(xiàn)原理
除了newScheduledThreadPool的內(nèi)部實(shí)現(xiàn)特殊一點(diǎn)之外,其它幾個(gè)線程池都是基于ThreadPoolExecutor類實(shí)現(xiàn)的。
線程池內(nèi)部狀態(tài)
其中AtomicInteger變量ctl的功能非常強(qiáng)大:利用低29位表示線程池中線程數(shù),通過高3位表示線程池的運(yùn)行狀態(tài):
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態(tài)的線程池會接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù);
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態(tài)的線程池不會接收新任務(wù),但會處理阻塞隊(duì)列中的任務(wù);
3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態(tài)的線程不會接收新任務(wù),也不會處理阻塞隊(duì)列中的任務(wù),而且會中斷正在運(yùn)行的任務(wù);
4、TIDYING : 2 << COUNT_BITS,即高3位為010;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011;
任務(wù)提交
線程池框架提供了兩種方式提交任務(wù),根據(jù)不同的業(yè)務(wù)需求選擇不同的方式。
Executor.execute()
通過Executor.execute()方法提交的任務(wù),必須實(shí)現(xiàn)Runnable接口,該方式提交的任務(wù)不能獲取返回值,因此無法判斷任務(wù)是否執(zhí)行成功。
ExecutorService.submit()
通過ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。
任務(wù)執(zhí)行
當(dāng)向線程池中提交一個(gè)任務(wù),線程池會如何處理該任務(wù)?
execute實(shí)現(xiàn)
具體的執(zhí)行流程如下:
1、workerCountOf方法根據(jù)ctl的低29位,得到線程池的當(dāng)前線程數(shù),如果線程數(shù)小于corePoolSize,則執(zhí)行addWorker方法創(chuàng)建新的線程執(zhí)行任務(wù);否則執(zhí)行步驟(2);
2、如果線程池處于RUNNING狀態(tài),且把提交的任務(wù)成功放入阻塞隊(duì)列中,則執(zhí)行步驟(3),否則執(zhí)行步驟(4);
3、再次檢查線程池的狀態(tài),如果線程池沒有RUNNING,且成功從阻塞隊(duì)列中刪除任務(wù),則執(zhí)行reject方法處理任務(wù);
4、執(zhí)行addWorker方法創(chuàng)建新的線程執(zhí)行任務(wù),如果addWoker執(zhí)行失敗,則執(zhí)行reject方法處理任務(wù);
addWorker實(shí)現(xiàn)
從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線程并執(zhí)行任務(wù),代碼實(shí)現(xiàn)如下:
這只是addWoker方法實(shí)現(xiàn)的前半部分:
1、判斷線程池的狀態(tài),如果線程池的狀態(tài)值大于或等SHUTDOWN,則不處理提交的任務(wù),直接返回;
2、通過參數(shù)core判斷當(dāng)前需要創(chuàng)建的線程是否為核心線程,如果core為true,且當(dāng)前線程數(shù)小于corePoolSize,則跳出循環(huán),開始創(chuàng)建新的線程,具體實(shí)現(xiàn)如下:
線程池的工作線程通過Woker類實(shí)現(xiàn),在ReentrantLock鎖的保證下,把Woker實(shí)例插入到HashSet后,并啟動Woker中的線程,其中Worker類設(shè)計(jì)如下:
1、繼承了AQS類,可以方便的實(shí)現(xiàn)工作線程的中止操作;
2、實(shí)現(xiàn)了Runnable接口,可以將自身作為一個(gè)任務(wù)在工作線程中執(zhí)行;
3、當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法;
從Woker類的構(gòu)造方法實(shí)現(xiàn)可以發(fā)現(xiàn):線程工廠在創(chuàng)建線程thread時(shí),將Woker實(shí)例本身this作為參數(shù)傳入,當(dāng)執(zhí)行start方法啟動線程thread時(shí),本質(zhì)是執(zhí)行了Worker的runWorker方法。
runWorker實(shí)現(xiàn)
runWorker方法是線程池的核心:
1、線程啟動之后,通過unlock方法釋放鎖,設(shè)置AQS的state為0,表示運(yùn)行中斷;
2、獲取第一個(gè)任務(wù)firstTask,執(zhí)行任務(wù)的run方法,不過在執(zhí)行任務(wù)之前,會進(jìn)行加鎖操作,任務(wù)執(zhí)行完會釋放鎖;
3、在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute和afterExecute方法;
4、firstTask執(zhí)行完成之后,通過getTask方法從阻塞隊(duì)列中獲取等待的任務(wù),如果隊(duì)列中沒有任務(wù),getTask方法會被阻塞并掛起,不會占用cpu資源;
getTask實(shí)現(xiàn)
整個(gè)getTask操作在自旋下完成:
1、workQueue.take:如果阻塞隊(duì)列為空,當(dāng)前線程會被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù),并執(zhí)行;
2、workQueue.poll:如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒有任務(wù),則返回null;
所以,線程池中實(shí)現(xiàn)的線程可以一直執(zhí)行由用戶提交的任務(wù)。
Future和Callable實(shí)現(xiàn)
通過ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。
在實(shí)際業(yè)務(wù)場景中,F(xiàn)uture和Callable基本是成對出現(xiàn)的,Callable負(fù)責(zé)產(chǎn)生結(jié)果,F(xiàn)uture負(fù)責(zé)獲取結(jié)果。
1、Callable接口類似于Runnable,只是Runnable沒有返回值。
2、Callable任務(wù)除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會被返回,即Future可以拿到異步執(zhí)行任務(wù)各種結(jié)果;
3、Future.get方法會導(dǎo)致主線程阻塞,直到Callable任務(wù)執(zhí)行完成;
submit實(shí)現(xiàn)
通過submit方法提交的Callable任務(wù)會被封裝成了一個(gè)FutureTask對象。
FutureTask
1、FutureTask在不同階段擁有不同的狀態(tài)state,初始化為NEW;
2、FutureTask類實(shí)現(xiàn)了Runnable接口,這樣就可以通過Executor.execute方法提交FutureTask到線程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;
FutureTask.get實(shí)現(xiàn)
內(nèi)部通過awaitDone方法對主線程進(jìn)行阻塞,具體實(shí)現(xiàn)如下:
1、如果主線程被中斷,則拋出中斷異常;
2、判斷FutureTask當(dāng)前的state,如果大于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完成,則直接返回;
3、如果當(dāng)前state等于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完,這時(shí)主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;
4、通過WaitNode類封裝當(dāng)前線程,并通過UNSAFE添加到waiters鏈表;
5、最終通過LockSupport的park或parkNanos掛起線程;
FutureTask.run實(shí)現(xiàn)
FutureTask.run方法是在線程池中被執(zhí)行的,而非主線程
1、通過執(zhí)行Callable任務(wù)的call方法;
2、如果call執(zhí)行成功,則通過set方法保存結(jié)果;
3、如果call執(zhí)行有異常,則通過setException保存異常;
set
setException
set和setException方法中,都會通過UnSAFE修改FutureTask的狀態(tài),并執(zhí)行finishCompletion方法通知主線程任務(wù)已經(jīng)執(zhí)行完成;
finishCompletion
1、執(zhí)行FutureTask類的get方法時(shí),會把主線程封裝成WaitNode節(jié)點(diǎn)并保存在waiters鏈表中;
2、FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters的值,并通過LockSupport類unpark方法喚醒主線程;
覺得不錯(cuò)請點(diǎn)贊支持,歡迎留言或進(jìn)我的個(gè)人群855801563領(lǐng)取【架構(gòu)資料專題目合集90期】、【BATJTMD大廠JAVA面試真題1000+】,本群專用于學(xué)習(xí)交流技術(shù)、分享面試機(jī)會,拒絕廣告,我也會在群內(nèi)不定期答題、探討。
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務(wù)器買多久送多久。
本文名稱:深入分析java線程池的實(shí)現(xiàn)原理-創(chuàng)新互聯(lián)
本文地址:http://www.ef60e0e.cn/article/deicse.html