如何在 Project Reactor 中將 List 轉換為 Flux
1. 概述
在反應式編程中,通常需要將Collections
轉換為稱為Flux
的反應流。將現有資料結構整合到反應式管道中時,這成為關鍵的一步。
在本教程中,我們將探討如何將元素Collection
轉換為元素Flux
。
2. 問題定義
Project Reactor 中[Publisher](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/package-summary.html)
的兩種主要類型是[Flux](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)
和[Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)
。 Mono
最多可以發出一個值,而Flux
可以發出任意數量的值。
當我們取得List<T>
時,我們可以將其包裝在Mono<List<T>>
中或將其轉換為Flux<T>
。傳回List<T>
的阻塞呼叫可以包裝在Mono
中,在一次大的發射中發出整個清單。
但是,如果我們將如此大的清單放入Flux
中,它允許Subscriber
以可管理的區塊的形式請求資料。這使得subscriber
能夠一項一項或小批量地處理專案:
來源:projectreactor.io
我們將探索轉換已包含T
類型元素的List
不同方法。對於我們的用例,我們將考慮運算子**[fromIterable](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#fromIterable-java.lang.Iterable-)
**和create
Publisher
類型Flux
將List<T>
轉換為Flux<T>
。
3. fromIterable
我們先建立一個Integer
類型的List
並在其中加入一些值:
List<Integer> list = List.of(1, 2, 3);
fromIterable
是Flux
Publisher
上的一個運算符,用於發出所提供集合中包含的項目。
我們使用log()
運算子來記錄發布的每個元素:
private <T> Flux<T> listToFluxUsingFromIterableOperator(List<T> list) {
return Flux
.fromIterable(list)
.log();
}
然後我們可以將fromIterable
運算子應用於我們的Integer
List
並觀察行為:
@Test
public void givenList_whenCallingFromIterableOperator_thenListItemsTransformedAsFluxAndEmitted(){
List<Integer> list = List.of(1, 2, 3);
Flux<Integer> flux = listToFluxUsingFromIterableOperator(list);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
最後,我們使用**StepVerifier
** API 來驗證Flux
中發出的元素與List
中的元素。在我們包裝完正在測試的Flux
來源之後,我們使用expectNext
方法來交叉引用從Flux
發出的項目和List
內的項目是否相同並遵循相同的順序。
4. create
Flux
類型Publisher
上的create
操作子使我們能夠以程式設計方式使用FluxSink
API 建立Flux
。
雖然fromIterable
在大多數情況下通常是一個不錯的選擇,但當清單是由回調產生時,它的使用並不簡單。在這種情況下,使用create
運算子更合適。
讓我們建立一個回調介面:
public interface Callback<T> {
void onTrigger(T element);
}
接下來,讓我們想像一個從非同步 API 呼叫傳回的List<T>
:
private void asynchronousApiCall(Callback<List<Integer>> callback) {
Thread thread = new Thread(()-> {
List<Integer> list = List.of(1, 2,3);
callback.onTrigger(list);
});
thread.start();
}
現在,我們不使用fromIterable
,而是在回調內部使用FluxSink
將每個項目新增到清單中:
@Test
public void givenList_whenCallingCreateOperator_thenListItemsTransformedAsFluxAndEmitted() {
Flux<Integer> flux = Flux.create(sink -> {
Callback<List<Integer>> callback = list -> {
list.forEach(sink::next);
sink.complete();
};
asynchronousApiCall(callback);
});
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
5. 結論
在本文中,我們探索了使用運算子fromIterable
將List<T>
轉換為Flux<T>
不同方法,並在Publisher
類型Flux
中create
。 fromIterable
運算子可以與List<T>
類型以及包裝在Mono
中的List<T>
一起使用。 create
運算子最適合從回呼建立的List<T>
。
與往常一樣,完整的原始程式碼可以在 GitHub 上取得。