1. 功能說明
2. 多線程任務(wù)示例
2.1 線程池
2.2 單個(gè)任務(wù)
2.3 任務(wù)入口
2.4 結(jié)果分析
2.5 源碼地址
3. 寫在最后
大家好,今天教大家擼一個(gè) Java 的多線程永動(dòng)任務(wù),這個(gè)示例的原型是公司自研的多線程異步任務(wù)項(xiàng)目 ,我把里面涉及到多線程的代碼抽離出來,然后進(jìn)行一定的改造。
里面涉及的知識(shí)點(diǎn)非常多,特別適合有一定工作經(jīng)驗(yàn) 的同學(xué)學(xué)習(xí),或者可以直接拿到項(xiàng)目中使用。
文章結(jié)構(gòu)非常簡(jiǎn)單:
1. 功能說明
做這個(gè)多線程異步任務(wù),主要是因?yàn)槲覀冇泻芏嘤绖?dòng)的異步任務(wù),什么是永動(dòng)呢?就是任務(wù)跑起來后,需要一直跑下去。
比如消息 Push 任務(wù),因?yàn)橐恢庇邢⑦^來,所以需要一直去消費(fèi) DB 中的未推送消息,就需要整一個(gè) Push 的永動(dòng)異步任務(wù)。
我們的需求其實(shí)不難,簡(jiǎn)單總結(jié)一下:
能同時(shí)執(zhí)行多個(gè)永動(dòng)的異步任務(wù) ;
每個(gè)異步任務(wù),支持開多個(gè)線程 去消費(fèi)這個(gè)任務(wù)的數(shù)據(jù);
支持永動(dòng)異步任務(wù)的優(yōu)雅關(guān)閉 ,即關(guān)閉后,需要把所有的數(shù)據(jù)消費(fèi)完畢后,再關(guān)閉。
完成上面的需求,需要注意幾個(gè)點(diǎn):
每個(gè)永動(dòng)任務(wù) ,可以開一個(gè)線程去執(zhí)行;
每個(gè)子任務(wù) ,因?yàn)樾枰С植l(fā),需要用線程池控制;
永動(dòng)任務(wù)的關(guān)閉,需要通知子任務(wù)的并發(fā)線程,并支持永動(dòng)任務(wù)和并發(fā)子任務(wù)的優(yōu)雅關(guān)閉 。
2. 多線程任務(wù)示例
2.1 線程池
對(duì)于子任務(wù),需要支持并發(fā),如果每個(gè)并發(fā)都開一個(gè)線程,用完就關(guān)閉,對(duì)資源消耗太大,所以引入線程池:
publicclassTaskProcessUtil{ //每個(gè)任務(wù),都有自己?jiǎn)为?dú)的線程池 privatestaticMapexecutors=newConcurrentHashMap<>(); //初始化一個(gè)線程池 privatestaticExecutorServiceinit(StringpoolName,intpoolSize){ returnnewThreadPoolExecutor(poolSize,poolSize, 0L,TimeUnit.MILLISECONDS, newLinkedBlockingQueue (), newThreadFactoryBuilder().setNameFormat("Pool-"+poolName).setDaemon(false).build(), newThreadPoolExecutor.CallerRunsPolicy()); } //獲取線程池 publicstaticExecutorServicegetOrInitExecutors(StringpoolName,intpoolSize){ ExecutorServiceexecutorService=executors.get(poolName); if(null==executorService){ synchronized(TaskProcessUtil.class){ executorService=executors.get(poolName); if(null==executorService){ executorService=init(poolName,poolSize); executors.put(poolName,executorService); } } } returnexecutorService; } //回收線程資源 publicstaticvoidreleaseExecutors(StringpoolName){ ExecutorServiceexecutorService=executors.remove(poolName); if(executorService!=null){ executorService.shutdown(); } } }
這是一個(gè)線程池的工具類,這里初始化線程池和回收線程資源很簡(jiǎn)單,我們主要討論獲取線程池。
獲取線程池可能會(huì)存在并發(fā)情況,所以需要加一個(gè) synchronized 鎖,然后鎖住后,需要對(duì) executorService 進(jìn)行二次判空校驗(yàn)。
2.2 單個(gè)任務(wù)
為了更好講解單個(gè)任務(wù)的實(shí)現(xiàn)方式,我們的任務(wù)主要就是把 Cat 的數(shù)據(jù)打印出來,Cat 定義如下:
@Data @Service publicclassCat{ privateStringcatName; publicCatsetCatName(Stringname){ this.catName=name; returnthis; } }
單個(gè)任務(wù)主要包括以下功能:
獲取永動(dòng)任務(wù)數(shù)據(jù) :這里一般都是掃描 DB,我直接就簡(jiǎn)單用 queryData() 代替。
多線程執(zhí)行任務(wù) :需要把數(shù)據(jù)拆分成 4 份,然后分別由多線程并發(fā)執(zhí)行,這里可以通過線程池支持;
永動(dòng)任務(wù)優(yōu)雅停機(jī) :當(dāng)外面通知任務(wù)需要停機(jī),需要執(zhí)行完剩余任務(wù)數(shù)據(jù),并回收線程資源,退出任務(wù);
永動(dòng)執(zhí)行 :如果未收到停機(jī)命令,任務(wù)需要一直執(zhí)行下去。
直接看代碼:
publicclassChildTask{ privatefinalintPOOL_SIZE=3;//線程池大小 privatefinalintSPLIT_SIZE=4;//數(shù)據(jù)拆分大小 privateStringtaskName; //接收jvm關(guān)閉信號(hào),實(shí)現(xiàn)優(yōu)雅停機(jī) protectedvolatilebooleanterminal=false; publicChildTask(StringtaskName){ this.taskName=taskName; } //程序執(zhí)行入口 publicvoiddoExecute(){ inti=0; while(true){ System.out.println(taskName+":Cycle-"+i+"-Begin"); //獲取數(shù)據(jù) Listdatas=queryData(); //處理數(shù)據(jù) taskExecute(datas); System.out.println(taskName+":Cycle-"+i+"-End"); if(terminal){ //只有應(yīng)用關(guān)閉,才會(huì)走到這里,用于實(shí)現(xiàn)優(yōu)雅的下線 break; } i++; } //回收線程池資源 TaskProcessUtil.releaseExecutors(taskName); } //優(yōu)雅停機(jī) publicvoidterminal(){ //關(guān)機(jī) terminal=true; System.out.println(taskName+"shutdown"); } //處理數(shù)據(jù) privatevoiddoProcessData(List datas,CountDownLatchlatch){ try{ for(Catcat:datas){ System.out.println(taskName+":"+cat.toString()+",ThreadName:"+Thread.currentThread().getName()); Thread.sleep(1000L); } }catch(Exceptione){ System.out.println(e.getStackTrace()); }finally{ if(latch!=null){ latch.countDown(); } } } //處理單個(gè)任務(wù)數(shù)據(jù) privatevoidtaskExecute(List sourceDatas){ if(CollectionUtils.isEmpty(sourceDatas)){ return; } //將數(shù)據(jù)拆成4份 List >splitDatas=Lists.partition(sourceDatas,SPLIT_SIZE); finalCountDownLatchlatch=newCountDownLatch(splitDatas.size()); //并發(fā)處理拆分的數(shù)據(jù),共用一個(gè)線程池 for(finalList
datas:splitDatas){ ExecutorServiceexecutorService=TaskProcessUtil.getOrInitExecutors(taskName,POOL_SIZE); executorService.submit(newRunnable(){ @Override publicvoidrun(){ doProcessData(datas,latch); } }); } try{ latch.await(); }catch(Exceptione){ System.out.println(e.getStackTrace()); } } //獲取永動(dòng)任務(wù)數(shù)據(jù) privateList queryData(){ List datas=newArrayList<>(); for(inti=0;i5;?i?++)?{ ????????????datas.add(new?Cat().setCatName("羅小黑"?+?i)); ????????} ????????return?datas; ????} }
簡(jiǎn)單解釋一下:
queryData :用于獲取數(shù)據(jù),實(shí)際應(yīng)用中其實(shí)是需要把 queryData 定為抽象方法,然后由各個(gè)任務(wù)實(shí)現(xiàn)自己的方法。
doProcessData :數(shù)據(jù)處理邏輯,實(shí)際應(yīng)用中其實(shí)是需要把 doProcessData 定為抽象方法,然后由各個(gè)任務(wù)實(shí)現(xiàn)自己的方法。
taskExecute :將數(shù)據(jù)拆分成 4 份,獲取該任務(wù)的線程池,并交給線程池并發(fā)執(zhí)行,然后通過 latch.await() 阻塞。當(dāng)這 4 份數(shù)據(jù)都執(zhí)行成功后,阻塞結(jié)束,該方法才返回。
terminal :僅用于接受停機(jī)命令,這里該變量定義為 volatile,所以多線程內(nèi)存可見;
doExecute :程序執(zhí)行入口,封裝了每個(gè)任務(wù)執(zhí)行的流程,當(dāng) terminal=true 時(shí),先執(zhí)行完任務(wù)數(shù)據(jù),然后回收線程池,最后退出。
2.3 任務(wù)入口
直接上代碼:
publicclassLoopTask{ privateListchildTasks; publicvoidinitLoopTask(){ childTasks=newArrayList(); childTasks.add(newChildTask("childTask1")); childTasks.add(newChildTask("childTask2")); for(finalChildTaskchildTask:childTasks){ newThread(newRunnable(){ @Override publicvoidrun(){ childTask.doExecute(); } }).start(); } } publicvoidshutdownLoopTask(){ if(!CollectionUtils.isEmpty(childTasks)){ for(ChildTaskchildTask:childTasks){ childTask.terminal(); } } } publicstaticvoidmain(Stringargs[])throwsException{ LoopTaskloopTask=newLoopTask(); loopTask.initLoopTask(); Thread.sleep(5000L); loopTask.shutdownLoopTask(); } }
每個(gè)任務(wù)都開一個(gè)單獨(dú)的 Thread,這里我初始化了 2 個(gè)永動(dòng)任務(wù),分別為 childTask1 和 childTask2,然后分別執(zhí)行,后面 Sleep 了 5 秒后,再關(guān)閉任務(wù),我們可以看看是否可以按照我們的預(yù)期優(yōu)雅退出。
2.4 結(jié)果分析
執(zhí)行結(jié)果如下:
childTask1:Cycle-0-Begin childTask2:Cycle-0-Begin childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cycle-0-End childTask2:Cycle-1-Begin childTask1:Cycle-0-End childTask1:Cycle-1-Begin childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2 childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1 childTask1shutdown childTask2shutdown childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1 childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2 childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1 childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2 childTask1:Cycle-1-End childTask2:Cycle-1-End
輸出數(shù)據(jù):
“Pool-childTask” 是線程池名稱;
“childTask” 是任務(wù)名稱;
“Cat(catName=羅小黑)” 是執(zhí)行的結(jié)果;
“childTask shut down” 是關(guān)閉標(biāo)記;
“childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一輪循環(huán)的開始和結(jié)束標(biāo)記。
我們分析一下執(zhí)行結(jié)果:
childTask1 和 childTask2 分別執(zhí)行,在第一輪循環(huán)中都正常輸出了 5 條羅小黑數(shù)據(jù);
第二輪執(zhí)行過程中,我啟動(dòng)了關(guān)閉指令,這次第二輪執(zhí)行沒有直接停止,而是先執(zhí)行完任務(wù)中的數(shù)據(jù),再執(zhí)行退出,所以完全符合我們的優(yōu)雅退出結(jié)論。
2.5 源碼地址
GitHub 地址:
https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc
3. 寫在最后
對(duì)于這個(gè)經(jīng)典的線程池使用示例,原項(xiàng)目是我好友一灰 寫的,技術(shù)水平阿里 P7級(jí)別,實(shí)現(xiàn)得也非常優(yōu)雅,涉及的知識(shí)點(diǎn)非常多 ,非常值得大家學(xué)習(xí)。
-
JAVA
+關(guān)注
關(guān)注
19文章
2966瀏覽量
104701 -
編程
+關(guān)注
關(guān)注
88文章
3614瀏覽量
93685 -
多線程
+關(guān)注
關(guān)注
0文章
278瀏覽量
19943 -
代碼
+關(guān)注
關(guān)注
30文章
4779瀏覽量
68521 -
Thread
+關(guān)注
關(guān)注
2文章
83瀏覽量
25923
原文標(biāo)題:新來個(gè)阿里 P7,僅花 2 小時(shí),擼出一個(gè)多線程永動(dòng)任務(wù),看完直接跪了,真牛逼!
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論