在 Spring WebFlux 中取消一個正在進行的 Flux
一、簡介
在本文中,我們將討論 Spring WebFlux 提供的各種選項來取消正在進行的Flux
。首先,我們將在響應式編程的上下文中快速概述Flux
。接下來,我們將研究取消正在進行的Flux
必要性。
我們將研究 Spring WebFlux 提供的各種方法,以顯式和自動取消訂閱。我們將使用 JUnit 測試來驅動我們的簡單示例,以驗證系統是否按預期運行。最後,我們將了解如何執行取消後清理,使我們能夠在取消後將系統重置為所需狀態。
讓我們首先快速概述一下Flux
。
2.什麼是Flux
?
Spring WebFlux 是一個響應式 Web 框架,它為構建異步、非阻塞應用程序提供了強大的功能。 Spring WebFlux 的關鍵特性之一是它處理通量的能力。 Flux
是一種反應性數據流,可以發出零個或多個項目。它可以從各種來源創建,例如數據庫查詢、網絡調用或內存中的集合。
在這種情況下,我們應該注意的一個相關術語是訂閱,它表示數據源(即發布者)和數據消費者(即訂閱者)之間的連接。訂閱維護反映訂閱是否處於活動狀態的狀態。它可用於取消訂閱,這將停止Flux
發送數據並釋放發布者持有的任何資源。我們可能想要取消正在進行的訂閱的一些潛在場景可能是用戶取消請求或發生超時等。
3. 取消持續Flux
的好處
在 Reactive Spring WebFlux 中,取消正在進行的Flux
以確保系統資源的高效使用並防止潛在的內存洩漏非常重要。以下是一些原因:
- 背壓:反應式編程使用背壓來調節發布者和訂閱者之間的數據流。如果訂閱者跟不上發布者的步伐,背壓用於減慢或停止數據流。如果正在進行的訂閱沒有被取消,即使訂閱者沒有消費它,它也會繼續生成數據,從而導致背壓增加並可能導致內存洩漏。
- Resource Management : 它可以持有系統資源,如內存、CPU 和網絡連接,如果不加以檢查,可能會導致資源耗盡。系統資源可以通過取消訂閱來釋放,以後可以用於其他任務。
- 性能:**通過提前終止訂閱,系統可以避免不必要的處理並減少響應時間,從而提高整體系統性能**。
4.Maven依賴
讓我們舉一個非常簡單的例子,一些傳感器數據以Flux
形式出現,我們想使用 WebFlux 提供的各種選項取消基於訂閱的數據發射。
首先,我們需要添加以下關鍵依賴項:
- spring-boot-starter-webflux:它捆綁了開始使用 Spring WebFlux 構建反應式 Web 應用程序所需的所有依賴項,包括用於反應式編程的 Reactor 庫和作為默認嵌入式服務器的 Netty。
- reactor-spring :它是 Reactor 項目中的一個模塊,提供與 Spring Framework 的集成。
- reactor-test :它為 Reactive Streams 提供測試支持。
現在,讓我們在項目 POM 中聲明這些依賴項:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>${reactor-spring.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
5. 取消 WebFlux 中的 Ongoing Flux
在 Spring WebFlux 中,我們可以使用dispose()
執行顯式取消,或者它可以在使用某些在Subscription
對像上調用cancel()
的運算符時隱式發生。這些運營商包括:
-
takeUntil()
-
takeWhile()
-
take(long n)
-
take(Duration n)
仔細研究一下,我們會發現這些運算符在內部調用Subscription
對象的cancel
() 方法,作為參數傳遞給 Subscriber 的OnSubscribe()
方法。
接下來我們來討論這些運算符。
5.1.使用takeUntil()
運算符取消
讓我們以我們的傳感器數據示例為基礎。我們希望繼續從數據流中接收數據,直到遇到值 8,此時我們希望取消任何更多數據的發射:
@Test
void givenOngoingFlux_whentakeUntil_thenFluxCancels() {
Flux<Integer> sensorData = Flux.range(1, 10);
List<Integer> result = new ArrayList<>();
sensorData.takeUntil(reading -> reading == 8)
.subscribe(result::add);
assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7, 8);
}
此代碼片段使用Flux
API 創建整數流並使用各種運算符對其進行操作。首先,使用Flux.range()
創建從 1 到 10 的整數序列。然後,應用takeUntil()
運算符,它需要一個謂詞來指定Flux
應該繼續發出整數,直到值達到 8。
最後,調用subscribe()
方法,這會導致 Flux 發出值,直到takeUntil()
謂詞的計算結果為 true 。在subscribe()
方法中,每個發出的新整數都會添加到 List
重要的是要注意subscribe()
方法對於觸發從Flux
發出值是必不可少的,沒有它,不會發出任何值,因為Flux
沒有 subscription 。一旦takeUntil()
運算符指定的條件為真,訂閱將自動取消,並且Flux
停止發出值。測試結果確認結果列表僅包含最大 8 的整數值,表明取消了任何進一步的數據發射。
5.2.使用takeWhile()
運算符取消
接下來,讓我們考慮一個場景,只要傳感器讀數保持小於 8,我們就希望訂閱繼續發送數據。在這裡我們可以利用takeWhile
() 運算符,它需要延續謂詞:
@Test
void givenOngoingFlux_whentakeWhile_thenFluxCancels() {
List<Integer> result = new ArrayList<>();
Flux<Integer> sensorData = Flux.range(1, 10)
.takeWhile(reading -> reading < 8)
.doOnNext(result::add);
sensorData.subscribe();
assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7);
}
本質上,此處的takeWhile()
運算符還需要一個謂詞。只要謂詞的計算結果為真,數據流就會發出數據。一旦謂詞的計算結果為 false,訂閱就會被取消,並且不再發出數據。請注意,在這裡,我們在設置通量時使用了doOnNext()
方法以將每個發射值添加到列表中。
之後,我們調用sensorData.subscribe().
5.3.使用take(long n)
運算符取消
接下來,讓我們看看take()
運算符,它可以限制我們想要從可能無限的反應數據流序列中獲取的元素數量。讓我們取一個Flux
of Integers,範圍從 1 到 Integers 的最大值,然後取前 10 個元素:
@Test
void givenOngoingFlux_whentake_thenFluxCancels() {
Flux<Integer> sensorData = Flux.range(1, Integer.MAX_VALUE);
List<Integer> result = new ArrayList<>();
sensorData.take(10)
.subscribe(result::add);
Assertions.assertThat(result)
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
在這裡,訂閱在前 10 個元素之後再次被取消,我們的result
列表證實了這一點。
5.4.使用take(Duration d)
運算符取消
我們可能希望取消任何進一步的數據發射的另一種潛在情況是,經過一段時間後,我們對任何進一步的發射都不感興趣。在這種情況下,我們查看Flux
的持續時間,然後停止接收持續時間之外的任何內容:
@Test
void givenAnOnGoingFlux_whenTimeout_thenCancelsFlux() {
Flux<Integer> sensorData = Flux.interval(Duration.ZERO, Duration.ofSeconds(2))
.map(i -> i.intValue() + 10)
.take(5);
Flux<Integer> canceledByTimeout = sensorData.take(Duration.ofSeconds(3));
StepVerifier.create(canceledByTimeout)
.expectNext(10, 11)
.expectComplete()
.verify();
}
首先,我們使用interval()
運算符創建一個整數Flux
,它每隔 2 秒發出從 0 開始的值。然後,我們通過加 10 將每個發射值映射到一個整數。接下來,我們使用take()
運算符將發射值的數量限制為 5。這意味著 Flux 將僅發射前 5 個值,然後完成。
然後,我們通過應用持續時間值為 3 秒的take(Duration)
運算符創建一個名為 canceledBytimeOut 的新Flux
。這意味著 canceledBytimeout Flux 將從傳感器數據中發出前 2 個值,然後完成。
這裡我們使用 StepVerifier。 StepVerifier 是 Reactor-Test 庫提供的一個實用程序,它通過對預期事件設置預期來幫助驗證 Flux 或 Mono 流的行為,然後驗證事件是否按預期順序和預期值發出。
在我們的例子中,預期的順序和值是 10 和 11,我們還在驗證 Flux 是否使用expectComplete()
完成而不發出任何額外的值。
重要的是要注意subscribe()
方法沒有被顯式調用,因為它是在我們調用verify()
時在內部調用的。這意味著事件僅在我們運行 StepVerifier 時發出,而不是在我們創建Flux
流時發出。
5.5.使用dispose()
方法取消
接下來,讓我們看看如何通過調用屬於 Disposable Interface dispose(),
來進行顯式取消.
簡單地說, Disposable
是一個接口,用作取消的單向機制。它可以處理資源或取消訂閱。
讓我們設置一個示例,其中我們有一個整數Flux
,它以 1 秒的延遲發出從 1 到 10 的值。我們將訂閱 flux 以在控制台上打印值。然後我們會讓線程休眠 5ms,然後調用dispose() :
@Test
void giveAnOnGoingFlux_whenDispose_thenCancelsFluxExplicitly() throws InterruptedException {
Flux<Integer> flux = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1));
AtomicInteger count = new AtomicInteger(0);
Disposable disposable = flux.subscribe(i -> {
System.out.println("Received: " + i);
count.incrementAndGet();
}, e -> System.err.println("Error: " + e.getMessage())
);
Thread.sleep(5000);
System.out.println("Will Dispose the Flux Next");
disposable.dispose();
if(disposable.isDisposed()) {
System.out.println("Flux Disposed");
}
assertEquals(4, count.get());
}
在這裡,我們讓線程休眠 5 秒,然後調用dispose
()。這會導致取消訂閱。
6.取消後清理
重要的是要了解取消正在進行的訂閱不會隱式釋放任何關聯的資源。然而,一旦通量被取消或完成,進行任何清理和狀態重置是很重要的。我們可以使用提供的doOnCancel()
和doFinally
() 方法來實現:
為了簡化我們的測試,我們將在取消通量後打印適當的消息。但是,在實際場景中,此步驟可以執行任何資源清理,例如關閉連接。
讓我們快速測試一下,當Flux
被取消時,我們想要的字符串會作為取消後清理的一部分打印出來:
@Test
void givenAFluxIsCanceled_whenDoOnCancelAndDoFinally_thenMessagePrinted() throws InterruptedException {
List<Integer> result = new ArrayList<>();
PrintStream mockPrintStream = mock(PrintStream.class);
System.setOut(mockPrintStream);
Flux<Integer> sensorData = Flux.interval(Duration.ofMillis(100))
.doOnCancel(() -> System.out.println("Flux Canceled"))
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL) {
System.out.println("Flux Completed due to Cancelation");
} else {
System.out.println("Flux Completed due to Completion or Error");
}
})
.map(i -> ThreadLocalRandom.current().nextInt(1, 1001))
.doOnNext(result::add);
Disposable subscription = sensorData.subscribe();
Thread.sleep(1000);
subscription.dispose();
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
Mockito.verify(mockPrintStream, times(2)).println(captor.capture());
assertThat(captor.getAllValues()).contains("Flux Canceled", "Flux Completed due to Cancelation");
}
該代碼調用doOnCancel()
和doFinally()
運算符。重要的是要注意doOnCancel()
運算符僅在顯式取消Flux
序列時執行。另一方面, doFinally()
操作符執行它是被取消、成功完成還是錯誤完成。
此外, doFinally()
運算符使用一種SignalType
接口。它表示可能的信號類型,例如OnComplete
、 OnError
和CANCEL
。在這種情況下, SignalType
是CANCEL
,因此也會捕獲“Flux Completed due to Cancellation”消息。
七、結論
在本教程中,我們介紹了 Webflux 提供的各種取消正在進行的Flux
方法。我們在響應式編程的上下文中快速回顧了Flux
。我們檢查了可能需要取消訂閱的原因。然後,我們討論了各種便於取消的方法。此外,我們還研究了取消後的清理工作。
與往常一樣,代碼可以在 GitHub 上找到。