如何在循環中使用 CompletableFuture 收集所有結果並處理異常
1. 概述
Java 8 的CompletableFuture
非常適合處理非同步計算。例如,Web 用戶端在進行伺服器呼叫時可以使用CompletableFuture
。開始並處理單獨的CompletableFuture
回應很容易。然而,目前還不清楚如何收集多個CompletableFuture
執行的結果,同時處理異常。
在本教程中,我們將開發一個傳回CompletableFuture,
並了解如何多次呼叫它以產生成功和失敗的摘要。
2. 微服務客戶端範例
對於我們的範例,讓我們編寫一個簡單的微服務客戶端,負責建立資源並傳回該資源的標識符。
我們將聲明一個簡單的介面MicroserviceClient ,我們可以在單元測試中模擬(使用 Mockito):
interface MicroserviceClient {
CompletableFuture<Long> createResource(String resourceName);
}
單元測試CompletableFuture
有其自身的挑戰,但測試MicroserviceClient
單一呼叫將很簡單。我們不在這裡詳細說明這一點,而是繼續處理可能引發異常的多個客戶端呼叫。
3. 組合對微服務的多個調用
讓我們先建立一個單元測試並宣告一個MicroserviceClient
的模擬,該模擬對「 Good Resource
」的輸入回傳成功回應,並針對「 Bad Resource
」的輸入引發異常:
@ParameterizedTest
@MethodSource("clientData")
public void givenMicroserviceClient_whenMultipleCreateResource_thenCombineResults(List<String> inputs,
int expectedSuccess, int expectedFailure) throws ExecutionException, InterruptedException {
MicroserviceClient mockMicroservice = mock(MicroserviceClient.class);
when(mockMicroservice.createResource("Good Resource"))
.thenReturn(CompletableFuture.completedFuture(123L));
when(mockMicroservice.createResource("Bad Resource"))
.thenReturn(CompletableFuture.failedFuture(new IllegalArgumentException("Bad Resource")));
}
我們將使其成為參數化測試,並使用MethodSource
傳遞不同的資料集。我們需要建立一個靜態方法來為我們的測試提供 JUnit Arguments
Stream
:
private static Stream<Arguments> clientData() {
return Stream.of(
Arguments.of(List.of("Good Resource"), 1, 0),
Arguments.of(List.of("Bad Resource"), 0, 1),
Arguments.of(List.of("Good Resource", "Bad Resource"), 1, 1),
Arguments.of(List.of("Good Resource", "Bad Resource", "Good Resource", "Bad Resource",
"Good Resource"), 3, 2)
);
}
這將建立四個測試執行,傳遞輸入清單以及預期的成功和失敗計數。
接下來,讓我們傳回單元測試並使用測試資料呼叫MicroserviceClient
並將每個產生的CompletableFuture
收集到List
中:
List<CompletableFuture<Long>> clientCalls = new ArrayList<>();
for (String resource : inputs) {
clientCalls.add(mockMicroservice.createResource(resource));
}
現在,我們有了問題的核心部分:我們需要完成並收集結果的CompletableFuture
物件List
,同時處理我們遇到的任何異常。
3.1.處理例外
在討論如何完成每個CompletableFuture
之前,讓我們先定義一個用於處理異常的輔助方法。我們還將定義並模擬一個Logger
來模擬現實世界的錯誤處理:
private final Logger logger = mock(Logger.class);
private Long handleError(Throwable throwable) {
logger.error("Encountered error: " + throwable);
return -1L;
}
interface Logger {
void error(String message);
}
輔助方法只是「記錄」錯誤訊息並傳回-1
,我們用它來指定無效資源。
3.2.使用異常處理完成CompletableFuture
現在,我們需要完成所有的CompletableFuture
並適當地處理任何異常。我們可以利用**CompleteableFuture
為我們提供的**一些工具來做到這一點:
-
exceptionally()
:如果CompletableFuture
因例外狀況而完成,則接受一個要執行的函數 -
join()
:CompletableFuture
完成後回傳結果
然後,我們可以定義一個輔助方法來完成單一CompletableFuture
:
private Long handleFuture(CompletableFuture<Long> future) {
return future
.exceptionally(this::handleError)
.join();
}
值得注意的是,我們使用exceptionally()
來處理MicroserviceClient
呼叫可能透過我們的handleError()
輔助方法拋出的任何例外。最後,我們在CompletableFuture
上呼叫join()
來等待客戶端呼叫完成並傳回其資源標識符。
3.3.處理CompletableFuture
List
回到我們的單元測試,我們現在可以利用我們的幫助器方法以及 Java 的 Stream API 來建立一個簡單的語句來解析所有客戶端呼叫:
Map<Boolean, List<Long>> resultsByValidity = clientCalls.stream()
.map(this::handleFuture)
.collect(Collectors.partitioningBy(resourceId -> resourceId != -1L));
我們來分解一下這個聲明:
- 我們使用
handleFuture()
輔助方法將每個CompletableFuture
對應到結果資源識別碼中 - 我們使用 Java 的
Collectors.partitioningBy()
實用程式根據有效性將產生的資源標識符拆分為單獨的列表
我們可以透過使用分區List
大小的斷言以及檢查對模擬Logger:
List<Long> validResults = resultsByValidity.getOrDefault(true, List.of());
assertThat(validResults.size()).isEqualTo(successCount);
List<Long> invalidResults = resultsByValidity.getOrDefault(false, List.of());
assertThat(invalidResults.size()).isEqualTo(errorCount);
verify(logger, times(errorCount))
.error(eq("Encountered error: java.lang.IllegalArgumentException: Bad Resource"));
運行測試,我們可以看到分區列表符合我們的預期。
4。結論
在本文中,我們學習如何處理完成CompletableFuture
的集合。如有必要,我們可以輕鬆擴展我們的方法以使用更強大的錯誤處理或複雜的業務邏輯。
與往常一樣,本文的所有程式碼都可以在 GitHub 上找到。