Reactor WebFlux 與虛擬線程
1. 概述
在本教學中,我們將比較 Java 19 的虛擬執行緒與 Project Reactor 的 Webflux。我們將首先回顧每種方法的基本工作原理,隨後,我們將分析它們的優點和缺點。
我們將從探索響應式框架的優勢開始,然後我們將了解為什麼 WebFlux 仍然有價值。之後,我們將討論每個請求一個執行緒的方法,並重點介紹虛擬執行緒可能是更好選擇的場景。
2. 程式碼範例
對於本文中的程式碼範例,我們假設我們正在開發電子商務應用程式的後端。我們將重點放在負責計算和發布添加到購物車的商品價格的函數:
class ProductService {
private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";
private final ProductRepository repository;
private final DiscountService discountService;
private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;
// constructor
public void addProductToCart(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = product.basePrice();
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
price.setValue(price.getValue().subtract(discount));
}
var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
}
正如我們所看到的,我們首先使用MongoRepository
從 MongoDB 資料庫檢索Product
。檢索後,我們將確定該Product
是否符合折扣資格。如果是這種情況,我們使用DiscountService
執行 HTTP 請求以確定該產品的任何可用折扣。
最後,我們計算產品的最終價格。完成後,我們發送一條 Kafka 訊息,其中包含productId, cartId
和計算出的價格。
3.WebFlux
WebFlux 是一個用於建立非同步、非阻塞和事件驅動應用程式的框架。它按照反應式程式設計原理運行,利用Flux
和Mono
類型來處理複雜的非同步通訊。這些類型實現了發布者-訂閱者設計模式,將資料的消費者和生產者解耦。
3.1.反應式庫
Spring 生態系統中的許多模組與 WebFlux 整合以進行響應式編程。讓我們使用其中一些模組,同時將程式碼重構為響應式範例。
例如,我們可以將MongoRepository
切換為ReactiveMongoRepository
。此變更意味著我們必須使用Mono<Product>
而不是Optional<Product>
:
Mono<Product> product = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
同樣,我們可以將ProductService
更改為非同步和非阻塞。例如,我們可以讓它使用 WebClient 來執行 HTTP 請求,從而以Mono<BigDecimal>:
Mono<BigDecimal> discount = discountService.discountForProduct(productId);
3.2.不變性
在函數式和響應式程式設計範例中,不變性始終優於可變資料。我們最初的方法涉及使用設定器更改Price
的值。然而,當我們轉向反應式方法時,讓我們重構Price
物件並使其不可變。
例如,我們可以引入一種專用方法來應用折扣並產生一個新的Price
實例,而不是修改現有的實例:
record Price(BigDecimal value, String currency) {
public Price applyDiscount(BigDecimal discount) {
return new Price(value.subtract(discount), currency);
}
}
現在,我們可以使用 WebFlux 的map()
方法根據折扣計算新價格:
Mono<Price> price = discountService.discountForProduct(productId)
.map(discount -> price.applyDiscount(discount));
此外,我們甚至可以在這裡使用方法引用,以保持程式碼緊湊:
Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);
3.3.功能管線
Mono
和Flux
透過map()
和flatMap()
等方法遵循函子和 monad 模式。這使我們能夠將用例描述為不可變資料的轉換管道。
讓我們嘗試確定我們的用例所需的轉換:
- 我們從原始的
productId
開始 - 我們將其轉化為
Product
- 我們使用
Product
來計算Price
- 我們使用
Price
來創建event
- 最後,我們將
event
發佈到訊息佇列上
現在,讓我們重構程式碼以反映該函數鏈:
void addProductToCart(String productId, String cartId) {
Mono<Product> productMono = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
Mono<Price> priceMono = productMono.flatMap(product -> {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(productId)
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
});
Mono<ProductAddedToCartEvent> eventMono = priceMono.map(
price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId));
eventMono.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}
現在,讓我們內聯局部變數以保持程式碼緊湊。此外,讓我們提取一個用於計算價格的函數,並在flatMap()
內部使用它:
void addProductToCart(String productId, String cartId) {
repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
.flatMap(this::computePrice)
.map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}
Mono<Price> computePrice(Product product) {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(product.id())
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
}
4. 虛擬線程
虛擬執行緒是透過 Project Loom 在 Java 中引入的,作為平行處理的替代解決方案。它們是由 Java 虛擬機器 (JVM) 管理的輕量級使用者模式執行緒。因此,它們特別適合 I/O 操作,而傳統執行緒可能會花費大量時間等待外部資源。
與非同步或反應式解決方案相比,虛擬線程使我們能夠繼續使用每個請求一個線程的處理模型。換句話說,我們可以繼續按順序編寫程式碼,而無需混合業務邏輯和響應式 API。
4.1.虛擬線程
有幾種方法可以利用虛擬線程來執行我們的程式碼。**對於單一方法,例如上一個範例中演示的方法,我們可以使用startVirtualThread().
**這個靜態方法最近被加入到Thread
API 中,並在新的虛擬執行緒上執行Runnable
:
public void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
private void computePriceAndPublishMessage(String productId, String cartId) {
// ...
}
或者,我們可以使用新的靜態工廠方法 Executors.newVirtualThreadPerTaskExecutor() 來建立依賴虛擬執行緒的ExecutorService
**Executors.newVirtualThreadPerTaskExecutor() .**
此外,對於使用 Spring Framework 6 和 Spring Boot 3 的應用程序,我們可以利用新的Executor
並配置 Spring 以支援虛擬線程而不是平台線程。
4.2.相容性
虛擬線程透過使用更傳統的同步程式設計模型來簡化程式碼。因此,我們可以以類似於阻塞 I/O 操作的順序方式編寫程式碼,而不必擔心顯式的反應式建構。
此外,我們可以從常規單線程程式碼無縫切換到虛擬線程,而無需進行任何更改。例如,在前面的範例中,我們只需要使用靜態工廠方法startVirtualThread()
建立一個虛擬執行緒並執行其中的邏輯:
void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
void computePriceAndPublishMessage(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = computePrice(productId, product);
var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
Price computePrice(String productId, Product product) {
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
return product.basePrice().applyDiscount(discount);
}
return product.basePrice();
}
4.3.可讀性
透過每個請求一個執行緒的處理模型,可以更容易理解和推理業務邏輯。這可以減少與反應式程式範例相關的認知負擔。
換句話說,虛擬線程使我們能夠將技術問題與業務邏輯清晰地分開。因此,它消除了在實現我們的業務用例時對外部 API 的需求。
5. 結論
在本文中,我們比較了並發和非同步處理的兩種不同方法。我們首先分析了 Reactor 專案的 WebFlux 和反應式程式設計範例。我們發現這種方法有利於不可變物件和功能管道。
之後,我們討論了虛擬線程及其與遺留程式碼庫的出色相容性,從而可以平滑過渡到非阻塞程式碼。此外,它們還具有將業務邏輯與基礎設施代碼和其他技術問題清晰分離的額外好處。
與往常一樣,本文中使用的所有程式碼範例都可以在 GitHub 上取得。