最近在開發(fā) 延保服務(wù) 頻道頁時,為了提高查詢效率,使用到了多線程技術(shù)。為了對多線程方案設(shè)計有更加充分的了解,在業(yè)余時間讀完了《圖解 Java 多線程設(shè)計模式》這本書,覺得收獲良多。本篇文章將介紹其中提到的 Future 模式,以及在實際業(yè)務(wù)開發(fā)中對該模式的應(yīng)用,而這些內(nèi)容對于本書來說只是冰山一角,還是推薦大家有時間去閱讀原書。
1. Future 模式:“先給您提貨單”
我們先來看一個場景:假如我們?nèi)サ案獾曩I蛋糕,下單后,店員會遞給我們提貨單并告知“請您傍晚來取蛋糕”。到了傍晚我們拿著提貨單去取蛋糕,店員會先和我們說“您的蛋糕已經(jīng)做好了”,然后將蛋糕拿給我們。
如果將下單蛋糕到取蛋糕的過程抽象成一個方法的話,那么意味著這個方法需要花很長的時間才能獲取執(zhí)行結(jié)果,與其一直等待結(jié)果,不如先拿著一張“提貨單”,到我們需要取貨的時候,再通過它去取,而獲取“提貨單”的過程是幾乎不耗時的,而這個提貨單對象就被稱為 Future
,后續(xù)便可以通過它來獲取方法的返回值。用 Java 來表示這個過程的話,需要使用到 FutureTask
和 Callable
兩個類,如下:
public class Example {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 預(yù)定蛋糕,并定義“提貨單”
System.out.println("我:預(yù)定蛋糕");
FutureTask future = new FutureTask?>(() -> {
System.out.println("店員:請您傍晚來取蛋糕");
Thread.sleep(2000);
System.out.println("店員:您的蛋糕已經(jīng)做好了");
return "Holiland";
});
// 開始做蛋糕
new Thread(future).start();
// 去做其他事情
Thread.sleep(1000);
System.out.println("我:忙碌中...");
// 取蛋糕
System.out.println("我:取蛋糕 " + future.get());
}
}
// 運行結(jié)果:
// 我:預(yù)定蛋糕
// 店員:請您傍晚來取蛋糕
// 我:忙碌中...
// 店員:您的蛋糕已經(jīng)做好了
// 我:取蛋糕 Holiland
方法的調(diào)用者可以將任務(wù)交給其他線程去處理,無需阻塞等待方法的執(zhí)行,這樣調(diào)用者便可以繼續(xù)執(zhí)行其他任務(wù),并能通過 Future
對象獲取執(zhí)行結(jié)果。
它的運行原理如下:創(chuàng)建 FutureTask
實例時,Callable
對象會被傳遞給構(gòu)造函數(shù),當(dāng)線程調(diào)用 FutureTask
的 run
方法時,Callable
對象的 call
方法也會被執(zhí)行。調(diào)用 call
方法的線程會同步地獲取結(jié)果,并通過 FutureTask
的 set
方法來記錄結(jié)果對象,如果 call
方法執(zhí)行期間發(fā)生了異常,則會調(diào)用 setException
方法記錄異常。最后,通過調(diào)用 get
方法獲取方法的結(jié)果,注意這里可能會拋出方法執(zhí)行時產(chǎn)生的異常。
public void run() {
// ...
try {
// “提貨任務(wù)”
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調(diào)用 callable 的 call 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 捕獲并設(shè)置異常
setException(ex);
}
if (ran)
// 為結(jié)果賦值
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將結(jié)果賦值給 outcome 全局變量,供 get 時獲取
outcome = v;
// 修改狀態(tài)為 NORMAL
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將異常賦值給 outcome 變量,供 get 時拋出
outcome = t;
// 修改狀態(tài)為 EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 未完成時阻塞等一等
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結(jié)束的話能正常獲取到結(jié)果
if (s == NORMAL)
return (V)x;
// 否則會拋出異常,注意如果執(zhí)行中出現(xiàn)異常,調(diào)用 get 時會被拋出
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
現(xiàn)在對 Future 模式 已經(jīng)有了基本的了解:它通過 Future
接口來表示未來的結(jié)果,實現(xiàn) 調(diào)用者與執(zhí)行者之間的解耦,提高系統(tǒng)的吞吐量和響應(yīng)速度,那在實踐中對該模式是如何使用的呢?
2. 對 Future 模式的實踐
因為 延保服務(wù) 頻道頁訪問量大且對接口性能要求較高,單線程處理并不能滿足性能要求,所以應(yīng)用了 Future 模式 來提高查詢效率,但是并沒有借助上文所述的 FutureTask
來實現(xiàn),而是使用了 CompletableFuture
工具類,它們的實現(xiàn)原理基本一致,但是后者提供的方法和對 鏈?zhǔn)?a target="_blank">編程 的支持使代碼更加簡潔,實現(xiàn)更加容易(相關(guān) API 參考見文末)。
如下是使用 CompletableFuture
異步多線程查詢訂單列表的邏輯,根據(jù)配置的 pageNo
分多條線程查詢各頁的訂單數(shù)據(jù):
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁數(shù) pageNo,并發(fā)查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture.supplyAsync(
() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢用戶訂單信息異常", e);
}
這段代碼中對異常的處理能進(jìn)行優(yōu)化:第 15 行代碼,如果某條線程查詢訂單列表時發(fā)生異常,那么在調(diào)用 get
方法時會拋出該異常,被 catch
后返回空結(jié)果,即使有其他線程查詢成功,這些訂單結(jié)果值也會被忽略掉,可以針對這一點進(jìn)行優(yōu)化,如下:
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁數(shù) pageNo,并發(fā)查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結(jié)果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發(fā)查詢用戶訂單信息異常", e);
}
優(yōu)化后針對查詢發(fā)生異常的任務(wù)打印異常日志,并返回空集合,這樣即使單線程查詢失敗,也不會影響到其他線程查詢成功的結(jié)果。
CompletableFuture
還提供了 allOf
方法,它返回的 CompletableFuture
對象在所有 CompletableFuture
執(zhí)行完成時完成,相比于對每個任務(wù)都調(diào)用 get
阻塞等待任務(wù)完成的實現(xiàn)可讀性更好,改造后代碼如下:
List result = new ArrayList?>();
// 并發(fā)查詢訂單列表
CompletableFuture>[] futures = new CompletableFuture[pageNo];
// 配置需要查詢的頁數(shù) pageNo,并發(fā)查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futures[i - 1] = future;
}
try {
// 等待所有線程處理完畢
CompletableFuture.allOf(futures).get();
for (CompletableFuture> future : futures) {
List orderInfoList = future.get();
if (CollectionUtils.isEmpty(orderInfoList)) {
result.addAll(orderInfoList);
}
}
} catch (Exception e) {
log.error("處理用戶訂單結(jié)果信息異常", e);
}
Tips:
CompletableFuture
的設(shè)計初衷是支持異步編程,所以應(yīng)盡量避免在CompletableFuture
鏈中使用get()/join()
方法,因為這些方法會阻塞當(dāng)前線程直到CompletableFuture
完成,應(yīng)該在必須使用該結(jié)果值時才調(diào)用它們。
相關(guān)的模式:命令模式
命令模式能將操作的調(diào)用者和執(zhí)行者解耦,它能很容易的與 Future 模式 結(jié)合,以查詢訂單的任務(wù)為例,我們可以將該任務(wù)封裝為“命令”對象的形式,執(zhí)行時為每個線程提交一個命令,實現(xiàn)解耦并提高擴(kuò)展性。在命令模式中,命令對象需要 支持撤銷和重做,那么這便在查詢出現(xiàn)異常時,提供了補償處理的可能,命令模式類圖關(guān)系如下:
3.《圖解Java多線程設(shè)計模式》書籍推薦
我覺得本書算得上是一本老書:05 年出版的基于 JDK1.5 的Java多線程書籍,相比于目前我們常用的 JDK1.8 和時髦的 JDK21,在讀之前總會讓人覺得有一種過時的感覺。但是當(dāng)我讀完時,發(fā)現(xiàn)其中的模式能對應(yīng)上代碼中的處理邏輯:對 CompletableFuture
的使用正對應(yīng)了其中的 Future 模式(異步獲取其他線程的執(zhí)行結(jié)果)等等,所以我覺得模式的應(yīng)用不會局限于技術(shù)的新老,它是在某種情況下,研發(fā)人員共識或通用的解決方案,在知曉某種模式,采用已有的技術(shù)實現(xiàn)它是容易的,而反過來在只掌握技術(shù)去探索模式是困難且沒有方向的。
同時,我也在考慮一個問題:對于新人學(xué)習(xí)多線程技術(shù)來說,究竟適不適合直接從模式入門呢?因為我對設(shè)計模式有了比較多的實踐經(jīng)驗,所以對“模式”相關(guān)的內(nèi)容足夠敏感,如果新人沒有這些經(jīng)驗的話,這對他們來說會不會更像是一個個知識點的堆砌呢?好在的是,本書除了模式相關(guān)的內(nèi)容,對基礎(chǔ)知識也做足了鋪墊,而且提出的關(guān)于多線程編程的思考點也是非常值得參考和學(xué)習(xí)的,以線程互斥和協(xié)同為例,書中談到:在對線程進(jìn)行互斥處理時需要考慮 “要保護(hù)的東西是什么”,這樣便能夠 清晰的確定鎖的粒度;對于線程的協(xié)同,書中提到的是需要考慮 “放在中間的東西是什么”,直接的拋出這個觀點是不容易理解的,“中間的東西”是在多線程的 生產(chǎn)者和消費者模式 中提出的,部分線程負(fù)責(zé)生產(chǎn),生產(chǎn)完成后將對象放在“中間”,部分線程負(fù)責(zé)消費,消費時取的便是“中間”的對象,而合理規(guī)劃這些中間的東西便能 消除生產(chǎn)者和消費者之間的速度差異,提高系統(tǒng)的吞吐量和響應(yīng)速度。而再深入考慮這兩個角度時,線程的互斥和協(xié)同其實是內(nèi)外統(tǒng)一的:為了讓線程協(xié)調(diào)運行,必須執(zhí)行互斥處理,以防止共享的內(nèi)容被破壞,而線程的互斥是為了線程的協(xié)調(diào)運行才進(jìn)行的必要操作。
附:CompletableFuture 常用 API
使用 supplyAsync 方法異步執(zhí)行任務(wù),并返回 CompletableFuture 對象
如下代碼所示,調(diào)用 CompletableFuture.supplyAsync
靜態(tài)方法異步執(zhí)行查詢邏輯,并返回一個新的 CompletableFuture
對象
CompletableFuture> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);
使用 join 方法阻塞獲取完成結(jié)果
如下代碼所示,在封裝結(jié)果前,調(diào)用 join
方法阻塞等待獲取結(jié)果
futureList.forEach(CompletableFuture::join);
它與 get
方法的主要區(qū)別在于,join
方法拋出的是未經(jīng)檢查的異常 CompletionException
,并將原始異常作為其原因,這意味著我們可以不需要在方法簽名中聲明它或在調(diào)用 join
方法的地方進(jìn)行異常處理,而 get
方法會拋出 InterruptedException
和 ExecutionException
異常,我們必須對它進(jìn)行處理,get
方法源碼如下:
public T get() throws InterruptedException, ExecutionException {
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
}
用 thenApply(Function) 和 thenAccept(Consumer) 等回調(diào)函數(shù)處理結(jié)果
如下是使用 thenApply()
方法對 CompletableFuture
的結(jié)果進(jìn)行轉(zhuǎn)換的操作:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(greeting -> greeting + " World");
使用 exceptionally() 處理 CompletableFuture 中的異常
CompletableFuture
提供了exceptionally()
方法來處理異常,這是一個非常重要的步驟。如果在 CompletableFuture
的運行過程中拋出異常,那么這個異常會被傳遞到最終的結(jié)果中。如果沒有適當(dāng)?shù)漠惓L幚?,那么在調(diào)用 get()
或 join()
方法時可能會拋出異常。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Exception occurred");
}
return "Hello, World!";
}).exceptionally(e -> "An error occurred");
使用 allOf() 和 anyOf() 處理多個 CompletableFuture
如果有多個 CompletableFuture
需要處理,可以使用 CompletableFuture.allOf()
或者 CompletableFuture.anyOf()
。allOf()
在所有的 CompletableFuture
完成時完成,而 anyOf()
則會在任意一個 CompletableFuture
完成時完成。
complete()、completeExceptionally()、cancel() 方法
CompletableFuture
的運行是在調(diào)用了 complete()
、completeExceptionally()
、cancel()
等方法后才會被標(biāo)記為完成。如果沒有正確地完成 CompletableFuture
,那么在調(diào)用 get()
方法時可能會永久阻塞。這三個方法在 Java 并發(fā)編程中有著重要的應(yīng)用。以下是這三個方法的常見使用場景:
complete(T value)
: 此方法用于顯式地完成一個 CompletableFuture
,并設(shè)置它的結(jié)果值。這在你需要在某個計算完成時,手動設(shè)置 CompletableFuture
的結(jié)果值的場景中非常有用。例如,你可能在一個異步操作完成時,需要設(shè)置 CompletableFuture
的結(jié)果值。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.complete("Operation Result");
completeExceptionally(Throwable ex)
: 此方法用于顯式地以異常完成一個 CompletableFuture
。這在你需要在某個計算失敗時,手動設(shè)置 CompletableFuture
的異常的場景中非常有用。例如,你可能在一個異步操作失敗時,需要設(shè)置 CompletableFuture
的異常。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
cancel(boolean mayInterruptIfRunning)
: 此方法用于取消與 CompletableFuture
關(guān)聯(lián)的計算。這在你需要取消一個長時間運行的或者不再需要的計算的場景中非常有用。例如,你可能在用戶取消操作或者超時的情況下,需要取消 CompletableFuture
的計算。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Long running operation
});
// Some condition
future.cancel(true);
這些方法都是線程安全的,可以從任何線程中調(diào)用。
使用 thenCompose() 處理嵌套的 CompletableFuture
如果在處理 CompletableFuture
的結(jié)果時又創(chuàng)建了新的CompletableFuture
,那么就會產(chǎn)生嵌套的 CompletableFuture
。這時可以使用 thenCompose()
方法來避免 CompletableFuture
的嵌套,如下代碼所示:
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
使用 thenCombine() 處理兩個 CompletableFuture 的結(jié)果
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
審核編輯 黃宇
-
JAVA
+關(guān)注
關(guān)注
19文章
2966瀏覽量
104701 -
API
+關(guān)注
關(guān)注
2文章
1499瀏覽量
61960 -
多線程
+關(guān)注
關(guān)注
0文章
278瀏覽量
19943
發(fā)布評論請先 登錄
相關(guān)推薦
評論