Java 中的 CompletableFuture 和 ThreadPool
1. 概述
Java 8 的並發 API 引入了CompletableFuture,
這是一個用於簡化異步和非阻塞編程的寶貴工具。
在本文中,我們將討論 Java 的CompletableFuture
及其利用的線程池。我們將探討其異步和非異步方法之間的差異,並學習如何最大限度地發揮CompletableFuture
API 的潛力。
2. 非異步方法
CompletableFuture
提供了由 50 多種方法組成的廣泛 API。其中許多方法都有兩種變體: non-async
和async.
讓我們從非異步對應部分開始,並使用thenApply()
方法深入研究實際示例:
當使用thenApply()
時,我們傳遞一個函數作為參數,該函數將CompletableFuture
的先前值作為輸入,執行操作並返回一個新值。因此,會創建一個新的CompletableFuture
來封裝結果值。為了說明這個Integer
,讓我們考慮一個簡單的示例,其中我們將String
值轉換為表示其大小的整數。此外,我們還將打印負責執行此操作的線程的名稱:
@Test
void whenUsingNonAsync_thenMainThreadIsUsed() throws Exception {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread(); // will print "main"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
private static void printCurrentThread() {
System.out.println(Thread.currentThread().getName());
}
作為參數傳遞給thenApply()
的函數將由直接與CompletableFuture
的 API 交互的線程執行,
在我們的例子中是main
線程。但是,如果我們提取與CompletableFuture
的交互並從不同的線程調用它,我們應該注意到變化:
@Test
void whenUsingNonAsync_thenUsesCallersThread() throws Exception {
Runnable test = () -> {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread(); // will print "test-thread"
return value.length();
});
try {
assertThat(nameLength.get()).isEqualTo(8);
} catch (Exception e) {
fail(e.getMessage());
}
};
new Thread(test, "test-thread").start();
Thread.sleep(100l);
}
3. 異步方法
API 中的大多數方法都具有異步對應方法。我們可以使用這些異步變體來確保中間操作在單獨的線程池上執行。讓我們更改前面的代碼示例,從thenApply()
切換到thenApplyAsync()
:
@Test
void whenUsingAsync_thenUsesCommonPool() throws Exception {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "ForkJoinPool.commonPool-worker-1"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
**根據官方文檔,如果我們使用異步方法而不顯式提供Executor
,函數將使用ForkJoinPool.commonPool().
**因此,如果我們運行代碼片段,我們應該會看到常見的ForkJoinPool
工作線程之一:在我的例子中,“ ForkJoinPool.commonPool-worker-1″.
4. 使用自定義執行器的異步方法
我們可以注意到所有異步方法都被重載,提供了一種接受要執行的代碼以及Executor.
我們可以使用它來為異步操作使用顯式線程池。讓我們進一步更新我們的測試並提供一個用於thenApplyAsync()
方法的自定義線程池:
@Test
void whenUsingAsync_thenUsesCustomExecutor() throws Exception {
Executor testExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "pool-2-thread-1"
return value.length();
}, testExecutor);
assertThat(nameLength.get()).isEqualTo(8);
}
正如預期的那樣,當使用重載方法時,CompletableFuture 將不再使用常見的ForkJoinPool
。
5. 擴展CompletableFuture
最後,我們可以擴展CompletableFuture and
重寫defaultExecutor()
,封裝自定義線程池。因此,我們將能夠在不指定Executor
情況下使用async
方法,並且這些函數將由我們的線程池而不是常見的ForkJoinPool
調用。
讓我們創建一個擴展 CompletableFuture 的CustomCompletableFuture
CompletableFuture.
讓我們使用newSingleThreadExecutor
並創建一個線程,其名稱在測試時可以在控制台中輕鬆識別。此外,我們將重寫defaultExecutor()
方法,使CompletableFuture
能夠無縫地利用我們定制的線程池:
public class CustomCompletableFuture<T> extends CompletableFuture<T> {
private static final Executor executor = Executors.newSingleThreadExecutor(
runnable -> new Thread(runnable, "Custom-Single-Thread")
);
@Override
public Executor defaultExecutor() {
return executor;
}
}
此外,我們添加一個遵循CompletableFuture
模式的靜態工廠方法。這將使我們能夠輕鬆創建並完成CustomCompletableFuture
對象:
public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
executor.execute(() -> {
try {
future.complete(supplier.get());
} catch (Exception ex) {
future.completeExceptionally(ex);
}
});
return future;
}
現在,讓我們創建CustomCompletableFuture
的實例,並對thenSupplyAsync()
內的String
值執行相同的轉換。不過,這一次,我們將不再指定Executor
,但仍然期望該函數由我們的專用線程“Custom-Single-Thread”
調用:
@Test
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws Exception {
CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread(); // will print "Custom-Single-Thread"
return value.length();
});
assertThat(nameLength.get()).isEqualTo(8);
}
六,結論
在本文中,我們了解到CompletableFuture
的 API 中的大多數方法都允許異步和非異步執行。通過調用非異步變體,調用CompletableFuture
的 API 的線程還將執行所有中間操作和轉換。另一方面,異步對應項將使用不同的線程池,默認線程池是通用的ForkJoinPool.
之後,我們討論了對執行的進一步自定義,為每個異步步驟使用自定義Executors
。最後,我們學習瞭如何創建自定義CompletableFuture
對象並重寫defaultExecutor()
方法。這使我們能夠利用異步方法,而無需每次都指定自定義Executor
。
與往常一樣,我們可以在 GitHub 上找到工作代碼示例。