Future、CompletableFuture 和 Rxjava 的 Observable 之間的區別
1. 概述
在 Java 中,我們可以通過多種方式異步運行任務。 Java 內置了Future
和CompletableFuture
。我們還可以使用 RxJava 庫,它為我們提供了Observable
類。在本文中,我們將研究三者之間的差異以及各自的好處和潛在用例。
2. Future
Future
接口首先出現在 Java 5 中,提供的功能非常有限。 Future
的實例是一個結果的佔位符,該結果將由異步過程生成,並且可能尚不可用。提供了一小部分方法來幫助完成此過程。我們可以取消任務或獲取已完成任務的結果,還可以檢查任務是否已被取消或完成。
要查看其實際效果,讓我們創建一個示例異步任務。我們將有一個對象和一個Callable,
它的作用就像從數據庫中檢索該對像一樣。我們的對象可以非常簡單:
class TestObject {
int dataPointOne;
int dataPointTwo;
TestObject() {
dataPointOne = 10;
}
// Standard getters and setters
}
因此,在調用構造函數時,我們返回一個包含數據點集之一的TestObject
實例。我們現在可以創建第二個實現Callable
接口的類來為我們創建該對象:
class ObjectCallable implements Callable<TestObject> {
@Override
TestObject call() {
return new TestObject();
}
}
設置這兩個對像後,我們可以編寫一個測試來使用Future
獲取TestObject
:
@Test
void whenRetrievingObjectWithBasicFuture_thenExpectOnlySingleDataPointSet() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<TestObject> future = exec.submit(new ObjectCallable());
TestObject retrievedObject = future.get();
assertEquals(10, retrievedObject.getDataPointOne());
assertEquals(0, retrievedObject.getDataPointTwo());
}
在這裡,我們創建了一個ExecutorService
,我們可以向其提交任務。接下來,我們提交了ObjectCallable
類並收到了Future
作為回報。最後,我們可以在Future
上調用get()
來獲取結果。我們從斷言中看到我們的對象填充了單個數據點。
3. CompletableFuture
CompletableFuture
是隨 Java 8 一起發布的Future
接口的實現。它擴展了Future
的基本功能,使我們能夠更好地控制異步操作的結果。最大的附加功能之一是將函數調用鏈接到初始任務的結果上的選項。讓我們通過重複我們在上一節中完成的任務來看看它的實際效果。但這一次,我們想在取回物體後對其進行水合。讓我們創建一個具有水合方法的對象來填充TestObject
中的第二個數據點:
class ObjectHydrator {
TestObject hydrateTestObject(TestObject testObject){
testObject.setDataPointTwo(20);
return testObject;
}
}
這次我們還需要從Supplier
的實現中檢索我們的初始TestObject:
class ObjectSupplier implements Supplier<TestObject> {
@Override
TestObject get() {
return new TestObject();
}
}
準備好這兩個類後,讓我們使用它們:
@Test
void givenACompletableFuture_whenHydratingObjectAfterRetrieval_thenExpectBothDataPointsSet() throws ExecutionException, InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
ObjectHydrator objectHydrator = new ObjectHydrator();
CompletableFuture<TestObject> future = CompletableFuture.supplyAsync(new ObjectSupplier(), exec)
.thenApply(objectHydrator::hydrateTestObject);
TestObject retrievedObject = future.get();
assertEquals(10, retrievedObject.getDataPointOne());
assertEquals(20, retrievedObject.getDataPointTwo());
}
這次我們可以從斷言中看到,由於能夠鏈接水合方法,我們已經在對像上設置了兩個數據點。
4.RxJava的Observable
RxJava 是一個庫,可讓我們按照反應式編程範例構建事件驅動和異步程序。
要在我們的項目中使用 RxJava,我們需要將其導入到 pom.xml 中:
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.6</version>
</dependency>
最新版本可在Maven 存儲庫中找到。
這個庫可以做很多事情,但今天我們將重點關注Observable
類。 Observable
根據需要或在數據可用時向Observer
提供數據。要異步運行任務,就像我們對Future
和CompletableFuture,
我們可以創建一個Observable
,它將在請求時從異步源生成數據:
@Test
void givenAnObservable_whenRequestingData_thenItIsRetrieved() {
ObjectHydrator objectHydrator = new ObjectHydrator();
Observable<TestObject> observable = Observable.fromCallable(new ObjectCallable()).map(objectHydrator::hydrateTestObject);
observable.subscribe(System.out::println);
}
在這裡,我們從ObjectCallable
類創建了一個Observable
,並使用map()
來應用我們的水化器。然後我們訂閱Observable
並提供一個方法來處理結果。在我們的例子中,我們只是將結果記錄下來。這給出了與我們的CompletableFuture
實現完全相同的最終結果。 subscribe()
方法與CompletableFutures
get().
雖然我們可以清楚地使用 RxJava 來實現與CompletableFuture,
它的主要用例是它提供的大量其他功能。一個例子是以完全不同的方式再次執行相同的任務。我們可以創建一個Observable
來等待數據到達,然後可以從其他地方將數據推送到它:
@Test
void givenAnObservable_whenPushedData_thenItIsReceived() {
PublishSubject<Integer> source = PublishSubject.create();
Observable<Integer> observable = source.observeOn(Schedulers.computation());
observable.subscribe(System.out::println, (throwable) -> System.out.println("Error"), () -> System.out.println("Done"));
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onComplete();
}
運行時,此測試會產生以下輸出:
1
2
3
Done
因此,我們可以訂閱尚未生成任何內容的數據源,然後只需等待即可。數據準備好後,我們使用onNext()
將其推送到源,並通過訂閱收到警報。這是 RxJava 允許的反應式編程風格的一個示例。我們對外部來源推送給我們的事件和新數據做出反應,而不是我們自己請求。
5. 結論
在本文中,我們了解了早期 Java 的Future
接口如何提供有用但有限的異步執行任務並稍後獲取結果的能力。接下來,我們探討了較新的實現CompletableFuture
帶來的好處。這使我們能夠將方法調用串在一起,並對整個過程提供更好的控制。
最後,我們看到我們可以使用 RxJava 執行相同的工作,但我們也注意到它是一個廣泛的庫,允許我們做更多的事情。我們簡要了解瞭如何使用 RxJava 將任務異步推送到Observer
,同時無限期地訂閱數據流。
與往常一樣,示例的完整代碼可在 GitHub 上獲取。