使用Spring Boot的定時WebSocket推送

1.概述

在本教程中,我們將看到如何使用WebSockets將計劃的消息從服務器發送到瀏覽器。一種替代方法是使用服務器發送的事件(SSE),但我們將不在本文中介紹。

Spring提供了各種調度選項。首先,我們將介紹@Scheduled註釋。然後,我們將看到由Project Reactor提供的‘Flux::interval’方法的示例。該庫對於Webflux應用程序是開箱即用的,並且可以在任何Java項目中用作獨立庫。

此外,還存在更高級的機制,例如Quartz調度程序,但我們不會對其進行介紹。

2.一個簡單的聊天應用程序

在上一篇文章中,我們使用WebSockets構建了一個聊天應用程序。讓我們用一個新功能擴展它:聊天機器人。這些漫遊器是服務器端組件,可將計劃的消息推送到瀏覽器。

2.1。 Maven依賴

讓我們從在Maven中設置必要的依賴關係開始。要構建此項目,我們的pom.xml應該具有:

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-websocket</artifactId>

 </dependency>

 <dependency>

 <groupId>io.projectreactor</groupId>

 <artifactId>reactor-core</artifactId>

 </dependency>

 <dependency>

 <groupId>com.github.javafaker</groupId>

 <artifactId>javafaker</artifactId>

 <version>1.0.2</version>

 </dependency>

 <dependency>

 <groupId>com.google.code.gson</groupId>

 <artifactId>gson</artifactId>

 </dependency>

2.2。 JavaFaker依賴關係

我們將使用JavaFaker庫來生成機器人的消息。該庫通常用於生成測試數據。在這裡,我們將一個名為“ Chuck Norris ”的訪客添加到我們的聊天室。

讓我們看一下代碼:

Faker faker = new Faker();

 ChuckNorris chuckNorris = faker.chuckNorris();

 String messageFromChuck = chuckNorris.fact();

Faker將為各種數據生成器提供工廠方法。我們將使用ChuckNorris生成器。調用chuckNorris.fact()將顯示預定義消息列表中的隨機句子。

2.3。消息模型

聊天應用程序使用簡單的POJO作為消息包裝器:

public class OutputMessage {



 private String from;

 private String text;

 private String time;



 // standard constructors, getters/setters, equals and hashcode

 }

放在一起,這是我們如何創建聊天消息的示例:

OutputMessage message = new OutputMessage(

 "Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));

2.4。客戶端

我們的聊天客戶端是一個簡單的HTML頁面。它使用SockJS客戶端STOMP消息協議。

讓我們看看客戶端如何訂閱主題:

<html>

 <head>

 <script src="./js/sockjs-0.3.4.js"></script>

 <script src="./js/stomp.js"></script>

 <script type="text/javascript">

 // ...

 stompClient = Stomp.over(socket);




 stompClient.connect({}, function(frame) {

 // ...

 stompClient.subscribe('/topic/pushmessages', function(messageOutput) {

 showMessageOutput(JSON.parse(messageOutput.body));

 });

 });

 // ...

 </script>

 </head>

 <!-- ... -->

 </html>

首先,我們通過SockJS協議創建了一個Stomp客戶端。然後,主題訂閱充當服務器與連接的客戶端之間的通信通道。

在我們的存儲庫中,此代碼位於webapp/bots.html 。我們在http:// localhost:8080 / bots.html本地運行時訪問它。當然,我們需要根據部署應用程序的方式來調整主機和端口。

2.5。服務器端

在上一篇文章中,我們已經了解瞭如何在Spring中配置WebSocket。讓我們對配置進行一些修改:

@Configuration

 @EnableWebSocketMessageBroker

 public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {



 @Override

 public void configureMessageBroker(MessageBrokerRegistry config) {

 config.enableSimpleBroker("/topic");

 config.setApplicationDestinationPrefixes("/app");

 }



 @Override

 public void registerStompEndpoints(StompEndpointRegistry registry) {

 // ...

 registry.addEndpoint("/chatwithbots");

 registry.addEndpoint("/chatwithbots").withSockJS();

 }

 }

為了推送消息,我們使用實用程序類SimpMessagingTemplate 。默認情況下,它在Spring Context中以@Bean提供。我們可以看到當AbstractMessageBrokerConfiguration在類路徑中時如何通過自動配置聲明它。因此,我們可以將其註入到任何Spring組件中。

之後,我們使用它來將消息發佈到主題/topic/pushmessages 。我們假設我們的類將那個bean注入了一個名為simpMessagingTemplate的變量中:

simpMessagingTemplate.convertAndSend("/topic/pushmessages",

 new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));

如前面的客戶端示例所示,客戶端訂閱該主題以在消息到達時對其進行處理。

3.安排推送消息

在Spring生態系統中,我們可以從多種調度方法中進行選擇。如果我們使用Spring MVC,則@Scheduled註釋由於其簡單性而成為自然選擇。如果我們使用Spring Webflux,我們也可以使用Project Reactor的Flux::interval方法。我們將看到每個示例。

3.1。配置

我們的聊天機器人將使用JavaFaker的Chuck Norris生成器。我們將其配置為Bean,以便可以將其註入所需的位置。

@Configuration

 class AppConfig {



 @Bean

 public ChuckNorris chuckNorris() {

 return (new Faker()).chuckNorris();

 }

 }

3.2。使用@Scheduled

我們的示例機器人是預定方法。當他們運行時,他們使用SimpMessagingTemplate通過WebSocket發送我們的OutputMessage POJO。

顧名思義,@ @Scheduled批註允許重複執行method 。有了它,我們可以使用基於速率的簡單調度或更複雜的“ cron”表達式。

讓我們編寫第一個聊天機器人的代碼:

@Service

 public class ScheduledPushMessages {



 @Scheduled(fixedRate = 5000)

 public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {

 String time = new SimpleDateFormat("HH:mm").format(new Date());

 simpMessagingTemplate.convertAndSend("/topic/pushmessages",

 new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));

 }



 }

我們用@Scheduled(fixedRate = 5000).註釋sendMessage方法@Scheduled(fixedRate = 5000).這使sendMessage每五秒鐘運行一次。然後,我們使用simpMessagingTemplate實例將OutputMessage發送到主題。 simpMessagingTemplatechuckNorris實例從Spring上下文作為方法參數注入。

3.3。使用Flux::interval()

如果使用WebFlux,則可以使用[Flux::interval](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#interval-java.time.Duration-)運算符。它將發布的無限流Long通過選擇分隔的項D uration

現在,讓我們在前面的示例中使用Flux。目標是每五秒鐘發送一次Chuck Norris的報價。首先,我們需要實現InitializingBean接口以在應用程序啟動時訂閱Flux

@Service

 public class ReactiveScheduledPushMessages implements InitializingBean {



 private SimpMessagingTemplate simpMessagingTemplate;



 private ChuckNorris chuckNorris;



 @Autowired

 public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {

 this.simpMessagingTemplate = simpMessagingTemplate;

 this.chuckNorris = chuckNorris;

 }



 @Override

 public void afterPropertiesSet() throws Exception {

 Flux.interval(Duration.ofSeconds(5L))

 // discard the incoming Long, replace it by an OutputMessage

 .map((n) -> new OutputMessage("Chuck Norris (Flux::interval)",

 chuckNorris.fact(),

 new SimpleDateFormat("HH:mm").format(new Date())))

 .subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));

 }

 }

在這裡,我們使用構造函數注入來設置simpMessagingTemplatechuckNorris實例。這次,調度邏輯在afterPropertiesSet(),我們在實現InitializingBean時將其重寫。服務啟動後,該方法將立即運行。

interval運算符每五秒鐘發出一個Long 。然後, map運算符會丟棄該值,並將其替換為我們的消息。最後,我們subscribe Flux來觸發每條消息的邏輯。

4。結論

在本教程中,我們已經看到實用程序類SimpMessagingTemplate使通過WebSocket推送服務器消息變得容易。此外,我們已經看到了兩種調度一段代碼執行的方式。