在 Java 中完成線程的作業後返回一個值
一、概述
Java 的主要特性之一是並發性。它允許多個線程運行並執行並行任務。因此,我們可以執行異步和非阻塞指令。這將優化可用資源,尤其是當計算機具有多個 CPU 時。有兩種類型的線程:有返回值或沒有(在後一種情況下,我們說它將有一個 void 返回方法)。
在本文中,我們將重點關注如何從其作業已終止的線程返回一個值。
Thread
和Runnable
我們將 Java 線程稱為輕量級進程。讓我們看一下 Java 程序通常是如何工作的:
Java 程序是一個正在執行的過程。線程是 Java 進程的子集,可以訪問主內存。它可以與同一進程的其他線程通信。
一個線程有一個生命週期和不同的狀態。實現它的一種常見方法是通過Runnable
接口:
public class RunnableExample implements Runnable {
...
@Override
public void run() {
// do something
}
}
然後,我們可以開始我們的線程:
Thread thread = new Thread(new RunnableExample());
thread.start();
thread.join();
如我們所見,我們無法從Runnable
返回值。但是,我們可以使用wait()
和notify()
與其他線程同步。 join()
方法將使執行處於等待狀態,直到它完成。當我們從異步執行中獲得結果時,我們稍後會看到這有多重要。
3. Callable
Java 從 1.5 版本開始引入了Callable
接口。讓我們看一個異步任務返回階乘計算值的示例。我們使用BigInteger
作為結果可以是一個很大的數字:
public class CallableFactorialTask implements Callable<BigInteger> {
// fields and constructor
@Override
public BigInteger call() throws Exception {
return factorial(BigInteger.valueOf(value));
}
}
讓我們也創建一個簡單的階乘計算器:
public class FactorialCalculator {
public static BigInteger factorial(BigInteger end) {
BigInteger start = BigInteger.ONE;
BigInteger res = BigInteger.ONE;
for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
res = res.multiply(BigInteger.valueOf(i));
}
return res;
}
public static BigInteger factorial(BigInteger start, BigInteger end) {
BigInteger res = start;
for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
res = res.multiply(BigInteger.valueOf(i));
}
return res;
}
}
Callable
只有一個方法call()
我們需要覆蓋。該方法將返回我們的異步任務的對象。
Callable
和Runnable
都是@FunctionalInterface
。 Callable
可以返回一個值並拋出異常。但是,它需要一個Future
來完成任務。
4.執行Callable
我們可以使用Future
或 Fork/Join 執行Callable
。
4.1. Future
Callable
從 1.5 版本開始,Java 有了Future
接口來創建包含我們異步處理的響應的對象。我們可以在邏輯上將Future
與 Javascript 中的Promise進行比較。
例如,當我們想要從多個端點獲取數據時,我們通常會看到Future
。因此,我們需要等待所有任務完成才能收集響應數據。
Future
包裝響應並等待線程完成。但是,我們可能會遇到中斷,例如超時或執行異常。
讓我們看一下Future
接口:
public interface Future<V> {
boolean cancel(boolean var1);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
get
方法很有趣,我們可以等待並獲取執行的結果。
要啟動Future
作業,我們將其執行與ThreadPool
相關聯。這樣,我們將為那些異步任務分配一些資源。
讓我們用一個Executor
創建一個示例,該 Executor 對我們之前看到的Callable
中的階乘數求和。我們將使用Executor
接口和ExecutorService
實現來創建ThreadPoolExecutor
。我們可能想使用固定的或緩存的線程池。在這種情況下,我們將使用緩存線程池來演示:
public BigInteger execute(List<CallableFactorialTask> tasks) {
BigInteger result = BigInteger.ZERO;
ExecutorService cachedPool = Executors.newCachedThreadPool();
List<Future<BigInteger>> futures;
try {
futures = cachedPool.invokeAll(tasks);
} catch (InterruptedException e) {
// exception handling example
throw new RuntimeException(e);
}
for (Future<BigInteger> future : futures) {
try {
result = result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
// exception handling example
throw new RuntimeException(e);
}
}
return result;
}
我們可以用一個圖表來表示這個執行,我們可以在其中觀察線程池和Callable
是如何交互的:
Executor
將在Future
對像中調用和收集所有內容。然後,我們可以從我們的異步處理中得到一個或多個結果。
讓我們通過總結兩個階乘數的結果來測試:
@Test
void givenCallableExecutor_whenExecuteFactorial_thenResultOk() {
BigInteger result = callableExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3)));
assertEquals(BigInteger.valueOf(126), result);
}
4.2.可通過 Fork/Join Callable
我們還可以選擇使用ForkJoinPool
。它仍然與ExecutorSerivce
類似,因為它擴展了AbstractExecutorService
類。但是,它有不同的方式來創建和組織線程。它將任務分解為更小的任務並優化資源,使它們永遠不會閒置。我們可以用圖表表示子任務:
我們可以看到主任務將分叉為 SubTask1、SubTask3 和 SubTask4 作為最小的可執行文件。最後,他們將加入最終結果。
讓我們使用ForkJoinPool
將前面的示例轉換為一個示例。我們可以將所有內容包裝在一個執行程序方法中:
public BigInteger execute(List<Callable<BigInteger>> forkFactorials) {
List<Future<BigInteger>> futures = forkJoinPool.invokeAll(forkFactorials);
BigInteger result = BigInteger.ZERO;
for (Future<BigInteger> future : futures) {
try {
result = result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
// exception handling example
throw new RuntimeException(e);
}
}
return result;
}
在這種情況下,我們只需要創建一個不同的池來獲得我們的期貨。讓我們用階乘Callable
的列表來測試它:
@Test
void givenForkExecutor_whenExecuteCallable_thenResultOk() {
assertEquals(BigInteger.valueOf(126),
forkExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3))));
}
然而,我們也可以決定如何分叉我們的任務。我們可能希望根據某些標準來分叉我們的計算,例如,根據輸入參數或服務負載。
我們需要將任務重寫為ForkJoinTask
,因此我們將使用RecursiveTask
:
public class ForkFactorialTask extends RecursiveTask<BigInteger> {
// fields and constructor
@Override
protected BigInteger compute() {
BigInteger factorial = BigInteger.ONE;
if (end - start > threshold) {
int middle = (end + start) / 2;
return factorial.multiply(new ForkFactorialTask(start, middle, threshold).fork()
.join()
.multiply(new ForkFactorialTask(middle + 1, end, threshold).fork()
.join()));
}
return factorial.multiply(factorial(BigInteger.valueOf(start), BigInteger.valueOf(end)));
}
}
如果某個閾值適用,我們將細分我們的主要任務。然後我們可以使用invoke()
方法來獲取結果:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int result = forkJoinPool.invoke(forkFactorialTask);
此外, submit()
或execute()
也是一個選項。但是,我們總是需要join()
命令來完成執行。
我們還創建一個測試,我們在其中執行階乘任務:
@Test
void givenForkExecutor_whenExecuteRecursiveTask_thenResultOk() {
assertEquals(BigInteger.valueOf(3628800), forkExecutor.execute(new ForkFactorialTask(10, 5)));
}
在這種情況下,我們會將 10 的階乘分成兩個任務。第一個將從 1 計算到 5,而第二個將從 6 計算到 10。
5. 可完成的未來
自 1.8 版以來,Java 通過引入**CompletableFuture**
改進了多線程。它從Future
執行中刪除了樣板代碼,並添加了鏈接或組合異步結果等功能。然而,最重要的是,我們現在可以對任何方法進行異步計算,因此我們不受Callable
約束。此外,我們可以將語義不同的多個Futures
連接在一起。
5.1. supplyAsync()
使用CompletableFuture
可以很簡單:
CompletableFuture<BigInteger> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
...
BigInteger result = future.get();
我們不再需要Callable
。我們可以將任何 lambda 表達式作為參數傳遞。讓我們用supplyAsync()
測試階乘方法:
@Test
void givenCompletableFuture_whenSupplyAsyncFactorial_thenResultOk() throws ExecutionException, InterruptedException {
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
assertEquals(BigInteger.valueOf(3628800), completableFuture.get());
}
請注意,我們沒有指定任何線程池。在這種情況下,將使用默認的ForkJoinPool
。但是,我們可以指定一個Executor
,例如,具有固定線程池:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)), Executors.newFixedThreadPool(1));
5.2. thenCompose()
我們還可以創建一個連續的Futures
鏈。假設我們有兩個階乘任務,第二個需要第一個的輸入:
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
.thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
BigInteger result = completableFuture.get();
我們可以使用thenCompose()
方法來使用鏈中下一個CompletableFuture
的返回值。
讓我們結合兩個階乘的執行。例如,我們從 3 開始,給我們一個階乘 6 作為下一個階乘的輸入:
@Test
void givenCompletableFuture_whenComposeTasks_thenResultOk() throws ExecutionException, InterruptedException {
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
.thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
assertEquals(BigInteger.valueOf(720), completableFuture.get());
}
5.3. allOf()
有趣的是,我們可以使用接受輸入 var-arg 的靜態方法allOf()
並行執行多個Futures
。
從多個執行中收集異步結果就像添加到allOf()
和join()
來完成任務一樣簡單:
BigInteger result = allOf(asyncTask1, asyncTask2)
.thenApplyAsync(fn -> factorial(factorialTask1.join()).add(factorial(new BigInteger(factorialTask2.join()))), Executors.newFixedThreadPool(1)).join();
請注意, allOf()
的返回類型為 void 。因此,我們需要手動從單個Futures
中獲取結果。此外,我們可以在同一執行中運行具有不同返回類型的Futures
。
為了進行測試,讓我們加入兩個不同的階乘任務。為了演示,一個有一個數字輸入,而第二個有一個字符串:
@Test
void givenCompletableFuture_whenAllOfTasks_thenResultOk() {
CompletableFuture<BigInteger> asyncTask1 = CompletableFuture.supplyAsync(() -> BigInteger.valueOf(5));
CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(() -> "3");
BigInteger result = allOf(asyncTask1, asyncTask2)
.thenApplyAsync(fn -> factorial(asyncTask1.join()).add(factorial(new BigInteger(asyncTask2.join()))), Executors.newFixedThreadPool(1))
.join();
assertEquals(BigInteger.valueOf(126), result);
}
六,結論
在本教程中,我們了解瞭如何從線程返回對象。我們看到瞭如何將Callable
與Future
和線程池結合使用。 Future
包裝結果並等待所有任務完成。我們還看到了ForkJoinPool
的一個例子,可以將我們的執行優化為多個子任務。
Java 8 中的CompletableFuture
工作方式類似,但還提供了新功能,例如可以執行任何 lambda 表達式。它還允許我們鏈接和組合異步任務的結果。
最後,我們用Future
、 Fork
和CompletableFuture
測試了一個簡單的階乘任務。
與往常一樣,我們可以在 GitHub 上找到工作代碼示例。