從 CompletableFuture 列表轉換為 CompletableFuture 列表
1. 概述
在本教學中,我們將學習如何將List<CompletableFuture<T>>物件轉換為CompletableFuture<List<T>> 。
這種轉換在許多情況下非常有用。一個主要的例子是,當我們必須多次呼叫遠端服務(通常是非同步操作)並將結果聚合到單一List中時。此外,我們最終會等待一個CompletableFuture對象,該對像在所有操作完成時為我們提供結果列表,或在一個或多個操作失敗時拋出異常。
我們將首先看到一種簡單的轉換方法,然後再看看一種更簡單、更安全的方法。
2. 連結CompletableFuture
實現此目的的一種方法是使用其thenCompose()方法連結CompletableFuture 。這樣,我們就可以創建一個單一對象,一旦所有先前的 future 都一一解決,該對象就會解決,類似於多米諾骨牌構造。
2.1.執行
首先,讓我們建立一個模擬非同步操作:
public class Application {
ScheduledExecutorService asyncOperationEmulation;
Application initialize() {
asyncOperationEmulation = Executors.newScheduledThreadPool(10);
return this;
}
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
我們創建了一個Application類別來託管我們的測試程式碼和asyncOperation()方法,僅休眠100毫秒。我們使用具有10線程的Executor來非同步分派所有內容。
為了收集所有操作結果(在本例中為簡單的operationId字串),我們將連結從asyncOperation()方法產生的CompletableFuture :
void startNaive() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Naive-Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
for (CompletableFuture<String> future : futures) {
aggregate = aggregate.thenCompose(list -> {
list.add(future.get());
return CompletableFuture.completedFuture(list);
});
}
final List<String> results = aggregate.join();
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
我們首先使用靜態的completedFuture()方法建立一個已完成的CompleteableFuture ,並提供一個空List作為完成結果。使用thenCompose()我們建立一個Runnable ,它在前一個 future 完成後立即執行。 thenCompose()方法傳回一個新的CompletableFuture ,一旦第一個和第二個 future 完成,它就會解析。我們用這個新的 future 物件取代aggregate參考。這使我們能夠在futures列表的迭代循環內繼續連結這些呼叫。
在我們創建的Runnable內部,我們等待future完成並將結果加入list 。然後我們傳回一個完整的 future 以及該list和結果。這會將list進一步傳遞到thenCompose()鏈,讓我們將future結果一一add 。
一旦所有 future 都被連結起來,我們就對aggregate CompletableFuture呼叫join() 。這是專門針對該範例完成的,以便我們可以檢索結果並阻止主 Java 執行緒在aggregate完成之前退出。在真正的非同步場景中,我們可能會在thenAccept()或whenComplete()呼叫中加入回呼邏輯。
需要注意的一件事是我們在最後添加了一個close()調用,實現如下:
void close() {
asyncOperationEmulation.shutdownNow();
}
應用程式退出時必須關閉所有Executors ,否則 Java 進程將掛起。
2.2.實施問題
這種簡單的實作存在一些問題。 future 鏈不僅引入了不必要的複雜性,而且還創建了大量不需要的對象,例如thenCompose()產生的所有新CompletableFuture 。
當我們執行大量操作時,會出現另一個潛在問題。如果操作失敗,並且根據 Java 實作如何解析CompletableFuture鏈,如果解析是遞歸完成的,我們可能會得到StackOverflowError 。
為了測試異常場景,我們可以透過更改asyncOperation()方法在其中一項操作上引入錯誤:
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
if (operationId.endsWith("567")) {
cf.completeExceptionally(new Exception("Error on operation " + operationId));
return;
}
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
在這種情況下,未來567th操作將會異常完成,使aggregate.join()呼叫也會拋出執行時期例外。
3.使用CompletableFuture.allOf()
一種不同且更好的方法是使用CompletableFuture API 的allOf()方法。此方法採用一組CompletableFuture物件並建立一個新對象,該物件在所有提供的 future 本身解析時解析。
此外,如果其中一個期貨失敗,那麼整個期貨也會失敗。新的未來不包含結果清單。要取得它們,我們必須檢查對應的CompletableFuture物件.
3.1.執行
讓我們使用 allOf() 來建立一個新的start()方法allOf():
void start() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
final List<String> results = listFuture.join();
System.out.println("Printing first 10 results");
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
設定和結果列印是相同的,但是我們現在有一個futuresArray並將其提供給allOf() 。我們使用thenApply()在allOf()解析後加入邏輯。在此回調中,我們使用CompletableFuture.join()方法收集所有futures結果並將它們收集到List中。此清單是thenApply()產生的CompletableFuture中所包含的結果,即listFuture 。
為了展示聚合結果,我們使用join()方法,該方法會阻塞main線程,直到listFuture完成。我們不應該忘記最後的close()呼叫。
3.2. allOf()的優點
基於allOf()的實作比未來的連結更簡單、更清晰。它不會產生不需要的對象,從而減少記憶體佔用。此外,它為我們提供了一種處理CompletableFuture的任意組合的簡單方法.
4。結論
在本文中,我們學習如何將List<CompletableFuture<T>>轉換為CompletableFuture<List<T>> 。我們了解為什麼這種轉換很有用,並看到了兩種實作方式,一種是簡單的實現,另一種是使用正確的 Java API。我們與前者討論了潛在的問題以及後者如何避免這些問題。
與往常一樣,本文的源代碼可在 GitHub 上取得。