Spring Reactor簡介

1.概述

在這篇快速的文章中,我們將通過為反應性,事件驅動的應用程序設置真實場景來介紹反應堆總線

2. Project Reactor的基礎

2.1 為什麼要使用Reactor?

現代應用程序需要處理大量並發請求並處理大量數據。標準的阻塞代碼已不足以滿足這些要求。

反應性設計模式是一種基於事件的體系結構方法,用於異步處理來自單個或多個服務處理程序的大量並發服務請求

Project Reactor基於此模式,並且有一個明確而雄心勃勃的目標,那就是在JVM上構建非阻塞,反應式應用程序.

2.2 示例方案

在開始之前,這裡有一些有趣的場景,在這些場景中,可以利用反應式架構風格,只是為了了解我們可以在哪裡應用它:

  • 適用於大型在線購物平台(如亞馬遜)的通知服務
  • 為銀行業提供大量交易處理服務
  • 股票價格同時變化的股票交易業務

3. Maven依賴

讓我們通過在pom.xml:添加以下依賴關係來開始使用Project Reactor Bus pom.xml:

<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-bus</artifactId>
 <version>2.0.8.RELEASE</version>
 </dependency>

我們可以在Maven Central中查看最新版本的reactor-bus

4.構建演示應用程序

為了更好地理解基於反應器的方法的好處,讓我們看一個實際的例子。

我們將構建一個簡單的應用程序,負責向在線購物平台的用戶發送通知。例如,如果用戶下了新訂單,則該應用會通過電子郵件或SMS發送訂單確認。

典型的同步實現自然會受到電子郵件或SMS服務的吞吐量的限制。因此,交通高峰,例如假期通常是有問題的。

通過反應性方法,我們可以將系統設計為更靈活,並更好地適應外部系統(例如網關服務器)中可能發生的故障或超時。

讓我們看一下該應用程序-從更傳統的方面開始,再轉到更具reactive的結構。

4.1。簡單的POJO

首先,我們創建一個POJO類來表示通知數據:

eval(ez_write_tag([[468,60],'baeldung_com-medrectangle-3','ezslot_0',110,'0','0']));

public class NotificationData {

 private long id;
 private String name;
 private String email;
 private String mobile;

 // getter and setter methods
 }

4.2。服務層

現在讓我們定義一個簡單的服務層:

public interface NotificationService {

 void initiateNotification(NotificationData notificationData)
 throws InterruptedException;

 }

和實現,模擬長時間運行的操作:

@Service
 public class NotificationServiceimpl implements NotificationService {

 @Override
 public void initiateNotification(NotificationData notificationData)
 throws InterruptedException {

 System.out.println("Notification service started for "
 + "Notification ID: " + notificationData.getId());

 Thread.sleep(5000);

 System.out.println("Notification service ended for "
 + "Notification ID: " + notificationData.getId());
 }
 }

請注意,說明通過短信或電子郵件網關發送消息的真實生活場景,我們特意推出了五秒鐘的延遲initiateNotification與方法Thread.sleep(5000).

因此,當線程訪問服務時,它將被阻塞五秒鐘。

4.3。消費者

現在,讓我們進入應用程序的更多響應方面並實現一個使用者-然後將其映射到反應堆事件總線:

@Service
 public class NotificationConsumer implements
 Consumer<Event<NotificationData>> {

 @Autowired
 private NotificationService notificationService;

 @Override
 public void accept(Event<NotificationData> notificationDataEvent) {
 NotificationData notificationData = notificationDataEvent.getData();

 try {
 notificationService.initiateNotification(notificationData);
 } catch (InterruptedException e) {
 // ignore
 }

 }
 }

如我們所見,我們創建的Consumer<T>實現了Consumer<T>接口。主要邏輯駐留在accept方法中。

這是我們在典型的Spring偵聽器實現中可以達到的類似方法。

4.4。控制器

最後,既然我們可以使用事件了,那麼我們也可以生成它們。

我們將在一個簡單的控制器中執行此操作:

@Controller
 public class NotificationController {

 @Autowired
 private EventBus eventBus;

 @GetMapping("/startNotification/{param}")
 public void startNotification(@PathVariable Integer param) {
 for (int i = 0; i < param; i++) {
 NotificationData data = new NotificationData();
 data.setId(i);

 eventBus.notify("notificationConsumer", Event.wrap(data));

 System.out.println(
 "Notification " + i + ": notification task submitted successfully");
 }
 }
 }

這是不言自明的-我們在這里通過EventBus發出事件。

例如,如果客戶端點擊參數值為10的URL,則將通過事件總線發送十個事件。

4.5。 Java配置

現在讓我們將所有內容放在一起,並創建一個簡單的Spring Boot應用程序。

首先,我們需要配置EventBusEnvironment Bean:

@Configuration
 public class Config {

 @Bean
 public Environment env() {
 return Environment.initializeIfEmpty().assignErrorJournal();
 }

 @Bean
 public EventBus createEventBus(Environment env) {
 return EventBus.create(env, Environment.THREAD_POOL);
 }
 }

在我們的例子中,我們使用環境中可用的默認線程池實例化EventBus

另外,我們可以使用自定義的Dispatcher實例:

EventBus evBus = EventBus.create(
 env,
 Environment.newDispatcher(
 REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,
 DispatcherType.THREAD_POOL_EXECUTOR));

現在,我們準備創建一個主要的應用程序代碼:

import static reactor.bus.selector.Selectors.$;

 @SpringBootApplication
 public class NotificationApplication implements CommandLineRunner {

 @Autowired
 private EventBus eventBus;

 @Autowired
 private NotificationConsumer notificationConsumer;

 @Override
 public void run(String... args) throws Exception {
 eventBus.on($("notificationConsumer"), notificationConsumer);
 }

 public static void main(String[] args) {
 SpringApplication.run(NotificationApplication.class, args);
 }
 }

在我們的run方法,我們註冊notificationConsumer在通知指定選擇匹配被觸發

注意我們如何使用$屬性的靜態導入來創建Selector對象。

5.測試應用程序

現在讓我們創建一個測試來查看我們的NotificationApplication情況:

@RunWith(SpringRunner.class)
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
 public class NotificationApplicationIntegrationTest {

 @LocalServerPort
 private int port;

 @Test
 public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
 RestTemplate restTemplate = new RestTemplate();
 restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
 }
 }

如我們所見,一旦執行了請求,所有十個任務都會立即提交,而不會產生任何阻塞。提交後,通知事件將並行處理。

Notification 0: notification task submitted successfully
 Notification 1: notification task submitted successfully
 Notification 2: notification task submitted successfully
 Notification 3: notification task submitted successfully
 Notification 4: notification task submitted successfully
 Notification 5: notification task submitted successfully
 Notification 6: notification task submitted successfully
 Notification 7: notification task submitted successfully
 Notification 8: notification task submitted successfully
 Notification 9: notification task submitted successfully
 Notification service started for Notification ID: 1
 Notification service started for Notification ID: 2
 Notification service started for Notification ID: 3
 Notification service started for Notification ID: 0
 Notification service ended for Notification ID: 1
 Notification service ended for Notification ID: 0
 Notification service started for Notification ID: 4
 Notification service ended for Notification ID: 3
 Notification service ended for Notification ID: 2
 Notification service started for Notification ID: 6
 Notification service started for Notification ID: 5
 Notification service started for Notification ID: 7
 Notification service ended for Notification ID: 4
 Notification service started for Notification ID: 8
 Notification service ended for Notification ID: 6
 Notification service ended for Notification ID: 5
 Notification service started for Notification ID: 9
 Notification service ended for Notification ID: 7
 Notification service ended for Notification ID: 8
 Notification service ended for Notification ID: 9

重要的是要記住,在我們的場景中,無需按任何特定順序處理這些事件。

六,結論

在本快速教程中,我們創建了一個簡單的事件驅動應用程序。我們還看到瞭如何開始編寫更具反應性和非阻塞性的代碼。

但是,這種情況只是刮擦對象的表面,只是開始嘗試反應式範式的良好基礎

與往常一樣,源代碼可以在GitHub上找到