將 zipWhen() 與 Mono 一起使用
1. 概述
在本教程中,我們將探索如何使用zipWhen()
以協調的方式組合兩個或多個Mono
流的結果。我們將從快速概述開始。接下來,我們將設定一個涉及用戶資料儲存和電子郵件的簡單範例。我們將展示zipWhen()
如何使我們能夠在需要同時收集和處理來自不同來源的資料的場景中編排和協調多個非同步操作。
2. 什麼是zipWhen()?
在Mono
的響應式程式設計中, zipWhen()
是一個運算符,它允許我們以協調的方式組合兩個或多個Mono
流的結果。當我們要同時執行多個非同步操作並且需要將它們的結果組合成單一輸出時,通常會使用它。
我們從兩個或多個表示非同步操作的Mono
流開始。這些 Mono 可以發出不同類型的數據,它們可能相互依賴,也可能不依賴。
然後我們使用zipWhen()
進行協調。我們將zipWhen()
運算子應用於Monos
之一。該運算元等待第一個Mono
發出一個值,然後使用該值觸發其他 Mono 的執行。 zipWhen()
的結果是一個新的Mono
,它將所有Monos
的結果組合成一個資料結構,通常是一個Tuple
或我們定義的物件。
最後,我們可以指定如何組合 Monos 的結果。我們可以使用組合值來建立新物件、執行計算或建立有意義的回應。
3. 設定範例
讓我們設定一個由三個簡化服務組成的簡單範例: UserService
、 EmailService,
和DataBaseService
。它們中的每一個都以不同類型的Mono
形式產生資料。我們希望將所有資料合併在一個回應中並將其傳回給呼叫客戶端。讓我們先設定適當的POM
相依性。
3.1.依賴關係
讓我們先設定所需的依賴項。我們需要[spring-boot-starter-webflux](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux/3.1.3)
和[reactor-test](https://mvnrepository.com/artifact/io.projectreactor/reactor-test/3.5.10)
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
3.2.設定UserService
我們先介紹一下User Service:
public class UserService {
public Mono<User> getUser(String userId) {
return Mono.just(new User(userId, "John Stewart"));
}
}
這裡, UserService
提供了一種根據給定的userId.
它傳回一個表示使用者資訊的Mono<User>
。
3.3.設定EmailService
接下來,讓我們新增EmailService
:
public class EmailService {
private final UserService userService;
public EmailService(UserService userService) {
this.userService = userService;
}
public Mono<Boolean> sendEmail(String userId) {
return userService.getUser(userId)
.flatMap(user -> {
System.out.println("Sending email to: " + user.getEmail());
return Mono.just(true);
})
.defaultIfEmpty(false);
}
}
顧名思義, EmailService
負責向用戶發送電子郵件。重要的是,它依賴UserService
來獲取用戶詳細信息,然後根據檢索到的信息發送電子郵件。 sendEmail()
方法傳回一個Mono<Boolean>
指示電子郵件是否發送成功。
3.4.設定DatabaseService
public class DatabaseService {
private Map<String, User> dataStore = new ConcurrentHashMap<>();
public Mono<Boolean> saveUserData(User user) {
return Mono.create(sink -> {
try {
dataStore.put(user.getId(), user);
sink.success(true);
} catch (Exception e) {
sink.success(false);
}
});
}
}
DatabaseService
處理使用者資料到資料庫的持久性。為了簡單起見,我們在這裡使用並發映射來表示資料儲存。
它提供了一個saveUserData()
方法,該方法會取得使用者資訊並傳回一個Mono<Boolean>
來表示資料庫操作的成功或失敗。
4. zipWhen()
的實際應用
現在我們已經定義了所有服務,讓我們定義一個控制器方法,它將來自所有三個服務的Mono
流組合成一個Mono<ResponseEntity<String>>.
我們將展示如何使用zipWhen()
運算子來協調各種非同步操作並將它們全部轉換為呼叫客戶端的單一回應。我們先定義一下GET
方法:
@GetMapping("/example/{userId}")
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
Mono<User> userMono = userService.getUser(userId);
Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
.subscribeOn(Schedulers.parallel());
Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
.map(Object::toString));
return userMono.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
.zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
User user = tuple.getT1();
Boolean emailSent = tuple.getT2();
return ResponseEntity.ok()
.body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
});
}
當客戶端呼叫 GET /example/{userId} 端點時, userService 會呼叫combineAllData()
方法,透過呼叫userService.getUser(userId)
來根據提供的 userId 擷取有關使用者的資訊。這個結果儲存在Mono<User>
中,這裡稱為 userMono。
接下來,它會向同一用戶發送一封電子郵件。但是,在發送電子郵件之前,它會檢查使用者是否存在。電子郵件發送操作的結果(成功或失敗)由Mono<Boolean>
類型的emailSentMono
表示。此操作並行執行以節省時間。它使用databaseService.saveUserData(user).
此操作的結果(成功或失敗)將轉換為字串並儲存在Mono<String>
中。
重要的是,它使用zipWhen()
運算子來組合前面步驟的結果。第一個zipWhen()
將使用者資料userMono
和emailSentMono
中的電子郵件傳送狀態組合到一個元組。第二個zipWhen()
結合了前面的元組和dataBaseResultMono
的資料庫結果來建構最終回應。在第二個zipWhen()
內,它使用組合資料建構回應訊息。
訊息中包含使用者資訊、郵件是否傳送成功以及資料庫操作的結果。本質上,此方法協調特定使用者的使用者資料檢索、電子郵件傳送和資料庫操作,並將結果組合成有意義的回應,確保一切有效率、並發地發生。
5. 測試
現在,讓我們測試我們的系統,並驗證是否返回了結合了三種不同類型的響應流的正確響應:
@Test
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
UserService userService = Mockito.mock(UserService.class);
EmailService emailService = Mockito.mock(EmailService.class);
DatabaseService databaseService = Mockito.mock(DatabaseService.class);
String userId = "123";
User user = new User(userId, "John Doe");
Mockito.when(userService.getUser(userId))
.thenReturn(Mono.just(user));
Mockito.when(emailService.sendEmail(userId))
.thenReturn(Mono.just(true));
Mockito.when(databaseService.saveUserData(user))
.thenReturn(Mono.just(true));
UserController userController = new UserController(userService, emailService, databaseService);
Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);
StepVerifier.create(responseMono)
.expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
.equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
.verifyComplete();
}
我們正在使用 StepVerifier
驗證回應實體是否具有預期的 200 OK 狀態代碼以及使用不同 Monos 組合結果的主體 zipWhen()
。
六,結論
在本教程中,我們快速了解了在響應式程式設計中將zipWhen()
與Mono
結合使用。我們使用了用戶資料收集、電子郵件和儲存組件的範例,所有這些組件都提供了不同類型的 Mono。此範例示範如何使用zipWhen()
在響應式 Spring WebFlux 應用程式中有效處理資料依賴性並編排非同步操作。
與往常一樣,完整的源代碼可以在 GitHub 上取得。