Spring WebFlux中的背壓機制

1.簡介

Spring WebFlux為Web應用程序提供了反應式編程。響應式設計的異步和非阻塞性質提高了性能和內存使用率。 Project Reactor提供了那些功能來有效地管理數據流。

但是,背壓是這類應用中的常見問題。在本教程中,我們將解釋它是什麼以及如何在Spring WebFlux中應用反壓機制來緩解這種情況。

2.反應流中的背壓

由於響應式編程的非阻塞性質,服務器不會立即發送完整的流。它可以在數據可用時立即並發推送數據。因此,客戶端等待較少的時間來接收和處理事件。但是,仍有一些問題需要克服。

軟件系統中的背壓是使通信量過載的能力。換句話說,信息發布者將無法處理的數據淹沒了消費者。

最終,人們也將此術語用作控制和處理它的機制。它是系統控制下游力所採取的保護措施。

2.1 什麼是背壓?

**在“反應性流”中,**背壓還定義瞭如何調節流元素的傳輸。換句話說,控制接收者可以消耗多少元素。

讓我們用一個例子清楚地描述它是什麼:

  • 該系統包含三個服務:發布者,使用者和圖形用戶界面(GUI)
  • 發布者每秒向消費者發送10000個事件
  • 使用者處理它們並將結果發送到GUI
  • GUI將結果顯示給用戶
  • 消費者每秒只能處理7500個事件

Spring

**以該速度,消費者無法管理事件(**背壓) 。因此,系統將崩潰並且用戶將看不到結果。

2.2 使用背壓防止系統性故障

這裡的建議是應用某種背壓策略來防止系統性故障。目的是有效管理收到的額外事件:

  • 控制發送的數據流將是第一個選擇。基本上,發布者需要放慢事件的速度。因此,消費者不會過載。不幸的是,這並不總是可能的,我們需要找到其他可用的選項
  • 第二個選擇是緩衝額外的數據量。使用這種方法,消費者可以臨時存儲剩餘事件,直到可以處理它們為止。這裡的主要缺點是取消綁定緩衝區,導致內存崩潰
  • 丟棄多餘的事件,使它們失去踪跡。即使此解決方案也不是理想的選擇,使用此技術,系統也不會崩潰

Spring

2.3 控制背壓

我們將重點控制發布者發出的事件。基本上,有三種策略可以遵循:

  • 僅在訂閱者請求時發送新事件。這是在發射器請求時收集元素的拉動策略
  • 限制要在客戶端接收的事件數。限制數量的推送策略,發布者只能一次將最大數量的項目發送給客戶
  • 當使用者無法處理更多事件時,取消數據流。在這種情況下,接收方可以在任何給定時間中止傳輸並稍後再次訂閱該流

Spring

3.在Spring WebFlux中處理背壓

Spring WebFlux提供了反應流的異步非阻塞流。 Spring WebFlux中負責背壓的是Project Reactor 。它在內部使用Flux功能來應用機制來控制由發射器產生的事件。

WebFlux使用TCP流量控制來調節背壓(以字節為單位)。但是它不能處理消費者可以接收的邏輯元素。讓我們看看幕後發生的交互流:

  • WebFlux框架負責將事件轉換為字節,以便通過TCP傳輸/接收事件
  • 在請求下一個邏輯元素之前,使用者可能會啟動並運行長時間的工作
  • 接收方處理事件時,WebFlux會在不進行確認的情況下使字節排隊,因為不需要新事件
  • 由於TCP協議的性質,如果有新事件,發布者將繼續將其發送到網絡

Spring

總之,上圖顯示,對於消費者和發布者,邏輯元素中的需求可能會有所不同。 Spring WebFlux不能理想地管理作為一個整體系統進行交互的服務之間的背壓。它與消費者獨立處理,然後與發布者以相同方式處理。但這沒有考慮到這兩種服務之間的邏輯需求。

因此, Spring WebFlux不會像我們期望的那樣處理背壓。在下一節中,讓我們看看如何在Spring WebFlux中實現反壓機制!

4.使用Spring WebFlux實現背壓機制

我們將使用Flux實現來處理對接收到的事件的控制。因此,我們將在讀取和寫入側為請求和響應主體提供反壓支持。然後,生產者將放慢速度或停下來,直到消費者的生產能力釋放出來。讓我們來看看如何做!

4.1 依賴關係

為了實現示例,我們只需將Spring WebFlux啟動器Reactor測試依賴項添加到我們的pom.xml

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-webflux</artifactId>

 </dependency>



 <dependency>

 <groupId>io.projectreactor</groupId>

 <artifactId>reactor-test</artifactId>

 <scope>test</scope>

 </dependency>

4.2 請求新事件

第一種選擇是****讓消費者控制它可以處理的事件。因此,發布者一直等到接收者請求新事件。總而言之,客戶端訂閱Flux ,然後根據其需求處理事件:

@Test

 public void whenRequestingChunks10_thenMessagesAreReceived() {

 Flux request = Flux.range(1, 50);



 request.subscribe(

 System.out::println,

 err -> err.printStackTrace(),

 () -> System.out.println("All 50 items have been successfully processed!!!"),

 subscription -> {

 for (int i = 0; i < 5; i++) {

 System.out.println("Requesting the next 10 elements!!!");

 subscription.request(10);

 }

 }

 );



 StepVerifier.create(request)

 .expectSubscription()

 .thenRequest(10)

 .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 .thenRequest(10)

 .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

 .thenRequest(10)

 .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)

 .thenRequest(10)

 .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)

 .thenRequest(10)

 .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)

 .verifyComplete();

通過這種方法,發射器永遠不會淹沒接收器。換句話說,客戶端處於控制之下以處理其所需的事件。

StepVerifier測試與反壓有關的生產者行為。 thenRequest(n)時才能期待接下來的n個項目。

4.3 限制

第二個選項是使用Project Reactor中limitRange()它允許設置要預取的項目數。一個有趣的功能是,即使訂戶請求處理更多事件,該限制也適用。發射器將事件分成多個塊,避免消耗超過每個請求的限制:

@Test

 public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {

 Flux<Integer> limit = Flux.range(1, 25);



 limit.limitRate(10);

 limit.subscribe(

 value -> System.out.println(value),

 err -> err.printStackTrace(),

 () -> System.out.println("Finished!!"),

 subscription -> subscription.request(15)

 );



 StepVerifier.create(limit)

 .expectSubscription()

 .thenRequest(15)

 .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 .expectNext(11, 12, 13, 14, 15)

 .thenRequest(10)

 .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)

 .verifyComplete();

 }

4.4 取消

最終,消費者可以隨時取消要接收的事件。對於此示例,我們將使用另一種方法。 Project Reactor允許實現我們自己的Subscriber或擴展BaseSubscriber 。因此,讓我們看一下接收者如何在任何時候都可以重寫接收到的新類來中止新事件的接收:

@Test

 public void whenCancel_thenSubscriptionFinished() {

 Flux<Integer> cancel = Flux.range(1, 10).log();



 cancel.subscribe(new BaseSubscriber<Integer>() {

 @Override

 protected void hookOnNext(Integer value) {

 request(3);

 System.out.println(value);

 cancel();

 }

 });



 StepVerifier.create(cancel)

 .expectNext(1, 2, 3)

 .thenCancel()

 .verify();

 }

5.結論

在本教程中,我們展示了反應式編程中的反壓以及如何避免這種反壓。 Spring WebFlux通過Project Reactor支持背壓。因此,當發布者通過太多事件壓倒消費者時,它可以提供可用性,魯棒性和穩定性。總而言之,它可以防止由於需求量大而導致的系統性故障。