使用Spring 5的反應式WebSocket

1.概述

在本文中,我們將使用新的Spring 5 WebSockets API和Spring WebFlux提供的響應功能來創建一個快速示例。

WebSocket是一種眾所周知的協議,支持客戶端和服務器之間的全雙工通信,通常用於Web應用程序中,其中客戶端和服務器需要以高頻率和低延遲交換事件。

Spring Framework 5在框架中具有現代化的WebSockets支持,從而為該通信通道添加了響應功能。

我們可以在Spring WebFlux上找到更多信息

2. Maven依賴

我們將使用spring-boot-starters依賴項進行spring-boot-integrationspring-boot-starter-webflux的使用,當前版本可在Spring Milestone Repository中獲得

在此示例中,我們使用的是最新可用版本2.0.0.M7,但應該始終在Maven存儲庫中獲得可用的最新版本:

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-integration</artifactId>

 </dependency>

 <dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-webflux</artifactId>

 </dependency>

3. Spring中的WebSocket配置

我們的配置非常簡單:我們將注入WebSocketHandler來處理Spring WebSocket應用程序中的套接字會話。

@Autowired

 private WebSocketHandler webSocketHandler;

此外,讓我們創建一個HandlerMapping bean註釋的方法,該方法將負責請求和處理程序對象之間的映射:

@Bean

 public HandlerMapping webSocketHandlerMapping() {

 Map<String, WebSocketHandler> map = new HashMap<>();

 map.put("/event-emitter", webSocketHandler);



 SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();

 handlerMapping.setOrder(1);

 handlerMapping.setUrlMap(map);

 return handlerMapping;

 }

我們可以連接的URL為: ws://localhost:<port>/event-emitter.

4. Spring中的WebSocket消息處理

我們的ReactiveWebSocketHandler類將負責管理服務器端的WebSocket會話。

它實現了WebSocketHandler接口,因此我們可以覆蓋handle方法,該方法將用於將消息發送到WebSocket客戶端:

@Component

 public class ReactiveWebSocketHandler implements WebSocketHandler {



 // private fields ...



 @Override

 public Mono<Void> handle(WebSocketSession webSocketSession) {

 return webSocketSession.send(intervalFlux

 .map(webSocketSession::textMessage))

 .and(webSocketSession.receive()

 .map(WebSocketMessage::getPayloadAsText)

 .log());

 }

 }

5.創建一個簡單的反應式WebSocket客戶端

現在,讓我們創建一個Spring Reactive WebSocket客戶端,它將能夠與我們的WebSocket服務器連接並交換信息。

5.1。 Maven依賴

首先,Maven依賴項。

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-webflux</artifactId>

 </dependency>

在這裡,我們使用與之前用於設置反應式WebSocket服務器應用程序相同的spring-boot-starter-webflux。

5.2。 WebSocket客戶端

現在,讓我們創建ReactiveClientWebSocket類,該類負責開始與服務器的通信:

public class ReactiveJavaClientWebSocket {



 public static void main(String[] args) throws InterruptedException {



 WebSocketClient client = new ReactorNettyWebSocketClient();

 client.execute(

 URI.create("ws://localhost:8080/event-emitter"),

 session -> session.send(

 Mono.just(session.textMessage("event-spring-reactive-client-websocket")))

 .thenMany(session.receive()

 .map(WebSocketMessage::getPayloadAsText)

 .log())

 .then())

 .block(Duration.ofSeconds(10L));

 }

 }

在上面的代碼中,我們可以看到我們正在使用ReactorNettyWebSocketClient ,這是用於Reactor Netty的WebSocketClient實現。

此外,客戶端通過URL ws://localhost:8080/event-emitter,連接到WebSocket服務器,並在連接到服務器後立即建立會話。

我們還可以看到,我們正在將消息以及連接請求發送到服務器(“ event-spring-reactive-client-websocket ”)。

此外,調用send方法,將類型為Publisher<T>,的變量作為參數Publisher<T>,在我們的示例中,我們的Publisher<T>Mono<T>T是簡單的String“ event-me-from-reactive-java-client-websocket ”。

而且,調用了期望類型為StringFluxthenMany(…)方法。 receive()方法獲取傳入消息的流量,這些消息隨後將轉換為字符串。

最後, block()方法會在給定時間(在我們的示例中為10秒)後強制客戶端與服務器斷開連接。

5.3。啟動客戶端

要運行它,請確保Reactive WebSocket服務器已啟動並正在運行。然後,啟動ReactiveJavaClientWebSocket類,我們可以在sysout日誌上看到正在發出的事件:

[reactor-http-nio-4] INFO reactor.Flux.Map.1 -

 onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",

 "eventDt":"2018-01-11T23:29:26.900"})

我們還可以從Reactive WebSocket服務器的日誌中看到客戶端在嘗試連接期間發送的消息:

[reactor-http-nio-2] reactor.Flux.Map.1:

 onNext(event-me-from-reactive-java-client)

此外,我們可以在客戶端完成請求後(在我們的情況下,是10秒鐘後)看到終止連接的消息:

[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()

6.創建瀏覽器WebSocket客戶端

讓我們創建一個簡單的HTML / Javascript客戶端WebSocket來使用我們的響應式WebSocket服務器應用程序。

<div class="events"></div>

 <script>

 var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");

 clientWebSocket.onopen = function() {

 console.log("clientWebSocket.onopen", clientWebSocket);

 console.log("clientWebSocket.readyState", "websocketstatus");

 clientWebSocket.send("event-me-from-browser");

 }

 clientWebSocket.onclose = function(error) {

 console.log("clientWebSocket.onclose", clientWebSocket, error);

 events("Closing connection");

 }

 clientWebSocket.onerror = function(error) {

 console.log("clientWebSocket.onerror", clientWebSocket, error);

 events("An error occured");

 }

 clientWebSocket.onmessage = function(error) {

 console.log("clientWebSocket.onmessage", clientWebSocket, error);

 events(error.data);

 }

 function events(responseEvent) {

 document.querySelector(".events").innerHTML += responseEvent + "<br>";

 }

 </script>

在運行WebSocket服務器的情況下,在瀏覽器(例如Chrome,Internet Explorer,Mozilla Firefox等)中打開此HTML文件,我們應該看到事件被打印在屏幕上,每個事件的延遲為1秒,如我們的WebSocket服務器。

{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}

 {"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}

 {"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}

7.結論

在這裡,我們提供了一個示例,說明如何通過使用Spring 5 Framework實現服務器和客戶端之間的WebSocket通信,並實現Spring Webflux提供的新的響應功能。

與往常一樣,可以在我們的GitHub存儲庫中找到完整的示例。