Part1 前言
隨著互聯(lián)網(wǎng)業(yè)務(wù)的發(fā)展,原本單機(jī)部署的系統(tǒng)演化成如今的分布式集群系統(tǒng)后,由于分布式系統(tǒng)多線程,多進(jìn)程并且分布在不同的機(jī)器上,這會(huì)使原本的單機(jī)鎖失效,而且單純的Java API并不能提供分布式鎖的能力,為了解決這個(gè)問題就需要一種跨JVM的互斥機(jī)制來控制共享資源的訪問,這就是分布式鎖要解決的問題。
本文將從實(shí)現(xiàn)本地鎖所產(chǎn)生的問題入手,從而介紹分布式鎖主流的實(shí)現(xiàn)方案,重點(diǎn)實(shí)現(xiàn)基于Redis的分布式鎖。
本地鎖會(huì)出現(xiàn)的問題(此篇幅代碼圖片過多,因此放在最后)
Part2 分布式鎖的實(shí)現(xiàn)
1 分布式鎖主要的實(shí)現(xiàn)有:
基于數(shù)據(jù)庫實(shí)現(xiàn)分布式鎖
基于緩存(Redis等,本文基于Redis實(shí)現(xiàn)手寫分布式鎖 ,因?yàn)檫@樣可以更好的理解分布式鎖的原理及實(shí)現(xiàn),當(dāng)然也可以使用Redisson)
基于Zookeeper
2 每種分布式鎖的解決方案都有各自的優(yōu)缺點(diǎn)
性能角度:redis > zk > mysql
安全角度:zk > redis == mysql
難易程度:zk > redis > mysql
3 分布式鎖要具備的特點(diǎn):
獨(dú)占排他互斥
可以通過 setnx (redis命令:執(zhí)行多次,只有一次能夠成功)
set key value ex 3 nx
防死鎖發(fā)生
請求獲取到鎖之后,服務(wù)器掛掉了,導(dǎo)致鎖無法釋放:給lock鎖添加過期時(shí)間
可以通過redis命令 expire
或者通過 set key value ex 3 nx
保證原子性
redis是單線程的,接受或者執(zhí)行指令遵循one-by-one原則。只要指令之間不被插入其他指令即可保證原子性,lua腳本批量發(fā)送多個(gè)指令給redis服務(wù)器,lua腳本也可以實(shí)現(xiàn)一些業(yè)務(wù)邏輯,redis集成了lua腳本,可以直接使用eval指令執(zhí)行l(wèi)ua腳本。
獲取鎖和設(shè)置過期時(shí)間之間
判斷和刪除之間:lua腳本
防誤刪:
uuid給每個(gè)線程的鎖添加唯一標(biāo)識(shí)
自動(dòng)續(xù)期
可重入:hash數(shù)據(jù)結(jié)構(gòu) + lua腳本
集群情況下,可能導(dǎo)致鎖失效:RedLock算法(redis特有的)
一個(gè)請求從主中獲取到鎖,從還沒來得及同步數(shù)據(jù),主就掛掉了,從就升級(jí)為新主,新的請求就可以從新主中獲取鎖
4 基于redis分布式鎖的基本實(shí)現(xiàn)
我們可以借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時(shí)有多個(gè)客戶端發(fā)送setnx命令,只有一個(gè)客戶端可以成功,返回1(true);其他的客戶端返回0(false),流程圖如下圖所示:
多個(gè)客戶端同時(shí)嘗試獲取鎖(setnx)
獲取成功,執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完成釋放鎖(del)
其他客戶端等待重試
代碼實(shí)現(xiàn):
publicvoidtestLock(){ //1.從redis中獲取鎖,setnx Booleanlock=this.redisTemplate.opsForValue().setIfAbsent("lock","111"); if(lock){ //查詢r(jià)edis中的num值 Stringvalue=this.redisTemplate.opsForValue().get("num"); //沒有該值return if(StringUtils.isBlank(value)){ return; } //有值就轉(zhuǎn)成成int intnum=Integer.parseInt(value); //把redis中的num值+1 this.redisTemplate.opsForValue().set("num",String.valueOf(++num)); //2.釋放鎖del this.redisTemplate.delete("lock"); }else{ //3.每隔1秒鐘回調(diào)一次,再次嘗試獲取鎖 try{ Thread.sleep(1000); testLock(); }catch(InterruptedExceptione){ e.printStackTrace(); } } }
那么以上代碼是否可以解決全部問題呢? 顯示是不能的,我們假設(shè)setnx剛好獲取到鎖,業(yè)務(wù)邏輯出現(xiàn)異常,導(dǎo)致鎖無法釋放,怎么辦呢?
5 優(yōu)化分布式鎖_設(shè)置過期時(shí)間
設(shè)置過期有倆種方式可以選擇:
通過expire設(shè)置過期時(shí)間(缺乏原子性:如果在setnx和expire之間出現(xiàn)異常,鎖也無法釋放)
在set時(shí)指定過期時(shí)間(推薦)
代碼實(shí)現(xiàn)優(yōu)化就是在設(shè)置鎖的時(shí)候設(shè)置過期時(shí)間:
publicvoidtestLock(){ //1.從redis中獲取鎖,setnx Booleanlock=this.redisTemplate.opsForValue().setIfAbsent("lock","111",3,TimeUnit.MINUTES); if(lock){ //與之前相同代碼略過 ... } }
那么還會(huì)不會(huì)存在問題呢?
場景:如果業(yè)務(wù)邏輯的執(zhí)行時(shí)間是7s。執(zhí)行流程如下:
index1業(yè)務(wù)邏輯沒執(zhí)行完,3秒后鎖被自動(dòng)釋放。
index2獲取到鎖,執(zhí)行業(yè)務(wù)邏輯,3秒后鎖被自動(dòng)釋放。
index3獲取到鎖,執(zhí)行業(yè)務(wù)邏輯
index1業(yè)務(wù)邏輯執(zhí)行完成,開始調(diào)用del釋放鎖,這時(shí)釋放的是index3的鎖,導(dǎo)致index3的業(yè)務(wù)只執(zhí)行1s就被別人釋放。
最終等于沒鎖的情況。
解決:setnx獲取鎖時(shí),設(shè)置一個(gè)指定的唯一值(例如:uuid);釋放前獲取這個(gè)值,判斷是否自己的鎖
6 優(yōu)化分布式鎖_防止誤刪除
publicvoidtestLock(){ //1.從redis中獲取鎖,setnx Stringuuid=UUID.randomUUID().toString(); Booleanlock=this.redisTemplate.opsForValue().setIfAbsent("lock",uuid,3,TimeUnit.MINUTES); if(lock){ //與之前相同代碼略過 ... //2.釋放鎖del if(StringUtils.equals(redisTemplate.opsForValue().get("lock"),uuid)){ this.redisTemplate.delete("lock"); } } }
場景:
index1執(zhí)行刪除時(shí),查詢到的lock值確實(shí)和uuid相等
index1執(zhí)行刪除前,lock剛好過期時(shí)間已到,被redis自動(dòng)釋放
index2獲取了lock
index1執(zhí)行刪除,此時(shí)會(huì)把index2的lock刪除
問題:缺乏原子性
7 優(yōu)化分布式鎖_LUA腳本保證刪除的原子性
首先我們先簡單介紹一下lua腳本的基本知識(shí)(lua腳本是c語言)
定義變量:
全局變量:a = 11
局部變量:local b = 22
redis不允許lua腳本創(chuàng)建全局變量,只能聲明局部變量
流程控制:
if(exp)then 業(yè)務(wù)邏輯 elseif(exp)then 業(yè)務(wù)邏輯 else 業(yè)務(wù)邏輯 end
redis中執(zhí)行l(wèi)ua腳本:
eval script numkeys keys[] args[] : eval指令的輸出不是lua腳本的打印而是lua腳本的返回值
script:lua腳本字符串,定義動(dòng)態(tài)變量:KEYS[1] ARGV[1]
numkeys:key數(shù)組的元素個(gè)數(shù)
keys:keys數(shù)組
args:argv數(shù)組
redis集群執(zhí)行l(wèi)ua腳本可能會(huì)報(bào)錯(cuò):如果所有keys不在同一個(gè)分片上,lua腳本就會(huì)報(bào)錯(cuò):解決方案是:
keys只傳一個(gè)
可以使用CLUSTER KEYSLOT bb{xx}
刪除LUA腳本:
ifredis.call('get',KEYS[1])==ARGV[1]thenreturnredis.call('del',KEYS[1])elsereturn0end publicvoidtestLock(){ //1.從redis中獲取鎖,setnx Stringuuid=UUID.randomUUID().toString(); Booleanlock=this.redisTemplate.opsForValue().setIfAbsent("lock",uuid,3,TimeUnit.SECONDS); if(lock){ //與之前相同代碼略過 ... //2.釋放鎖del Stringscript="ifredis.call('get',KEYS[1])==ARGV[1]thenreturnredis.call('del',KEYS[1])elsereturn0end"; this.redisTemplate.execute(newDefaultRedisScript<>(script),Arrays.asList("lock"),uuid); }else{ //3.每隔1秒鐘回調(diào)一次,再次嘗試獲取鎖 try{ Thread.sleep(1000); testLock(); }catch(InterruptedExceptione){ e.printStackTrace(); } } }
8 優(yōu)化分布式鎖_可以重入
上述加鎖命令使用了 SETNX ,一旦鍵存在就無法再設(shè)置成功,這就導(dǎo)致后續(xù)同一線程內(nèi)繼續(xù)加鎖,將會(huì)加鎖失敗。當(dāng)一個(gè)線程執(zhí)行一段代碼成功獲取鎖之后,繼續(xù)執(zhí)行時(shí),又遇到加鎖的子任務(wù)代碼,可重入性就保證線程能繼續(xù)執(zhí)行,而不可重入就是需要等待鎖釋放之后,再次獲取鎖成功,才能繼續(xù)往下執(zhí)行。
可重入鎖最大特性就是計(jì)數(shù),計(jì)算加鎖的次數(shù)。所以當(dāng)可重入鎖需要在分布式環(huán)境實(shí)現(xiàn)時(shí),我們也就需要統(tǒng)計(jì)加鎖次數(shù)。
我們基于Redis Hash 實(shí)現(xiàn)方案 :
Redis 提供了 Hash (哈希表)這種可以存儲(chǔ)鍵值對數(shù)據(jù)結(jié)構(gòu)。所以我們可以使用 Redis Hash 存儲(chǔ)的鎖的重入次數(shù),然后利用 lua 腳本判斷邏輯。
加鎖
if(redis.call('exists',KEYS[1])==0orredis.call('hexists',KEYS[1],ARGV[1])==1) then redis.call('hincrby',KEYS[1],ARGV[1],1); redis.call('expire',KEYS[1],ARGV[2]); return1; else return0; end
假設(shè)值為:KEYS:[lock], ARGV[uuid, expire]
如果鎖不存在或者這是自己的鎖,就通過hincrby(不存在新增,存在就加1)獲取鎖或者鎖次數(shù)加1。 代碼實(shí)例如下:
privateBooleantryLock(StringlockName,Stringuuid,Longexpire){ Stringscript="if(redis.call('exists',KEYS[1])==0orredis.call('hexists',KEYS[1],ARGV[1])==1)"+ "then"+ "redis.call('hincrby',KEYS[1],ARGV[1],1);"+ "redis.call('expire',KEYS[1],ARGV[2]);"+ "return1;"+ "else"+ "return0;"+ "end"; if(!this.redisTemplate.execute(newDefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,expire.toString())){ try{ //沒有獲取到鎖,重試 Thread.sleep(200); tryLock(lockName,uuid,expire); }catch(InterruptedExceptione){ e.printStackTrace(); } } //獲取到鎖,返回true returntrue; }
解鎖
--判斷hashset可重入key的值是否等于0 --如果為nil代表自己的鎖已不存在,在嘗試解其他線程的鎖,解鎖失敗 --如果為0代表可重入次數(shù)被減1 --如果為1代表該可重入key解鎖成功 if(redis.call('hexists',KEYS[1],ARGV[1])==0)then returnnil; end; --小于等于0代表可以解鎖 if(redis.call('hincrby',KEYS[1],ARGV[1],-1)>0)then return0; else redis.call('del',KEYS[1]); return1; end;
這里之所以沒有跟加鎖一樣使用 Boolean ,這是因?yàn)榻怄i lua 腳本中,三個(gè)返回值含義如下:
1 代表解鎖成功,鎖被釋放
0 代表可重入次數(shù)被減 1
null 代表其他線程嘗試解鎖,解鎖失敗
如果返回值使用 Boolean,Spring-data-redis 進(jìn)行類型轉(zhuǎn)換時(shí)將會(huì)把 null 轉(zhuǎn)為 false,這就會(huì)影響我們邏輯判斷,所以返回類型只好使用 Long。
privatevoidunlock(StringlockName,Stringuuid){ Stringscript="if(redis.call('hexists',KEYS[1],ARGV[1])==0)then"+ "returnnil;"+ "end;"+ "if(redis.call('hincrby',KEYS[1],ARGV[1],-1)>0)then"+ "return0;"+ "else"+ "redis.call('del',KEYS[1]);"+ "return1;"+ "end;"; //這里之所以沒有跟加鎖一樣使用Boolean,這是因?yàn)榻怄ilua腳本中,三個(gè)返回值含義如下: //1代表解鎖成功,鎖被釋放 //0代表可重入次數(shù)被減1 //null代表其他線程嘗試解鎖,解鎖失敗 Longresult=this.redisTemplate.execute(newDefaultRedisScript<>(script,Long.class),Lists.newArrayList(lockName),uuid); //如果未返回值,代表嘗試解其他線程的鎖 if(result==null){ thrownewIllegalMonitorStateException("attempttounlocklock,notlockedbylockName:" +lockName+"withrequest:"+uuid); } }
使用
publicvoidtestLock(){ //加鎖 Stringuuid=UUID.randomUUID().toString(); Booleanlock=this.tryLock("lock",uuid,300l); if(lock){ //讀取redis中的num值 StringnumString=this.redisTemplate.opsForValue().get("num"); if(StringUtils.isBlank(numString)){ return; } //++操作 Integernum=Integer.parseInt(numString); num++; //放入redis this.redisTemplate.opsForValue().set("num",String.valueOf(num)); //測試可重入性 this.testSubLock(uuid); //釋放鎖 this.unlock("lock",uuid); } } //測試可重入性 privatevoidtestSubLock(Stringuuid){ //加鎖 Booleanlock=this.tryLock("lock",uuid,300l); if(lock){ System.out.println("分布式可重入鎖。。。"); this.unlock("lock",uuid); } }
9 優(yōu)化分布式鎖_自動(dòng)續(xù)期
A線程超時(shí)時(shí)間設(shè)為10s(為了解決死鎖問題),但代碼執(zhí)行時(shí)間可能需要30s,然后redis服務(wù)端10s后將鎖刪除。 此時(shí),B線程恰好申請鎖,redis服務(wù)端不存在該鎖,可以申請,也執(zhí)行了代碼。
那么問題來了, A、B線程都同時(shí)獲取到鎖并執(zhí)行業(yè)務(wù)邏輯,這與分布式鎖最基本的性質(zhì)相違背:在任意一個(gè)時(shí)刻,只有一個(gè)客戶端持有鎖(即獨(dú)享排他)。
鎖延期方法:開啟子線程執(zhí)行延期
/** *鎖延期 *線程等待超時(shí)時(shí)間的2/3時(shí)間后,執(zhí)行鎖延時(shí)代碼,直到業(yè)務(wù)邏輯執(zhí)行完畢,因此在此過程中,其他線程無法獲取到鎖,保證了線程安全性 *@paramlockName *@paramexpire單位:毫秒 */ privatevoidrenewTime(StringlockName,Stringuuid,Longexpire){ Stringscript="if(redis.call('hexists',KEYS[1],ARGV[1])==1)thenredis.call('expire',KEYS[1],ARGV[2]);return1;elsereturn0;end"; newThread(()->{ while(this.redisTemplate.execute(newDefaultRedisScript<>(script,Boolean.class),Lists.newArrayList(lockName),uuid,expire.toString())){ try{ //到達(dá)過期時(shí)間的2/3時(shí)間,自動(dòng)續(xù)期 Thread.sleep(expire/3); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).start(); }
獲取鎖成功后,調(diào)用延期方法給鎖 定時(shí)延期:
privateBooleantryLock(StringlockName,Stringuuid,Longexpire){ Stringscript="if(redis.call('exists',KEYS[1])==0orredis.call('hexists',KEYS[1],ARGV[1])==1)"+ "then"+ "redis.call('hincrby',KEYS[1],ARGV[1],1);"+ "redis.call('expire',KEYS[1],ARGV[2]);"+ "return1;"+ "else"+ "return0;"+ "end"; if(!this.redisTemplate.execute(newDefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,expire.toString())){ try{ //沒有獲取到鎖,重試 Thread.sleep(200); tryLock(lockName,uuid,expire); }catch(InterruptedExceptione){ e.printStackTrace(); } } //鎖續(xù)期 this.renewTime(lockName,uuid,expire*1000); //獲取到鎖,返回true returntrue; }
10 優(yōu)化分布式鎖_Redlock算法
redis集群狀態(tài)下的問題:
客戶端A從master獲取到鎖
在master將鎖同步到slave之前,master宕掉了。
slave節(jié)點(diǎn)被晉級(jí)為master節(jié)點(diǎn)
客戶端B取得了同一個(gè)資源被客戶端A已經(jīng)獲取到的另外一個(gè)鎖。安全失效 解決集群下鎖失效,參照redis官方網(wǎng)站針對redlock文檔:redis.io/topics/dist… [1]
11 本地鎖會(huì)出現(xiàn)的問題
我們知道java中有synchronized、lock鎖、讀寫鎖ReadWriteLock,眾所周知這些鎖都是本地鎖。
提到鎖就不得不提JUC:java.util.concurrent包,又稱concurrent包。jdk1.5提供,為多線程高并發(fā)編程而提供的包,但此文章的場景是分布式場景,后續(xù)會(huì)出JUC的文章。
簡單的介紹一下synchronized及l(fā)ock鎖
synchronized是一個(gè)關(guān)鍵字,lock是一個(gè)接口,ReentrantLock是實(shí)現(xiàn)了lock接口的一個(gè)類
ReentrantLock:悲觀的獨(dú)占的互斥的排他的可公平可不公平的可重入鎖
synchronized:悲觀的獨(dú)占的互斥的排他的非公平的可重入鎖
準(zhǔn)備
redis、ab工具(壓測)
不使用任何鎖的情況下
我們首先創(chuàng)建一個(gè)測試方法,testNoLock
@GetMapping("/test") publicvoidtestNoLock(){ Stringcount=(String)this.redisTemplate.opsForValue().get("count"); if(count==null){ //沒有值直接返回 return; } //有值就轉(zhuǎn)成成int intnumber=Integer.parseInt(count); //把redis中的num值+1 this.redisTemplate.opsForValue().set("count",String.valueOf(++number)); }
測試之前的查看值為1
@GetMapping("/getCount") publicStringgetCount(){ Stringcount=String.valueOf(this.redisTemplate.opsForValue().get("count")); returncount;//1 }
接下來使用ab壓力測試工具
//ab-n(一次發(fā)送的請求數(shù))-c(請求的并發(fā)數(shù))訪問路徑 ab-n100-c50http://127.0.0.1:8080/test/test
再次查詢結(jié)果為6,說明問題很大
使用本地鎖
publicsynchronizedvoidtestNoLock(){ Stringcount=String.valueOf(this.redisTemplate.opsForValue().get("count")); if("null".equals(count)){ //沒有值直接返回 return; } //有值就轉(zhuǎn)成成int intnumber=Integer.parseInt(count); //把redis中的num值+1 this.redisTemplate.opsForValue().set("count",String.valueOf(++number)); }
再次使用ab壓力測試工具
ab-n100-c50http://127.0.0.1:8080/test/test
此次結(jié)果為106,說明結(jié)果是正確的,看樣子結(jié)果是非常完美的,但是真的很完美嗎?
使用集群+本地鎖
我們只需要在idea中在啟動(dòng)倆個(gè)服務(wù),修改端口號(hào),三個(gè)運(yùn)行實(shí)例的名稱是相同的,并且網(wǎng)關(guān)的配置就是通過服務(wù)名在負(fù)載均衡,所以我們只需要訪問網(wǎng)關(guān),網(wǎng)關(guān)就會(huì)給我們做負(fù)載均衡了。
再次使用ab壓力測試工具(將count重置為1)
ab-n100-c50http://127.0.0.1:8080/test/test
此次的結(jié)果為58!?。?/p>
到此我們可以知道,本地鎖是有局限性的。
審核編輯:劉清
-
JAVA
+關(guān)注
關(guān)注
19文章
2966瀏覽量
104700 -
JVM
+關(guān)注
關(guān)注
0文章
158瀏覽量
12220 -
Hash算法
+關(guān)注
關(guān)注
0文章
43瀏覽量
7382
原文標(biāo)題:Java手寫分布式鎖的實(shí)現(xiàn)(非常牛逼)
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論