Spring Boot 自定義線程池實(shí)現(xiàn)異步開發(fā)相信看過陳某的文章都了解,但是在實(shí)際開發(fā)中需要在父子線程之間傳遞一些數(shù)據(jù),比如用戶信息,鏈路信息等等
比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
那么子線程想要獲取這個(gè)LoginVal如何做呢?
今天就來介紹幾種優(yōu)雅的方式實(shí)現(xiàn)Spring Boot 內(nèi)部的父子線程的數(shù)據(jù)傳遞。
1. 手動(dòng)設(shè)置
每執(zhí)行一次異步線程都要分為兩步:
獲取父線程的LoginVal
將LoginVal設(shè)置到子線程,達(dá)到復(fù)用
代碼如下:
publicvoidhandlerAsync(){ //1.獲取父線程的loginVal LoginValloginVal=OauthContext.get(); log.info("父線程的值:{}",OauthContext.get()); CompletableFuture.runAsync(()->{ //2.設(shè)置子線程的值,復(fù)用 OauthContext.set(loginVal); log.info("子線程的值:{}",OauthContext.get()); }); }
雖然能夠?qū)崿F(xiàn)目的,但是每次開異步線程都需要手動(dòng)設(shè)置,重復(fù)代碼太多,看了頭疼,你認(rèn)為優(yōu)雅嗎?
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
2. 線程池設(shè)置TaskDecorator
TaskDecorator是什么?官方api的大致意思:這是一個(gè)執(zhí)行回調(diào)方法的裝飾器,主要應(yīng)用于傳遞上下文,或者提供任務(wù)的監(jiān)控/統(tǒng)計(jì)信息。
知道有這么一個(gè)東西,如何去使用?
TaskDecorator是一個(gè)接口,首先需要去實(shí)現(xiàn)它,代碼如下:
/** *@description上下文裝飾器 */ publicclassContextTaskDecoratorimplementsTaskDecorator{ @Override publicRunnabledecorate(Runnablerunnable){ //獲取父線程的loginVal LoginValloginVal=OauthContext.get(); return()->{ try{ //將主線程的請求信息,設(shè)置到子線程中 OauthContext.set(loginVal); //執(zhí)行子線程,這一步不要忘了 runnable.run(); }finally{ //線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 OauthContext.clear(); } }; } }
這里我只是設(shè)置了LoginVal,實(shí)際開發(fā)中其他的共享數(shù)據(jù),比如SecurityContext,RequestAttributes....
TaskDecorator需要結(jié)合線程池使用,實(shí)際開發(fā)中異步線程建議使用線程池,只需要在對應(yīng)的線程池配置一下,代碼如下:
@Bean("taskExecutor") publicThreadPoolTaskExecutortaskExecutor(){ ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor(); poolTaskExecutor.setCorePoolSize(xx); poolTaskExecutor.setMaxPoolSize(xx); //設(shè)置線程活躍時(shí)間(秒) poolTaskExecutor.setKeepAliveSeconds(xx); //設(shè)置隊(duì)列容量 poolTaskExecutor.setQueueCapacity(xx); //設(shè)置TaskDecorator,用于解決父子線程間的數(shù)據(jù)復(fù)用 poolTaskExecutor.setTaskDecorator(newContextTaskDecorator()); poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy()); //等待所有任務(wù)結(jié)束后再關(guān)閉線程池 poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); returnpoolTaskExecutor; }
此時(shí)業(yè)務(wù)代碼就不需要去設(shè)置子線程的值,直接使用即可,代碼如下:
publicvoidhandlerAsync(){ log.info("父線程的用戶信息:{}",OauthContext.get()); //執(zhí)行異步任務(wù),需要指定的線程池 CompletableFuture.runAsync(()->log.info("子線程的用戶信息:{}",OauthContext.get()),taskExecutor); }
來看一下結(jié)果,如下圖:
這里使用的是CompletableFuture執(zhí)行異步任務(wù),使用@Async這個(gè)注解同樣是可行的。
注意 :無論使用何種方式,都需要指定線程池
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://github.com/YunaiV/yudao-cloud
3. InheritableThreadLocal
這種方案不建議使用,InheritableThreadLocal雖然能夠?qū)崿F(xiàn)父子線程間的復(fù)用,但是在線程池中使用會(huì)存在復(fù)用的問題。
這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
4. TransmittableThreadLocal
TransmittableThreadLocal是阿里開源的工具,彌補(bǔ)了InheritableThreadLocal的缺陷,在使用線程池等會(huì)池化復(fù)用線程的執(zhí)行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時(shí)上下文傳遞的問題。
使用起來也是非常簡單,添加依賴如下:
com.alibaba transmittable-thread-local 2.14.2
OauthContext改造代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
TransmittableThreadLocal原理
從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實(shí)現(xiàn)TtlCopier接口,它里面只有一個(gè)copy方法。所以主要是對InheritableThreadLocal的擴(kuò)展。
publicclassTransmittableThreadLocalextendsInheritableThreadLocal implementsTtlCopier
在TransimittableThreadLocal中添加holder屬性。這個(gè)屬性的作用就是被標(biāo)記為具備線程傳遞資格的對象都會(huì)被添加到這個(gè)對象中。
要標(biāo)記一個(gè)類,比較容易想到的方式,就是給這個(gè)類新增一個(gè)Type字段,還有一個(gè)方法就是將具備這種類型的的對象都添加到一個(gè)靜態(tài)全局集合中。之后使用時(shí),這個(gè)集合里的所有值都具備這個(gè)標(biāo)記。
//1.holder本身是一個(gè)InheritableThreadLocal對象 //2.這個(gè)holder對象的value是WeakHashMap,?> //2.1WeekHashMap的value總是null,且不可能被使用。 //2.2WeekHasshMap支持value=null privatestaticInheritableThreadLocal ,?>>holder=newInheritableThreadLocal ,?>>(){ @Override protectedWeakHashMap ,?>initialValue(){ returnnewWeakHashMap ,Object>(); } /** *重寫了childValue方法,實(shí)現(xiàn)上直接將父線程的屬性作為子線程的本地變量對象。 */ @Override protectedWeakHashMap ,?>childValue(WeakHashMap ,?>parentValue){ returnnewWeakHashMap ,Object>(parentValue); } };
應(yīng)用代碼是通過TtlExecutors工具類對線程池對象進(jìn)行包裝。工具類只是簡單的判斷,輸入的線程池是否已經(jīng)被包裝過、非空校驗(yàn)等,然后返回包裝類ExecutorServiceTtlWrapper。根據(jù)不同的線程池類型,有不同和的包裝類。
@Nullable publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){ if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){ returnexecutorService; } returnnewExecutorServiceTtlWrapper(executorService); }
進(jìn)入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會(huì)將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執(zhí)行run方法前做一些業(yè)務(wù)邏輯。
/** *在ExecutorServiceTtlWrapper實(shí)現(xiàn)submit方法 */ @NonNull @Override publicFuture submit(@NonNullCallable task){ returnexecutorService.submit(TtlCallable.get(task)); } /** *在ExecutorTtlWrapper實(shí)現(xiàn)execute方法 */ @Override publicvoidexecute(@NonNullRunnablecommand){ executor.execute(TtlRunnable.get(command)); }
所以,重點(diǎn)的核心邏輯應(yīng)該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個(gè)類Transmitter
publicstaticclassTransmitter{ /** *捕獲當(dāng)前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。 */ @NonNull publicstaticObjectcapture(){ returnnewSnapshot(captureTtlValues(),captureThreadLocalValues()); } /** *捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 */ privatestaticHashMap,Object>captureTtlValues(){ HashMap ,Object>ttl2Value= newHashMap ,Object>(); for(TransmittableThreadLocal
進(jìn)入TtlCallable#call()方法。
@Override publicVcall()throwsException{ Objectcaptured=capturedRef.get(); if(captured==null||releaseTtlValueReferenceAfterCall&& !capturedRef.compareAndSet(captured,null)){ thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!"); } //調(diào)用replay方法將捕獲到的當(dāng)前線程的本地變量,傳遞給線程池線程的本地變量, //并且獲取到線程池線程覆蓋之前的本地變量副本。 Objectbackup=replay(captured); try{ //線程方法調(diào)用 returncallable.call(); }finally{ //使用副本進(jìn)行恢復(fù)。 restore(backup); } }
到這基本上線程池方式傳遞本地變量的核心代碼已經(jīng)大概看完了??偟膩碚f在創(chuàng)建TtlCallable對象是,調(diào)用capture()方法捕獲調(diào)用方的本地線程變量,在call()執(zhí)行時(shí),將捕獲到的線程變量,替換到線程池所對應(yīng)獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復(fù)到調(diào)用之前。
總結(jié)
上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點(diǎn)非常明顯,實(shí)際開發(fā)中也是采用的方案2或者方案4。
-
接口
+關(guān)注
關(guān)注
33文章
8575瀏覽量
151014 -
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14338 -
Boot
+關(guān)注
關(guān)注
0文章
149瀏覽量
35823 -
線程
+關(guān)注
關(guān)注
0文章
504瀏覽量
19675 -
數(shù)據(jù)傳遞
+關(guān)注
關(guān)注
1文章
3瀏覽量
1758
原文標(biāo)題:用這4招 優(yōu)雅的實(shí)現(xiàn)Spring Boot 異步線程間數(shù)據(jù)傳遞
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論