Spring AI ChatClient 中的串流響應
1.概述
在標準 REST 回應中,伺服器會等到收到完整的負載後再將其傳送回客戶端。然而,大型語言模型 (LLM) 會以逐個標記的方式產生輸出,通常需要花費大量時間才能產生完整的回應。
這會導致等待完整回應的延遲,尤其是在輸出涉及大量令牌的情況下。串流回應透過以小塊增量發送資料來解決此問題。
在本教程中,我們將探討如何使用 Spring AI ChatClient
返回串流聊天回應,而不是一次發送整個回應。
2. Maven依賴項
讓我們先將 Spring AI OpenAI依賴項新增到我們的pom.xml
中:
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
<version>1.0.2</version>
</dependency>
我們需要一個 Web 容器來示範聊天回應流。我們可以選擇spring-boot-starter-web
或spring-boot-starter-webflux
依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
3. 通用組件
在探索不同的串流方法之前,我們先建立一個通用元件,用於後續部分。 ChatRequest ChatRequest
包含 API 呼叫的有效負載:
public class ChatRequest {
@NotNull
private String prompt;
// constructor, getter and setter
}
在以下部分中,我們將向端點發送以下聊天請求。這是為了讓聊天模型產生較長的回應,以便我們可以示範串流:
{
"prompt": "Tell me a story about a girl loves a boy, around 250 words"
}
現在,我們已經一切就緒,可以開始採用不同的串流方法了。
4. 串流
為了提供更真實的體驗,我們不想等待整個回應才將其傳回給客戶端。我們可以將回應以串流的方式傳送到客戶端。 Spring AI 預設會逐字串流聊天回應。
讓我們建立一個ChatService
來啟用來自ChatClient
的串流聊天回應。這裡的主要部分是我們呼叫stream()
並將回應作為Flux<String>
傳回:
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content();
}
}
啟用聊天回應流有兩個條件。首先,REST 控制器必須傳回Flux<String>
。其次,回應內容類型必須設定為text/event-stream
:
@RestController
@Validated
public class ChatController {
private final ChatService chatService;
public ChatController(ChatService chatService) {
this.chatService = chatService;
}
@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat(@RequestBody @Valid ChatRequest request) {
return chatService.chat(request.getPrompt());
}
}
現在,一切就緒。我們可以啟動 Spring Boot 應用程序,並使用 Postman 將聊天請求發送到 REST 端點:
執行後,我們可以看到Postman中顯示的回應體是一行一行的,每一行都是一個伺服器發送的事件。
從回應中,我們可以看到 Spring AI 逐字串流回應。這使得客戶端可以立即開始使用結果,而無需等待回應。這樣,它提供了非常低的延遲,讓用戶感覺就像在即時輸入一樣。
5. 分塊傳輸
儘管以文字形式串流的回應速度非常快,但它可能會顯著增加開銷。
我們可以透過將單字收集在一起形成更大的區塊並返回,而不是單字,從而減少開銷。這使得串流更有效率,並保留了漸進式串流的體驗。
我們可以修改我們的chat()
方法並呼叫 Flux
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content()
.transform(flux -> toChunk(flux, 100));
}
private Flux<String> toChunk(Flux<String> tokenFlux, int chunkSize) {
return Flux.create(sink -> {
StringBuilder buffer = new StringBuilder();
tokenFlux.subscribe(
token -> {
buffer.append(token);
if (buffer.length() >= chunkSize) {
sink.next(buffer.toString());
buffer.setLength(0);
}
},
sink::error,
() -> {
if (buffer.length() > 0) {
sink.next(buffer.toString());
}
sink.complete();
}
);
});
}
}
基本上,我們收集Flux<String>
傳回的每個單字,並將其附加到StringBuilder
中。一旦緩衝區大小達到最小值 100 個字符,我們就將緩衝區作為一個區塊刷新到客戶端。在流的末尾,我們將剩餘的緩衝區作為最後一個區塊刷新。
現在,如果我們向修改後的ChatService
發出聊天請求,我們可以看到伺服器發送的事件中的內容除了最後一段之外至少有 100 個字元長:
6. JSON 流
如果我們想以結構化格式串流聊天回應,可以使用換行符號分隔的 JSON(NDJSON)。 NDJSON是一種串流格式,其中每一行包含一個 JSON 對象,並且在物件之間以換行符號分隔。
為了實現這一點,我們可以透過新增系統提示來指示聊天模型返回 NDJSON,同時新增範例 JSON,以確保聊天模型完全理解所需的格式並避免混淆:
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.system(systemMessage -> systemMessage.text(
"""
Respond in NDJSON format.
Each JSON object should contains around 100 characters.
Sample json object format: {"part":0,"text":"Once in a small town..."}
"""))
.user(userMessage -> userMessage.text(prompt))
.stream()
.content()
.transform(this::toJsonChunk);
}
private Flux<String> toJsonChunk(Flux<String> tokenFlux) {
return Flux.create(sink -> {
StringBuilder buffer = new StringBuilder();
tokenFlux.subscribe(
token -> {
buffer.append(token);
int idx;
if ((idx = buffer.indexOf("\n")) >= 0) {
String line = buffer.substring(0, idx);
sink.next(line);
buffer.delete(0, idx + 1);
}
},
sink::error,
() -> {
if (buffer.length() > 0) {
sink.next(buffer.toString());
}
sink.complete();
}
);
});
}
}
toJsonChunk()
方法與上一節的toChunk()
類似。關鍵區別在於刷新策略。它不是在緩衝區達到最小大小時刷新數據,而是在令牌中找到換行符後將緩衝區內容刷新到客戶端。
我們再次發起聊天請求來查看結果:
我們可以看到,每一行都是 JSON 對象,其格式遵循系統提示。 JSON 被各種程式語言廣泛支持,這使得客戶端在事件到達時可以輕鬆解析和使用。
7. 非串流媒體
我們已經探索了串流響應的不同方法。現在,讓我們來看看傳統的非流式方法。
當我們使用spring-boot-starter-web
Maven 依賴項傳回同步聊天回應時,我們只需呼叫ChatClient
call()
方法:
ChatClient chatClient = ...;
chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.call()
.content()
但是,如果我們對spring-boot-starter-webflux
依賴項執行相同操作,我們將收到以下例外:
org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/chat/completions": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
發生這種情況是因為 WebFlux 是非阻塞的,並且不允許諸如call()
之類的阻塞操作。
為了在 WebFlux 中實現相同的非串流響應,我們需要在ChatClient
中呼叫stream()
並將收集到的流量組合成單一回應:
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatMode) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content();
}
}
在控制器中,我們必須透過收集單字並將它們連接起來,將Flux<String>
轉換為Mono<String>
:
@RestController
@Validated
public class ChatController {
private final ChatService chatService;
public ChatController(ChatService chatService) {
this.chatService = chatService;
}
@PostMapping(value = "/chat")
public Mono<String> chat(@RequestBody @Valid ChatRequest request) {
return chatService.chat(request.getPrompt())
.collectList()
.map(list -> String.join("", list));
}
}
透過這種方法,我們可以使用 WebFlux 非阻塞模型傳回非串流回應。
8. 結論
在本文中,我們探討了使用 Spring AI ChatClient
進行串流聊天回應的不同方法。
這包括以單字形式串流、以區塊形式串流傳輸以及以 JSON 形式串流傳輸。借助這些技術,我們可以大幅減少向客戶端回傳聊天回應的延遲,並提升使用者體驗。
與往常一樣,我們的完整程式碼範例可 在 GitHub 上找到。