從 CompletableFuture 列表轉換為 CompletableFuture 列表
1. 概述
在本教學中,我們將學習如何將List<CompletableFuture<T>>
物件轉換為CompletableFuture<List<T>>
。
這種轉換在許多情況下非常有用。一個主要的例子是,當我們必須多次呼叫遠端服務(通常是非同步操作)並將結果聚合到單一List
中時。此外,我們最終會等待一個CompletableFuture
對象,該對像在所有操作完成時為我們提供結果列表,或在一個或多個操作失敗時拋出異常。
我們將首先看到一種簡單的轉換方法,然後再看看一種更簡單、更安全的方法。
2. 連結CompletableFuture
實現此目的的一種方法是使用其thenCompose()
方法連結CompletableFuture
。這樣,我們就可以創建一個單一對象,一旦所有先前的 future 都一一解決,該對象就會解決,類似於多米諾骨牌構造。
2.1.執行
首先,讓我們建立一個模擬非同步操作:
public class Application {
ScheduledExecutorService asyncOperationEmulation;
Application initialize() {
asyncOperationEmulation = Executors.newScheduledThreadPool(10);
return this;
}
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
我們創建了一個Application
類別來託管我們的測試程式碼和asyncOperation()
方法,僅休眠100
毫秒。我們使用具有10
線程的Executor
來非同步分派所有內容。
為了收集所有操作結果(在本例中為簡單的operationId
字串),我們將連結從asyncOperation()
方法產生的CompletableFuture
:
void startNaive() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Naive-Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
for (CompletableFuture<String> future : futures) {
aggregate = aggregate.thenCompose(list -> {
list.add(future.get());
return CompletableFuture.completedFuture(list);
});
}
final List<String> results = aggregate.join();
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
我們首先使用靜態的completedFuture()
方法建立一個已完成的CompleteableFuture
,並提供一個空List
作為完成結果。使用thenCompose()
我們建立一個Runnable
,它在前一個 future 完成後立即執行。 thenCompose()
方法傳回一個新的CompletableFuture
,一旦第一個和第二個 future 完成,它就會解析。我們用這個新的 future 物件取代aggregate
參考。這使我們能夠在futures
列表的迭代循環內繼續連結這些呼叫。
在我們創建的Runnable
內部,我們等待future
完成並將結果加入list
。然後我們傳回一個完整的 future 以及該list
和結果。這會將list
進一步傳遞到thenCompose()
鏈,讓我們將future
結果一一add
。
一旦所有 future 都被連結起來,我們就對aggregate
CompletableFuture
呼叫join()
。這是專門針對該範例完成的,以便我們可以檢索結果並阻止主 Java 執行緒在aggregate
完成之前退出。在真正的非同步場景中,我們可能會在thenAccept()
或whenComplete()
呼叫中加入回呼邏輯。
需要注意的一件事是我們在最後添加了一個close()
調用,實現如下:
void close() {
asyncOperationEmulation.shutdownNow();
}
應用程式退出時必須關閉所有Executors
,否則 Java 進程將掛起。
2.2.實施問題
這種簡單的實作存在一些問題。 future 鏈不僅引入了不必要的複雜性,而且還創建了大量不需要的對象,例如thenCompose()
產生的所有新CompletableFuture
。
當我們執行大量操作時,會出現另一個潛在問題。如果操作失敗,並且根據 Java 實作如何解析CompletableFuture
鏈,如果解析是遞歸完成的,我們可能會得到StackOverflowError
。
為了測試異常場景,我們可以透過更改asyncOperation()
方法在其中一項操作上引入錯誤:
CompletableFuture<String> asyncOperation(String operationId) {
CompletableFuture<String> cf = new CompletableFuture<>();
asyncOperationEmulation.submit(() -> {
if (operationId.endsWith("567")) {
cf.completeExceptionally(new Exception("Error on operation " + operationId));
return;
}
Thread.sleep(100);
cf.complete(operationId);
});
return cf;
}
在這種情況下,未來567th
操作將會異常完成,使aggregate.join()
呼叫也會拋出執行時期例外。
3.使用CompletableFuture.allOf()
一種不同且更好的方法是使用CompletableFuture
API 的allOf()
方法。此方法採用一組CompletableFuture
物件並建立一個新對象,該物件在所有提供的 future 本身解析時解析。
此外,如果其中一個期貨失敗,那麼整個期貨也會失敗。新的未來不包含結果清單。要取得它們,我們必須檢查對應的CompletableFuture
物件.
3.1.執行
讓我們使用 allOf() 來建立一個新的start()
方法allOf():
void start() {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
String operationId = "Operation-" + i;
futures.add(asyncOperation(operationId));
}
CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
final List<String> results = listFuture.join();
System.out.println("Printing first 10 results");
for (int i = 0; i < 10; i++) {
System.out.println("Finished " + results.get(i));
}
close();
}
設定和結果列印是相同的,但是我們現在有一個futuresArray
並將其提供給allOf()
。我們使用thenApply()
在allOf()
解析後加入邏輯。在此回調中,我們使用CompletableFuture.join()
方法收集所有futures
結果並將它們收集到List
中。此清單是thenApply()
產生的CompletableFuture
中所包含的結果,即listFuture
。
為了展示聚合結果,我們使用join()
方法,該方法會阻塞main
線程,直到listFuture
完成。我們不應該忘記最後的close()
呼叫。
3.2. allOf()
的優點
基於allOf()
的實作比未來的連結更簡單、更清晰。它不會產生不需要的對象,從而減少記憶體佔用。此外,它為我們提供了一種處理CompletableFuture
的任意組合的簡單方法.
4。結論
在本文中,我們學習如何將List<CompletableFuture<T>>
轉換為CompletableFuture<List<T>>
。我們了解為什麼這種轉換很有用,並看到了兩種實作方式,一種是簡單的實現,另一種是使用正確的 Java API。我們與前者討論了潛在的問題以及後者如何避免這些問題。
與往常一樣,本文的源代碼可在 GitHub 上取得。