在 Java CompletableFuture 中處理異常
一、簡介
Java 8引入了一個基於Future
新抽象來執行非同步任務CompletableFuture
類別。它基本上是為了克服舊的Future
API 的問題。
在本教學中,我們將研究使用CompletableFuture
時處理異常的方法。
2. CompletableFuture
回顧
首先,我們可能需要回顧一下CompletableFuture
是什麼。 CompletableFuture
是一個Future
實現,它允許我們運行,最重要的是,鍊式非同步操作。一般來說,非同步操作完成可能有三種結果—正常、異常或可以從外部取消。 CompletableFuture
有各種 API 方法來解決所有這些可能的結果。
與CompletableFuture
中的許多其他方法一樣,這些方法具有非非同步、非同步和使用特定Executor
變體的非同步。那麼,事不宜遲,我們來一一看看CompletableFuture
中處理異常的方法。
3. handle()
首先,我們有一個handle()
方法。透過使用此方法,無論結果如何,我們都可以存取和轉換CompletionStage
的整個結果。也就是說, handle()
方法接受BiFunction
函數介面。所以,這個介面有兩個輸入。在handle()
方法的情況下,參數將是先前CompletionStage
的結果和發生的Exception
。
重要的是這兩個參數都是可選的,這意味著它們可以為null
。這在某種意義上是顯而易見的,因為先前的CompletionStage
已經正常完成。那麼Exception
應該為null
因為沒有任何異常,與CompletionStage
結果值可為 null 性類似。
現在讓我們來看看handle()
方法的使用範例:
@ParameterizedTest
@MethodSource("parametersSource_handle")
void whenCompletableFutureIsScheduled_thenHandleStageIsAlwaysInvoked(int radius, long expected)
throws ExecutionException, InterruptedException {
long actual = CompletableFuture
.supplyAsync(() -> {
if (radius <= 0) {
throw new IllegalArgumentException("Supplied with non-positive radius '%d'");
}
return Math.round(Math.pow(radius, 2) * Math.PI);
})
.handle((result, ex) -> {
if (ex == null) {
return result;
} else {
return -1L;
}
})
.get();
Assertions.assertThat(actual).isEqualTo(expected);
}
static Stream<Arguments> parameterSource_handle() {
return Stream.of(Arguments.of(1, 3), Arguments.of(1, -1));
}
這裡要注意的是, handle()
方法傳回一個新的CompletionStage
,它將永遠執行,無論先前的CompletionStage
結果如何。因此, handle()
將前一階段的來源值轉換為某個輸出值。因此,我們將透過get()
方法取得的值是從the handle()
方法傳回的值。
4. exceptionally()
The handle()
方法並不總是很方便,特別是當我們只想在存在異常時處理異常。幸運的是,我們有一個替代方案exceptionally()
。
此方法允許我們提供僅在前一個CompletionStage
以Exception
結束時才執行的回呼。如果沒有拋出異常,則省略回調,並且執行鏈將使用前一個回調的值繼續到下一個回調(如果有)。
為了理解,讓我們來看一個具體的例子:
@ParameterizedTest
@MethodSource("parametersSource_exceptionally")
void whenCompletableFutureIsScheduled_thenExceptionallyExecutedOnlyOnFailure(int a, int b, int c, long expected)
throws ExecutionException, InterruptedException {
long actual = CompletableFuture
.supplyAsync(() -> {
if (a <= 0 || b <= 0 || c <= 0) {
throw new IllegalArgumentException(String.format("Supplied with incorrect edge length [%s]", List.of(a, b, c)));
}
return a * b * c;
})
.exceptionally((ex) -> -1)
.get();
Assertions.assertThat(actual).isEqualTo(expected);
}
static Stream<Arguments> parametersSource_exceptionally() {
return Stream.of(
Arguments.of(1, 5, 5, 25),
Arguments.of(-1, 10, 15, -1)
);
}
所以在這裡,它的工作方式與handle()
相同,但是我們有一個Exception
實例作為回呼的參數。這個參數永遠不會為null
,所以我們的程式碼現在更簡單了。
這裡要注意的重要一點是,只有在當前一階段以Exception
完成時, exceptionally()
方法的回呼才會執行。它基本上意味著,如果Exception
發生在執行鏈中的某個地方,並且已經有一個handle()
方法捕獲了它——之後的excpetionally()
回調將不會被執行:
@ParameterizedTest
@MethodSource("parametersSource_exceptionally")
void givenCompletableFutureIsScheduled_whenHandleIsAlreadyPresent_thenExceptionallyIsNotExecuted(int a, int b, int c, long expected)
throws ExecutionException, InterruptedException {
long actual = CompletableFuture
.supplyAsync(() -> {
if (a <= 0 || b <= 0 || c <= 0) {
throw new IllegalArgumentException(String.format("Supplied with incorrect edge length [%s]", List.of(a, b, c)));
}
return a * b * c;
})
.handle((result, throwable) -> {
if (throwable != null) {
return -1;
}
return result;
})
.exceptionally((ex) -> {
System.exit(1);
return 0;
})
.get();
Assertions.assertThat(actual).isEqualTo(expected);
}
這裡,沒有呼叫exceptionally()
,因為handle()
方法已經捕獲了Exception
(如果有)。因此,除非Exception
發生在handle()
方法內部,否則這裡的exceptionally()
方法將不會被執行。
5. 完成式()
API 中還有一個whenComplete()
方法。它接受帶有兩個參數的BiConsumer
:結果和前階段的異常(如果有)。然而,這種方法與上面的方法有很大不同。
不同之處在於, whenComplete()
不會轉換前一階段的任何異常結果。因此,即使考慮到whenComplete()
的回呼將始終運行,前一階段的異常(如果有)也將進一步傳播:
@ParameterizedTest
@MethodSource("parametersSource_whenComplete")
void whenCompletableFutureIsScheduled_thenWhenCompletedExecutedAlways(Double a, long expected) {
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
long actual = CompletableFuture
.supplyAsync(() -> {
if (a.isNaN()) {
throw new IllegalArgumentException("Supplied value is NaN");
}
return Math.round(Math.pow(a, 2));
})
.whenComplete((result, exception) -> countDownLatch.countDown())
.get();
Assertions.assertThat(countDownLatch.await(20L, java.util.concurrent.TimeUnit.SECONDS));
Assertions.assertThat(actual).isEqualTo(expected);
} catch (Exception e) {
Assertions.assertThat(e.getClass()).isSameAs(ExecutionException.class);
Assertions.assertThat(e.getCause().getClass()).isSameAs(IllegalArgumentException.class);
}
}
static Stream<Arguments> parametersSource_whenComplete() {
return Stream.of(
Arguments.of(2d, 4),
Arguments.of(Double.NaN, 1)
);
}
正如我們在這裡看到的, whenCompleted()
內部的回呼在兩個測試呼叫中運行。然而,在第二次呼叫中,我們完成了ExecutionException
,這導致了IllegalArgumentException
。因此,正如我們所看到的,回調中的異常會傳播到被呼叫者。我們將在下一節中介紹發生這種情況的原因。
6. 未處理的異常
最後,我們需要稍微討論一下未處理的異常。一般來說,如果異常仍未捕獲,則CompletableFuture
將以不會傳播到被呼叫者的Exception
完成。在上面的例子中,我們從get()
方法呼叫中取得ExecutionException
。所以,這是因為我們試圖在CompletableFuture
以Exception
結束時存取結果。
因此,我們需要在get()
呼叫之前檢查CompletableFuture
的結果。有幾種方法可以做到這一點。第一個也可能是所有方法中最熟悉的方法是透過isCompletedExceptionally()/isCancelled()/isDone()
方法.
如果CompletableFutre
完成但出現異常、從外部取消或成功完成,這些方法將傳回boolean
。
不過,值得一提的是,還有一個state()
方法傳回State
枚舉實例。此實例表示CompletableFuture
的狀態,例如RUNNING
、 SUCCESS,
因此,這是存取CompletableFuture
結果的另一種方式。
七、結論
在本文中,我們探討了處理CompletableFuture
階段中發生的異常的方法。
與往常一樣,本文的源代碼可在 GitHub 上取得。