使用 CompletableFuture 重試邏輯
1. 概述
在本文中,我們將學習如何將重試邏輯應用於CompletableFuture
物件。最初,我們將重試包含在CompletableFuture
中的任務。接下來,我們將利用CompletableFuture
API 建立多個實例鏈,使我們能夠在將來遇到異常完成時重新執行任務。
2. 重試任務
重試任務的一個簡單方法是利用裝飾器模式並使用帶有類別和介面的經典 OOP 風格來實現它。另一方面,我們可以選擇更簡潔、更實用的方法,利用高階函數。
最初,我們將宣告一個函數,該函數將Supplier<T>
和最大呼叫次數作為參數。之後,如果需要,我們將使用 while 迴圈和 try-catch 區塊多次呼叫函數。最後,我們將透過傳回另一個Supplier<T>
來保留原始資料類型:
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
return () -> {
int retries = 0;
while (retries < maxRetries) {
try {
return supplier.get();
} catch (Exception e) {
retries++;
}
}
throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
};
}
我們可以透過允許重試特定異常的定義或在呼叫之間引入延遲來進一步改進此裝飾器。但是,為了簡單起見,讓我們繼續基於此函數裝飾器建立CompletableFuture
:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
return CompletableFuture.supplyAsync(retryableSupplier);
}
現在,讓我們繼續為此功能編寫測試。首先,我們需要一個由CompletableFuture
重試的方法。為此,我們將設計一個方法,該方法透過拋出RuntimeExceptions
失敗四次,並在第五次嘗試時成功完成,傳回整數值:
AtomicInteger retriesCounter = new AtomicInteger(0);
@BeforeEach
void beforeEach() {
retriesCounter.set(0);
}
int failFourTimesThenReturn(int returnValue) {
int retryNr = retriesCounter.get();
if (retryNr < 4) {
retriesCounter.set(retryNr + 1);
throw new RuntimeException();
}
return returnValue;
}
現在,我們終於可以測試retryTask()
函數並斷言傳回了預期值。此外,我們可以透過詢問retriesCounter
來檢查呼叫次數:
@Test
void whenRetryingTask_thenReturnsCorrectlyAfterFourInvocations() {
Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);
CompletableFuture<Integer> result = retryTask(codeToRun, 10);
assertThat(result.join()).isEqualTo(100);
assertThat(retriesCounter).hasValue(4);
}
此外,如果我們使用較小的maxRetires
參數值來呼叫相同的函數,我們將預期Future
能夠異常完成。原始的IllegalStateException
應包裝到CompletionException
中,但應保留原始的錯誤訊息:
@Test
void whenRetryingTask_thenThrowsExceptionAfterThreeInvocations() {
Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);
CompletableFuture<Integer> result = retryTask(codeToRun, 3);
assertThatThrownBy(result::join)
.isInstanceOf(CompletionException.class)
.hasMessageContaining("IllegalStateException: Task failed after 3 attempts");
}
3. 重試CompletableFuture
CompletableFuture
API 提供了在異常發生時進行處理的選項。因此,我們可以使用exceptionally()
等方法,而不是建立函數裝飾器。
3.1.不安全重試
exceptionally()
方法使我們能夠指定一個替代函數,當初始呼叫完成但出現異常時將呼叫該函數。例如,如果我們打算重試呼叫兩次,我們可以利用 Fluent API 新增其中兩個後備:
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier)
.exceptionally(__ -> supplier.get())
.exceptionally(__ -> supplier.get());
}
由於我們需要可變次數的重試,因此我們重構程式碼並使用 for 迴圈:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
我們可以使用相同的測試助手來測試retryUnsafe()
並預測類似的結果。儘管如此,如果初始供應商在最終的CompletableFuture
及其所有exceptionally()
後備創建之前完成,將會有一個微妙的區別。在這種情況下,函數確實會重試指定的次數。但是,這個重試過程會發生在主執行緒上,導致非同步性的喪失。
為了說明這一點,我們可以在 for 迴圈之前插入一個 100 毫秒的暫停,該迴圈會迭代呼叫exceptionally()
方法。
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
sleep(100l);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
接下來,我們將修改failFourTimesThenReturn()
測試方法以記錄每次呼叫此方法時的嘗試次數和目前執行緒名稱。現在,讓我們重新運行測試並檢查控制台:
invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: main
invocation: 2, thread: main
invocation: 3, thread: main
invocation: 4, thread: main
正如預期的那樣,後續呼叫由主執行緒執行。如果初始呼叫很快,但後續呼叫預計會較慢,這可能會出現問題。
3.2.非同步重試
我們可以透過確保後續呼叫非同步執行來解決這個問題。為了實現這一點,從 Java 12 開始,API 中引入了一種專用方法。透過使用exceptionallyAsync()
,我們將確保所有重試都將非同步執行,無論初始CompletableFuture
完成的速度如何:
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyAsync(__ -> supplier.get());
}
return cf;
}
讓我們快速運行測試並檢查日誌:
invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: ForkJoinPool.commonPool-worker-1
invocation: 2, thread: ForkJoinPool.commonPool-worker-1
invocation: 3, thread: ForkJoinPool.commonPool-worker-2
invocation: 4, thread: ForkJoinPool.commonPool-worker-2
正如預期的那樣,主執行緒沒有執行任何呼叫。
3.3.嵌套CompletableFutures
如果我們需要一個與 Java 12 之前版本相容的解決方案,我們可以手動增強第一個範例以實現完全非同步。為了實現這一點,我們必須確保回退在新的CompletableFuture:
cf.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
然而,上面的程式碼不會編譯,因為資料類型不匹配,但我們可以透過三個步驟修復它。首先,我們需要雙重嵌套最初的Future.
我們可以透過compleatedFuture()
輕鬆做到這一點:
CompletableFuture<CompletableFuture<T>> temp = cf.thenApply(value -> CompletableFuture.completedFuture(value));
現在類型是匹配的,所以我們可以安全地應用exceptionally()
後備:
temp = temp.exceptionally(__ -> CompletableFuture.supplyAsync(supplier));
最後,我們將使用thenCompose()
來展平物件並返回原始類型:
cf = temp.thenCompose(t -> t);
最後,讓我們將所有內容結合起來,創建一個具有可變數量的非同步回退的CompletableFuture
。此外,讓我們利用流暢的 API、方法參考和實用函數來保持程式碼簡潔:
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
.thenCompose(Function.identity());
}
return cf;
}
4。結論
在本文中,我們探討了在CompletableFuture
中重試函式呼叫的概念。我們首先深入研究函數式風格的裝飾模式的實現,使我們能夠重試函數本身。
隨後,我們利用CompletableFuture
API 來完成相同的任務,同時保持非同步流程。我們的發現包括 Java 12 中引入的exceptionallyAsync()
方法,它非常適合此目的。最後,我們提出了一種替代方法,僅依賴原始 Java 8 API 中的方法。
像往常一樣,我們可以在 GitHub 上找到工作程式碼範例。