在 Reactor 中將 DataBuffer 轉換為 Mono
1. 引言
在現代 Java 應用中,Project Reactor 和 Spring WebFlux 框架使得串流處理和非阻塞 I/O 成為建置可擴充微服務的標準。在處理大型二進位有效負載(例如檔案上傳或影像傳輸)時, WebFlux 預設將資料以Flux<DataBuffer>形式進行串流傳輸。
雖然Flux<DataBuffer>格式在反壓和低記憶體消耗方面效率很高,但它通常不是我們最終處理所需的格式,尤其是在與 API 或函式庫互動時,這些 API 或函式庫需要將整個二進位內容作為標準的 Java byte[]陣列。
在本文中,我們將探討DataBuffer和Flux的基本概念,並介紹一種將DataBuffer塊流高效率地轉換為單一Mono<byte[]>的策略。
2. 項目設定
首先,我們將加入spring-boot-starter-webflux依賴項。它包含 Project Reactor ( reactor-core )、Spring WebFlux 以及必要的資料緩衝區實用程式:
<dependency>
<groupId>org.springframework.boot</groupId
<artifactId>spring-boot-starter-webflux</artifactId
<version>4.0.0</version>
</dependency>
3. 理解 Reactor 和資料緩衝區
在實現轉換之前,我們必須了解涉及的三個元件: DataBuffer 、 Flex,和我們的目標資料類型Mono<byte[]> 。
3.1. 什麼是資料緩衝區?
在響應式程式設計中, DataBuffer是對一段連續記憶體(通常是位元組數組)的抽象化。與簡單的byte[]不同, DataBuffer通常使用記憶體池來減少垃圾回收的開銷,使其在高吞吐量應用中更有效率。
它充當輕量級包裝器,保存原始位元組並追蹤讀取/寫入索引,允許元件讀取部分資料而無需了解底層記憶體管理細節。
3.2. Flux<DataBuffer>在 WebFlux 中的作用
在 WebFlux 中,大型有效負載(例如檔案上傳或大型 JSON 回應)不會一次全部讀入記憶體。相反,它們會以資料塊序列的形式進行串流傳輸,每個資料塊都封裝在一個DataBuffer中。這種串流傳輸行為由Flux<DataBuffer>表示。
使用Flux可以實現反壓機制,消費者可以向生產者(例如底層 TCP 連線)發出訊號,表明其處理資料的速度,從而防止記憶體耗盡。這種方法對於建立可擴展、非阻塞的應用程式至關重要。
3.3. 為什麼我們需要Mono<byte[]> ?
我們期望的輸出Mono<byte[]>表示一個最終結果(完整的位元組數組),它被封裝在一個響應式發佈器中。 byte byte[]包含所有單獨的DataBuffer區塊聚合而成的完整二進位有效負載。
雖然Flux<DataBuffer>非常適合串流傳輸,但當需要一次處理entire有效負載Mono<byte[]> ,例如計算文件雜湊值、將內容儲存到阻塞式 API 或一次性反序列化整個對象,則必須使用 Mono<byte[]>。
4. 轉化策略及實施
轉換過程在概念上很簡單,但必須使用響應式運算元正確執行,以確保非阻塞執行和適當的資源管理。該策略包含兩個主要步驟:
4.1. 聚合通量流
由於Flux會發出多個DataBuffer對象,因此第一步是將它們合併成一個連續的緩衝區。靜態方法DataBufferUtils.join()可以幫助我們實現這一點。此方法接受一個Flux<DataBuffer> ,並傳回一個Mono<DataBuffer> 。它會迭代地從流中的每個緩衝區讀取位元組,並將它們寫入一個新的、大的DataBuffer中,直到來源Flux完成為止。
記憶體管理在響應式程式設計中至關重要。資料緩衝區通常從記憶體池中分配,必須明確釋放以防止記憶體洩漏和記憶體池耗盡。
好消息是, DataBufferUtils.join()方法正是為了管理這個流程而設計的。它會在聚合過程中自動從來源Flux中釋放各個緩衝區。最終產生的單一Mono<DataBuffer>即可供我們使用,但我們仍然需要負責釋放最終聚合後的緩衝區。
4.2. 提取最終的byte[]數組
最後一步是從產生的Mono中取出單一聚合的DataBuffer ,並將其內容轉換為標準的 Java byte[] 。我們透過使用DataBufferUtils.toByteArray()方法來映射Mono<DataBuffer>來實現這一點。
該工具可以有效率地從最終緩衝區中提取位元組。我們將使用doOnTerminate或doFinally運算元來確保轉換完成後,無論成功與否,最終聚合緩衝區的記憶體都會被釋放。
接下來,我們將依照上述三個步驟創建完整、健壯、無阻塞的實用方法:
public class DataBufferConverter {
public Mono<byte[]> toByteArray(Flux<DataBuffer> data) {
return DataBufferUtils
.join(data)
.flatMap(dataBuffer -> {
try {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
return Mono.just(bytes);
} finally {
DataBufferUtils.release(dataBuffer);
}
});
}
}
此實用程式使用DataBufferUtils.join()進行聚合,並將最終提取包裝在try-finally區塊中,以確保在轉換為byte[]後釋放最終緩衝區。
5. 測試轉換流程
現在我們將編寫一個簡單的 JUnit 測試來驗證是否能從分塊的Flux中正確重建位元組數組。我們將使用DefaultDataBufferFactory來模擬響應式流環境:
public class DataBufferConverterTest {
private final DataBufferConverter converter = new DataBufferConverter();
private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
private final String TEST_CONTENT = "This is a long test string.";
@Test
void givenFluxOfDataBuffers_whenConvertedToByteArray_thenContentMatches() {
// Setup: First, we'll manually create two DataBuffer chunks for the input Flux
byte[] part1 = "This is a ".getBytes();
byte[] part2 = "long test string.".getBytes();
DataBuffer buffer1 = factory.allocateBuffer(part1.length);
buffer1.write(part1);
DataBuffer buffer2 = factory.allocateBuffer(part2.length);
buffer2.write(part2);
Flux<DataBuffer> sourceFlux = Flux.just(buffer1, buffer2);
// Act & Assert: Here we perform conversion and block for direct assertion
byte[] resultBytes = converter.toByteArray(sourceFlux).block();
byte[] expectedBytes = TEST_CONTENT.getBytes();
assertArrayEquals(expectedBytes, resultBytes, "The reconstructed byte array should match original");
}
}
在這裡,我們手動建立了兩個分塊的DataBuffer段,並將它們傳送到轉換器。然後,我們執行同步斷言來驗證重構的byte[]內容是否與原始內容相符。
6. 結論
在本文中,我們首先探討了Flux<DataBuffer>的流式特性與我們的目標( Mono<byte[]>所表示的完整有效載荷)之間的根本差異。
我們詳細闡述了包含聚合、緩衝區處理和最終提取的三步驟轉換策略。最後,我們使用DataBufferUtils.join().接下來,我們透過在flatMap操作符中手動釋放最終聚合的DataBuffer來確保適當的記憶體管理。
這種簡潔、非阻塞模式使我們能夠將高效能響應式資料流與需要單一byte[]有效負載的傳統 API 無縫整合。
與往常一樣,該實作的原始程式碼可在 GitHub 上找到。