Guava 的 Futures 和 ListenableFuture
- Guava
- java
一、簡介
Guava 為我們提供了ListenableFuture
其中包含一個基於默認 Java Future.
讓我們看看我們如何利用它來發揮我們的優勢。
2. Future
, ListenableFuture
和Futures
讓我們簡要地看看這些不同的類是什麼以及它們是如何相互關聯的。
2.1. Future
從Java 5,
我們可以使用java.util.concurrent.Future
來表示異步任務。
Future
允許我們訪問已經完成或將來可能完成的任務的結果,並支持取消它們。
2.2. ListenableFuture
java.util.concurrent.Future
時缺少的一個功能是添加偵聽器以在完成時運行,這是大多數流行的異步框架提供的常見功能。
Guava 通過允許我們將偵聽器附加到它的[com.google.common.util.concurrent.ListenableFuture](https://guava.dev/releases/29.0-jre/api/docs/com/google/common/util/concurrent/ListenableFuture.html "可聽未來") .
2.3. Futures
Guava 為我們提供了便利類com.google.common.util.concurrent.Futures
以使其更容易使用其ListenableFuture.
該類提供了多種與ListenableFuture,
交互的方式,其中包括支持添加成功/失敗回調,並允許我們通過聚合或轉換來協調多個期貨。
3. 簡單使用
現在讓我們看看如何以最簡單的方式ListenableFuture
創建和添加回調。
3.1.創造ListenableFuture
**ListenableFuture
的最簡單方法是向ListeningExecutorService
**提交任務(很像我們如何使用普通的ExecutorService
來獲取普通的Future
):
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500); // long running task
return 5;
});
請注意我們如何使用MoreExecutors
類將ExecutorService
裝飾為ListeningExecutorService.
我們可以參考線程池在 Guava 中的實現來了解更多關於MoreExecutors
。
如果我們已經有一個返回Future
的 API 並且我們需要將它轉換為ListenableFuture
,這很容易完成 通過初始化它的具體實現ListenableFutureTask:
// old api
public FutureTask<String> fetchConfigTask(String configKey) {
return new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
return ListenableFutureTask.create(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
我們需要注意,除非我們將它們提交給Executor.
直接與ListenableFutureTask
交互不是常見用法,僅在極少數情況下完成(例如:實現我們自己的ExecutorService
)。實際使用請參考 Guava 的[AbstractListeningExecutorService](https://github.com/google/guava/blob/v18.0/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java "AbstractListeningExecutorService")
如果我們的異步任務無法使用ListeningExecutorService
或提供的Futures
實用程序方法,我們也可以使用com.google.common.util.concurrent.SettableFuture
,並且我們需要手動設置未來值。對於更複雜的用法,我們還可以考慮com.google.common.util.concurrent.AbstractFuture.
3.2.添加偵聽器/回調
我們可以**ListenableFuture
添加偵聽器的一種Futures.addCallback(),
註冊回調,使我們可以在成功或失敗發生時訪問結果或異常:**
Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
// do on success
}
@Override
public void onFailure(Throwable t) {
// do on failure
}
}, listeningExecutor);
我們還**可以通過直接將偵聽器添加到ListenableFuture.
**請注意,此偵聽器將在未來成功或異常完成時運行。另請注意,我們無權訪問異步任務的結果:
Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);
4. 複雜的用法
現在讓我們看看如何在更複雜的場景中使用這些期貨。
4.1.扇入
我們有時可能需要調用多個異步任務並收集它們的結果,通常稱為扇入操作。
Guava 為我們提供了兩種方法。但是,我們應該謹慎地根據我們的要求選擇正確的方法。假設我們需要協調以下異步任務:
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");
扇入多個期貨的一種方法是使用Futures.allAsList()
方法。如果所有期貨都成功,這允許我們按照提供的期貨的順序收集所有期貨的結果。如果這些 future 中的任何一個失敗,那麼整個結果就是一個失敗的 future:
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// do on all futures success
}
@Override
public void onFailure(Throwable t) {
// handle on at least one failure
}
}, someExecutor);
如果我們需要收集所有異步任務的結果,無論它們是否失敗,我們都可以使用Futures.successfulAsList()
。這將返回一個列表,其結果將與傳遞給參數的任務具有相同的順序,失敗的任務將在列表中的各自位置分配null
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// handle results. If task2 failed, then configResults.get(1) == null
}
@Override
public void onFailure(Throwable t) {
// handle failure
}
}, listeningExecutor);
在上面的用法中我們應該小心,如果未來的任務通常null
,它將與失敗的任務(也將結果設置為null
)無法區分。
4.2.用組合器扇入
如果我們需要協調多個返回不同結果的期貨,上面的解決方案可能還不夠。在這種情況下,我們可以使用扇入操作的組合器變體來協調這種期貨組合。
與簡單的扇入操作類似, Guava 為我們提供了兩種變體;一種在所有任務成功完成時成功,一種在使用Futures.whenAllSucceed()
和Futures.whenAllComplete()
方法時即使某些任務失敗也成功。
讓我們看看如何使用Futures.whenAllSucceed()
組合來自多個期貨的不同結果類型:
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
int cartId = Futures.getDone(cartIdTask);
String customerName = Futures.getDone(customerNameTask);
List<String> cartItems = Futures.getDone(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, someExecutor);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
//handle on all success and combination success
}
@Override
public void onFailure(Throwable t) {
//handle on either task fail or combination failed
}
}, listeningExecService);
如果我們需要允許某些任務失敗,我們可以使用Futures.whenAllComplete()
。雖然語義與上面的基本相似,但我們應該注意,當對它們調用Futures.getDone()
ExecutionException
4.3.轉型
有時我們需要轉換成功後的結果。 Futures.transform()
和Futures.lazyTransform()
為我們提供了兩種方法。
讓我們看看如何使用Futures.transform()
來轉換 Future 的結果。只要轉換計算量不大,就可以使用它:
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
Function<List<String>, Integer> itemCountFunc = cartItems -> {
assertNotNull(cartItems);
return cartItems.size();
};
ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);
**我們還可以使用Futures.lazyTransform()
**將轉換函數應用於java.util.concurrent.Future.
我們需要記住,這個選項不返回一個ListenableFuture
而是一個普通的java.util.concurrent.Future
get()
在結果未來上調用時都會應用轉換函數。
4.4.鍊式期貨
我們可能會遇到我們的期貨需要調用其他期貨的情況。在這種情況下,Guava 為我們提供了async()
變體來安全地鏈接這些期貨以一個接一個地執行。
讓我們看看如何使用Futures.submitAsync()
從提交Callable
內部調用未來:
AsyncCallable<String> asyncConfigTask = () -> {
ListenableFuture<String> configTask = service.fetchConfig("config.a");
TimeUnit.MILLISECONDS.sleep(500); //some long running task
return configTask;
};
ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);
如果我們想要真正的鏈接,其中一個 future 的結果被饋送到另一個 future 的計算中,我們可以使用Futures.transformAsync()
:
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
TimeUnit.MILLISECONDS.sleep(500); // some long running task
return generatePasswordTask;
};
ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);
Guava 還為我們提供了Futures.scheduleAsync()
和Futures.catchingAsync()
來分別提交計劃任務和提供錯誤恢復的回退任務。雖然它們迎合不同的場景,但我們不會討論它們,因為它們類似於其他async()
調用。
5. 使用注意事項
現在讓我們研究一下在使用期貨時可能遇到的一些常見陷阱以及如何避免它們。
5.1.工作與傾聽執行者
在使用 Guava 期貨時,了解工作執行者和監聽執行者之間的區別很重要。例如,假設我們有一個異步任務來獲取配置:
public ListenableFuture<String> fetchConfig(String configKey) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
假設我們想為上述未來附加一個偵聽器:
ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);
請注意, lExecService
是運行異步任務的執行器,而listeningExecutor
器的執行器。
如上所示,我們應該始終考慮將這兩個執行程序分開,以避免出現我們的偵聽器和工作線程爭用相同線程池資源的情況。共享同一個 executor 可能會導致我們的繁重任務使 listener 執行餓死。或者一個寫得很糟糕的重量級聽眾最終阻止了我們重要的重型任務。
5.2.小心使用directExecutor()
雖然我們可以MoreExecutors.directExecutor()
和MoreExecutors.newDirectExecutorService()
來更容易地處理異步執行,但在生產代碼中我們應該小心使用它們。
當我們從上述方法中獲取到 executor 時,我們提交給它的任何任務,無論是重量級的還是監聽器,都會在當前線程上執行。如果當前執行上下文是需要高吞吐量的上下文,則這可能很危險。
比如使用一個directExecutor
,在UI線程中向它提交一個重量級的任務,就會自動阻塞我們的UI線程。
我們也可能面臨這樣一種情況:我們的聽眾 最終會減慢我們所有其他偵聽器(即使是那些與directExecutor
)。這是因為 Guava 在各自的Executors,
while
循環中的所有偵聽器,但是directExecutor
會導致偵聽器運行在與while
循環相同的線程中。
5.3.嵌套期貨不好
在使用鍊式期貨時,我們應該注意不要以創建嵌套期貨的方式從另一個期貨內部調用一個期貨:
public ListenableFuture<String> generatePassword(String username) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return username + "123";
});
}
String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
final String username = firstName.replaceAll("[^a-zA-Z]+", "")
.concat("@service.com");
return generatePassword(username);
});
如果我們曾經看到過包含ListenableFuture<ListenableFuture<V>>,
那麼我們應該知道這是一個寫得很糟糕的未來,因為外部未來的取消和完成有可能競爭,並且取消可能不會傳播到內心的未來。
如果我們看到上述情況,我們應該始終使用Futures.async()
變體以連接的方式安全地解開這些鍊式期貨。
5.4.小心JdkFutureAdapters.listenInPoolThread()
Guava 建議我們利用其ListenableFuture
的最佳方式是將所有使用Future
的代碼轉換為ListenableFuture.
如果在某些情況下這種轉換不可行, Guava 為我們提供了適配器來使用JdkFutureAdapters.listenInPoolThread()
覆蓋來做到這一點。雖然這看起來很有幫助,但Guava 警告我們這些是重量級的適配器,應該盡可能避免。
六,結論
在本文中,我們看到瞭如何使用 Guava 的ListenableFuture
來豐富我們對 futures 的使用,以及如何使用Futures
API 來更輕鬆地使用這些 futures。
我們還看到了在使用這些期貨和提供的執行程序時可能會犯的一些常見錯誤。