RxJava Observables 中的 concat() 與 merge() 運算符
1. 概述
concat()
和merge()
是兩個強大的運算符,用於在 RxJava 中組合多個Observable
實例。
concat()
依序從每個Observable
發出項目,等待每個 Observable 完成後再移至下一個,而merge()
在產生所有 Observable 實例時同時從所有Observable
實例中發出項目。
在本教程中,我們將探討concat()
和merge()
顯示相似和不同行為的場景。
2. 同步來源
當兩個來源都是同步時, concat()
和merge()
運算子的行為完全相同。讓我們模擬一下這個場景,以便更好地理解。
2.1.場景設定
讓我們先使用Observable.just()
工廠方法來建立三個同步源:
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable<Integer> observable3 = Observable.just(7, 8, 9);
此外,我們建立兩個訂閱者,即testSubscriberForConcat
和testSubscriberForMerge
:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
偉大的!我們已經擁有測試場景所需的一切。
2.2. concat()
和merge()
首先,我們應用concat()
運算子並使用testSubscriberForConcat
訂閱結果Observable
:
Observable.concat(observable1, observable2, observable3)
.subscribe(testSubscriberForConcat);
此外,讓我們驗證排放量是否依序排列,其中observable1
中的項目出現在observable2
之前,而observable2
出現在observable3
之前:
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
類似地,我們可以應用merge()
運算子並使用testSubscriberForMerge
訂閱結果:
Observable.merge(observable1, observable2, observable3).subscribe(testSubscriberForMerge);
接下來,讓我們驗證透過合併的排放是否遵循與串聯相同的順序:
testSubscriberForMerge.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);
最後,我們必須注意同步Observable
實例會立即發出所有項目,然後發出完成訊號。此外,每個Observable
在下一個 Observable 開始之前完成其發射。因此,兩個運算子依序處理每個Observable
,產生相同的輸出。
因此,無論來源是同步或非同步,一般規則是,如果我們需要維護來源的發射順序,我們應該使用concat()
。另一方面,如果我們想要組合從多個來源發出的項目,我們應該使用merge()
。
3. 可預測的非同步來源
在本節中,讓我們模擬一個具有非同步來源的場景,其中發射順序是可預測的。
3.1.場景設定
讓我們建立兩個非同步來源,即observable1
和observable2
:
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 1)
.take(3);
Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 4)
.take(7);
我們必須注意到observable1
的發射分別在100ms
、 200ms
和300ms
之後到達。另一方面,來自observable2
的發射以30ms
的間隔到達。
現在,我們也建立testSubscriberForConcat
和testSubscriberforMerge
:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
極好的!我們已準備好測試此場景。
3.2. concat()
與merge()
首先,我們應用concat()
運算子並使用testSubscribeForConcat
呼叫subscribe()
:
Observable.concat(observable1, observable2)
.subscribe(testSubscriberForConcat);
接下來,我們必須呼叫awaitTerminalEvent()
方法以確保收到所有發射:
testSubscriberForConcat.awaitTerminalEvent();
現在,我們可以驗證結果是否包含observable1
中的所有項目,後面接著observable2
中的所有項目:
testSubscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
此外,讓我們應用merge()
運算子並使用testSubscriberForMerge
呼叫subscribe()
:
Observable.merge(observable1, observable2)
.subscribe(testSubscriberForMerge);
最後,讓我們等待排放並檢查排放值:
testSubscriberForMerge.awaitTerminalEvent();
testSubscriberForMerge.assertValues(4, 5, 6, 1, 7, 8, 9, 2, 10, 3);
結果包含所有按照observer1
和observer2
實際發射的順序交錯在一起的項目。
4. 具有競爭條件的非同步源
在本節中,我們將模擬一個具有兩個非同步來源的場景,其中組合發射的順序是相當不可預測的。
4.1.場景設定
首先,讓我們建立兩個具有完全相同延遲的非同步來源:
Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 1)
.take(3);
Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> i.intValue() + 4)
.take(3);
我們知道每個來源的一次發射會在100ms
、 200ms
和300ms
後到達。然而,由於競爭條件,我們無法預測確切的順序。
接下來,讓我們建立兩個測試訂閱者:
TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();
完美的!我們現在就可以走了。
4.2. concat()
與merge()
首先,我們應用concat()
運算符,然後訂閱testSubscribeForConcat
:
Observable.concat(observable1, observable2)
.subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent();
現在,讓我們驗證**concat()
運算子的結果是否保持不變**:
testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6);
此外,讓我們merge()
並使用testSubscriberForMerge
進行訂閱:
Observable.merge(observable1, observable2)
.subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();
接下來,讓我們累積清單中的所有排放量並驗證它是否包含所有值:
List<Integer> actual = testSubscriberForMerge.getOnNextEvents();
List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
assertTrue(actual.containsAll(expected) && expected.containsAll(actual));
最後,我們還記錄排放量以查看其實際情況:
21:05:43.252 [main] INFO actual emissions: [4, 1, 2, 5, 3, 6]
我們可以針對不同的運行收到不同的訂單。
5. 結論
在本文中,我們了解了 RxJava 中的concat()
和merge()
運算子如何處理同步和非同步資料來源。此外,我們還比較了涉及可預測和不可預測排放模式的情景,強調了兩個運營商之間的差異。
與往常一樣,本文中的程式碼可以在 GitHub 上取得。