如何在 Project Reactor 中將 List 轉換為 Flux
1. 概述
在反應式編程中,通常需要將Collections轉換為稱為Flux的反應流。將現有資料結構整合到反應式管道中時,這成為關鍵的一步。
在本教程中,我們將探討如何將元素Collection轉換為元素Flux 。
2. 問題定義
Project Reactor 中[Publisher](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/package-summary.html)的兩種主要類型是[Flux](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)和[Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) 。 Mono最多可以發出一個值,而Flux可以發出任意數量的值。
當我們取得List<T>時,我們可以將其包裝在Mono<List<T>>中或將其轉換為Flux<T> 。傳回List<T>的阻塞呼叫可以包裝在Mono中,在一次大的發射中發出整個清單。
但是,如果我們將如此大的清單放入Flux中,它允許Subscriber以可管理的區塊的形式請求資料。這使得subscriber能夠一項一項或小批量地處理專案:
來源:projectreactor.io
我們將探索轉換已包含T類型元素的List不同方法。對於我們的用例,我們將考慮運算子**[fromIterable](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#fromIterable-java.lang.Iterable-)**和create Publisher類型Flux將List<T>轉換為Flux<T> 。
3. fromIterable
我們先建立一個Integer類型的List並在其中加入一些值:
List<Integer> list = List.of(1, 2, 3);
fromIterable是Flux Publisher上的一個運算符,用於發出所提供集合中包含的項目。
我們使用log()運算子來記錄發布的每個元素:
private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
return Flux
.fromIterable(list)
.log();
}
然後我們可以將fromIterable運算子應用於我們的Integer List並觀察行為:
@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted(){
List<Integer> list = List.of(1, 2, 3);
Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
最後,我們使用**StepVerifier** API 來驗證Flux中發出的元素與List中的元素。在我們包裝完正在測試的Flux來源之後,我們使用expectNext方法來交叉引用從Flux發出的項目和List內的項目是否相同並遵循相同的順序。
4. create
Flux類型Publisher上的create操作子使我們能夠以程式設計方式使用FluxSink API 建立Flux 。
雖然fromIterable在大多數情況下通常是一個不錯的選擇,但當清單是由回調產生時,它的使用並不簡單。在這種情況下,使用create運算子更合適。
讓我們建立一個回調介面:
public interface Callback<T> {
void onTrigger(T element);
}
接下來,讓我們想像一個從非同步 API 呼叫傳回的List<T> :
private void asynchronousApiCall(Callback<List<Integer>> callback) {
Thread thread = new Thread(()-> {
List<Integer> list = List.of(1, 2,3);
callback.onTrigger(list);
});
thread.start();
}
現在,我們不使用fromIterable ,而是在回調內部使用FluxSink將每個項目新增到清單中:
@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {
Flux<Integer> flux = Flux.create(sink -> {
Callback<List<Integer>> callback = list -> {
list.forEach(sink::next);
sink.complete();
};
asynchronousApiCall(callback);
});
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
5. 結論
在本文中,我們探索了使用運算子fromIterable將List<T>轉換為Flux<T>不同方法,並在Publisher類型Flux中create 。 fromIterable運算子可以與List<T>類型以及包裝在Mono中的List<T>一起使用。 create運算子最適合從回呼建立的List<T> 。
與往常一樣,完整的原始程式碼可以在 GitHub 上取得。