使用 Spring Reactive WebClient 將 Flux 讀入單個 InputStream
一、概述
在本教程中,我們將深入研究 Java 反應式編程,以解決如何將Flux<DataBuffer>
讀入單個InputStream
的有趣問題。
2. 請求設置
作為解決將Flux<DataBuffer>
讀入單個InputStream
問題的第一步,我們將使用 Spring 響應式 WebClient 發出GET
請求。此外,我們可以將gorest.co.in託管的公共 API 端點之一用於此類測試場景:
String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";
接下來,讓我們定義用於獲取WebClient
類的新實例的getWebClient()
方法:
static WebClient getWebClient() {
WebClient.Builder webClientBuilder = WebClient.builder();
return webClientBuilder.build();
}
此時,我們已準備好向/public/v2/users
端點發出GET
請求。但是,我們必須將響應主體作為Flux<DataBuffer>
對象。所以,讓我們繼續下一節關於BodyExtractors
來準確地做到這一點。
3. BodyExtractors
和DataBufferUtils
我們可以**使用spring-webflux
中BodyExtractors
類的toDataBuffers()
方法將響應體提取到Flux<DataBuffer>
**中。
讓我們繼續創建body
作為Flux<DataBuffer>
類型的實例:
Flux<DataBuffer> body = client
.get(
.uri(REQUEST_ENDPOINT)
.exchangeToFlux( clientResponse -> {
return clientResponse.body(BodyExtractors.toDataBuffers());
});
接下來,由於我們需要將這些DataBuffer
流收集到單個InputStream
中,實現此目的的一個好策略是使用PipedInputStream
和[PipedOutputStream](https://docs.oracle.com/en/java/javase/14/docs/api/java.base/java/io/class-use/PipedOutputStream.html)
。
此外,我們打算寫入PipedOutputStream
並最終從PipedInputStream
讀取。那麼,讓我們看看如何創建這兩個連接的流:
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);
我們必須注意,默認大小是1024
字節。但是,我們預計從Flux<DataBuffer>
收集的結果可能會超過默認值。因此,我們需要明確指定更大的尺寸值,在本例中為1024*10
。
最後,我們使用DataBufferUtils
類中可用的write()
實用方法將body
作為發布者寫入outputStream
:
DataBufferUtils.write(body, outputStream).subscribe();
我們必須注意,我們在聲明時將inputStream
連接到outputStream
。所以,我們很高興從inputStream
中讀取。讓我們繼續下一部分,看看它的實際效果。
4. 從PipedInputStream
中讀取
首先,讓我們定義一個輔助方法readContent()
來將InputStream
作為String
對象讀取:
String readContent(InputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
byte[] tmp = new byte[stream.available()];
int byteCount = stream.read(tmp, 0, tmp.length);
contentStringBuffer.append(new String(tmp));
return String.valueOf(contentStringBuffer);
}
接下來,因為在不同線程中讀取 PipedInputStream 是一種典型**PipedInputStream
** ,所以讓我們創建readContentFromPipedInputStream()
方法,該方法在內部生成一個新線程,通過調用readContent()
方法將PipedInputStream
中的內容讀取到String
對像中:
String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
try {
Thread pipeReader = new Thread(() -> {
try {
contentStringBuffer.append(readContent(stream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
pipeReader.start();
pipeReader.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stream.close();
}
return String.valueOf(contentStringBuffer);
}
在這個階段,我們的代碼已準備好用於模擬。讓我們看看它的實際效果:
WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));
當我們處理異步系統時,我們將讀取延遲任意 3 秒,然後再從流中讀取,以便我們能夠看到完整的響應。此外,在記錄時,我們插入換行符以將長輸出分成多行。
最後,讓我們驗證一下代碼執行產生的輸出:
20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content:
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]
而已!看起來我們已經搞定了。
5. 結論
在本文中,我們使用管道流的概念以及BodyExtractors
和DataBufferUtils
類中可用的實用方法將Flux<DataBuffer>
讀入單個InputStream
。
與往常一樣,本教程的完整源代碼可在 GitHub 上獲得。