Flux.create 和 Flux.generate 之間的區別
一、簡介
Project Reactor 為 JVM 提供了一個完全非阻塞的編程基礎。它提供了 Reactive Streams 規範的實現,並提供了可組合的異步 API,例如 Flux。 Flux 是具有多個反應式運算符的反應式流發布者。它發出 0 到 N 個元素,然後成功或錯誤地完成。它可以根據我們的需要以幾種不同的方式創建。
2. 理解通量
Flux 是一個 Reactive Stream 發布者,可以發出 0 到 N 個元素。它有幾個運算符用於生成、編排和轉換 Flux 序列。 Flux 可以成功完成,也可以完成但有錯誤。
Flux API 在 Flux 上提供了幾個靜態工廠方法來創建源或從多個回調類型生成。它還提供實例方法和運算符來構建異步處理管道。該管道產生一個異步序列。
在接下來的部分中,讓我們看看 Flux generate()
和create()
方法的一些用法。
3. Maven依賴
我們需要[reactor-core](https://search.maven.org/search?q=g:io.projectreactor%20AND%20a:reactor-core&core=gav)
和reactor-test
Maven 依賴項:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.17</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.17</version>
<scope>test</scope>
</dependency>
4.通量生成
Flux API 的generate()
方法提供了一種簡單直接的編程方法來創建 Flux。 generate()
方法採用生成器函數來生成一系列項目。
生成方法有三種變體:
-
generate(Consumer<SynchronousSink<T>> generator)
-
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
-
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
generate 方法根據需要計算並發出值。最好在計算下游可能不使用的元素的成本太高的情況下使用。如果發出的事件受應用程序狀態的影響,也可以使用它。
4.1。例子
在這個例子中,讓我們使用generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
來生成一個Flux
:
public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}
在generate()
方法中,我們提供了兩個函數作為參數:
- 第一個是
Callable
函數。此函數定義生成器的初始狀態,值為 97 - 第二個是
BiFunction.
這是一個使用SynchronousSink.
每當調用接收器的next
方法時,此 SynchronousSink 都會返回一個項目
根據其名稱, SynchronousSink
實例同步工作。但是,我們不能在每個生成器調用中多次調用SynchronousSink
對象的next
方法。
讓我們使用StepVerifier
驗證生成的序列:
@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}
在此示例中,訂閱者僅請求三個項目。因此,生成的序列以發出三個字符——a、b 和 c 結束。 expectNext()
期望從 Flux 中得到我們期望的元素。 expectComplete
() 表示從 Flux 發射元素的完成。
5.通量創建
當我們想要create()
**不受應用程序狀態影響的多個(0 到無窮大)值**時,使用 Flux 中的 create() 方法。這是因為 Flux create()
方法的底層方法不斷計算元素。
此外,下游系統決定了它需要多少元素。因此,如果下游系統無法跟上,已經發出的元素要么被緩衝要么被移除。
默認情況下,發出的元素會被緩衝,直到下游系統請求更多元素。
5.1。例子
現在讓我們演示create()
方法的示例:
public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}
我們可以注意到create
運算符要求我們使用FluxSink
而不是generate
() 中使用的SynchronousSink
。在這種情況下,我們將為items
列表中的每個項目調用next()
,逐個發出。
現在讓我們使用帶有兩個字符序列的CharacterCreator
:
@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
}
我們在上面的代碼片段中創建了兩個序列—— sequence1
和sequence2
。這些序列用作字符項的來源。請注意,我們使用CharacterGenerator
實例來獲取字符序列。
現在讓我們定義一個characterCreator
實例和兩個線程實例:
CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));
我們現在正在創建兩個線程實例,它們將為發布者提供元素。當調用接受運算符時,字符元素開始流入序列源。接下來,我們subscribe
新的合併序列:
List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);
請注意, createCharacterSequence
返回一個我們訂閱的 Flux 並使用consolidated
列表中的元素。接下來,讓我們觸發查看項目在兩個不同線程上移動的整個過程:
producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();
最後,讓我們驗證一下操作的結果:
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
接收到的序列中的前三個字符來自sequence1.
最後兩個字符來自sequence2
。由於這是一個異步操作,因此無法保證這些序列中元素的順序。
6. Flux Create vs. Flux Generate
以下是 create 和 generate 操作之間的一些區別:
通量創建 | 通量生成 |
---|---|
此方法接受Consumer<FluxSink> 的實例 |
此方法接受Consumer<SynchronousSink> 的實例 |
Create 方法只調用消費者一次 | 生成方法根據下游應用的需要多次調用消費者方法 |
消費者可以立即發出 0..N 個元素 | 只能發射一種元素 |
發布者不知道下游狀態。因此 create 接受溢出策略作為流量控制的附加參數 | 發布者根據下游應用需求生成元素 |
FluxSink 允許我們在需要時使用多個線程發出元素 |
對多線程沒有用,因為它一次只發出一個元素 |
7. 結論
在本文中,我們討論了 Flux API 的 create 和 generate 方法之間的區別。
首先,我們介紹了反應式編程的概念並討論了 Flux API。然後我們討論了 Flux API 的 create 和 generate 方法。最後,我們提供了 Flux API 的 create 和 generate 方法之間的差異列表。
本教程的源代碼可在 GitHub 上獲得。