Spring Cloud Stream簡介

1.概述

Spring Cloud Stream是基於Spring Boot和Spring Integration構建的框架,可幫助創建事件驅動或消息驅動的微服務

在本文中,我們將通過一些簡單的示例介紹Spring Cloud Stream的概念和構造。

2. Maven依賴

首先,我們需要將帶有代理RabbitMQ Maven依賴關係的Spring Cloud Starter Stream作為messaging-middleware添加到我們的pom.xml

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

 <version>1.3.0.RELEASE</version>

 </dependency>

而且,我們將從Maven Central添加模塊依賴性,以啟用JUnit支持:

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-stream-test-support</artifactId>

 <version>1.3.0.RELEASE</version>

 <scope>test</scope>

 </dependency>

3.主要概念

微服務架構遵循“智能端點和啞管道”原則。端點之間的通信由RabbitMQ或Apache Kafka之類的消息傳遞中間件各方驅動。服務通過這些端點或通道發布域事件進行通信

讓我們遍歷組成Spring Cloud Stream框架的概念,以及構建消息驅動的服務必須了解的基本範例。

3.1。創建Stream服務

讓我們看一下Spring Cloud Stream中的一個簡單服務,該服務偵聽input綁定並發送對output綁定的響應:

@SpringBootApplication

 @EnableBinding(Processor.class)

 public class MyLoggerServiceApplication {

 public static void main(String[] args) {

 SpringApplication.run(MyLoggerServiceApplication.class, args);

 }



 @StreamListener(Processor.INPUT)

 @SendTo(Processor.OUTPUT)

 public LogMessage enrichLogMessage(LogMessage log) {

 return new LogMessage(String.format("[1]: %s", log.getMessage()));

 }

 }

註釋@EnableBinding將應用程序配置為綁定接口Processor定義的通道INPUTOUTPUT這兩個通道都是綁定,可以配置為使用具體的消息傳遞中間件或綁定程序。

讓我們看一下所有這些概念的定義:

  • Bindings -聲明式標識輸入和輸出通道的接口的集合
  • Binder —消息傳遞中間件的實現,例如Kafka或RabbitMQ
  • Channel —表示消息傳遞中間件和應用程序之間的通信管道
  • StreamListenersStreamListeners消息處理方法,在MessageConverter在特定於中間件的事件與域對像類型/ POJO之間進行序列化/反序列化之後,將從通道上的消息中自動調用該方法
  • Message Schemas -用於序列化和消息的反序列化,這些架構可以靜態地從一位置讀取或動態加載,支持域對象類型的進化

3.2。溝通方式

指定給目標的消息是通過“ Publish-Subscribe消息傳遞模式傳遞的。發布者將消息分類為主題,每個主題均由名稱標識。訂戶表達對一個或多個主題的興趣。中間件過濾消息,將有趣主題的消息傳遞給訂戶。

現在,可以將訂戶分組。 consumer group是一組由group id標識的訂戶或使用者,來自主題或主題分區的消息以負載平衡的方式傳遞。

4.編程模型

本節描述構建Spring Cloud Stream應用程序的基礎知識。

4.1。功能測試

測試支持是一個活頁夾實現,允許與渠道進行交互並檢查消息。

讓我們發送一條消息到上面的enrichLogMessage服務,並檢查響應是否在消息的開頭包含文本“[1]: “

@RunWith(SpringJUnit4ClassRunner.class)

 @ContextConfiguration(classes = MyLoggerServiceApplication.class)

 @DirtiesContext

 public class MyLoggerApplicationTests {



 @Autowired

 private Processor pipe;



 @Autowired

 private MessageCollector messageCollector;



 @Test

 public void whenSendMessage_thenResponseShouldUpdateText() {

 pipe.input()

 .send(MessageBuilder.withPayload(new LogMessage("This is my message"))

 .build());



 Object payload = messageCollector.forChannel(pipe.output())

 .poll()

 .getPayload();



 assertEquals("[1]: This is my message", payload.toString());

 }

 }

4.2。自定義渠道

在上面的示例中,我們使用了Spring Cloud提供的Processor接口,該接口只有一個輸入和一個輸出通道。

如果我們需要不同的東西,例如一個輸入和兩個輸出通道,則可以創建一個自定義處理器:

public interface MyProcessor {

 String INPUT = "myInput";



 @Input

 SubscribableChannel myInput();



 @Output("myOutput")

 MessageChannel anOutput();



 @Output

 MessageChannel anotherOutput();

 }

Spring將為我們提供此接口的正確實現。可以使用@Output(“myOutput”)註釋來設置通道名稱。

否則,Spring將使用方法名稱作為通道名稱。因此,我們有三個通道,分別稱為myInputmyOutputanotherOutput

現在,讓我們想像一下,如果值小於10,我們希望將消息路由到一個輸出,而值大於或等於10,則將消息路由到另一個輸出:

@Autowired

 private MyProcessor processor;



 @StreamListener(MyProcessor.INPUT)

 public void routeValues(Integer val) {

 if (val < 10) {

 processor.anOutput().send(message(val));

 } else {

 processor.anotherOutput().send(message(val));

 }

 }



 private static final <T> Message<T> message(T val) {

 return MessageBuilder.withPayload(val).build();

 }

4.3。有條件的調度

使用@StreamListener批註,我們還可以使用我們使用SpEL表達式定義的任何條件來過濾消費者中期望的消息

例如,我們可以使用條件分派作為將消息路由到不同輸出的另一種方法:

@Autowired

 private MyProcessor processor;



 @StreamListener(

 target = MyProcessor.INPUT,

 condition = "payload < 10")

 public void routeValuesToAnOutput(Integer val) {

 processor.anOutput().send(message(val));

 }



 @StreamListener(

 target = MyProcessor.INPUT,

 condition = "payload >= 10")

 public void routeValuesToAnotherOutput(Integer val) {

 processor.anotherOutput().send(message(val));

 }

這種方法的唯一局限性是這些方法不得返回值。

5.設置應用程序

讓我們設置將處理來自RabbitMQ代理的消息的應用程序。

5.1。活頁夾配置

我們可以通過META-INF/spring.binders將應用程序配置為使用默認的活頁夾實現:

rabbit:\

 org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

或者,我們可以通過包含[this dependency](https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.springframework.cloud%22%20AND%20a%3A%22spring-cloud-stream-binder-rabbit%22)項將RabbitMQ的綁定程序庫添加到類路徑中:

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-stream-binder-rabbit</artifactId>

 <version>1.3.0.RELEASE</version>

 </dependency>

如果沒有提供活頁夾實現,Spring將在通道之間使用直接消息通信。

5.2。 RabbitMQ配置

要配置第3.1節中的示例以使用RabbitMQ綁定器,我們需要更新位於src/main/resourcesapplication.yml

spring:

 cloud:

 stream:

 bindings:

 input:

 destination: queue.log.messages

 binder: local_rabbit

 output:

 destination: queue.pretty.log.messages

 binder: local_rabbit

 binders:

 local_rabbit:

 type: rabbit

 environment:

 spring:

 rabbitmq:

 host: <host>

 port: 5672

 username: <username>

 password: <password>

 virtual-host: /

input綁定將使用名為queue.log.messages的交換,而output綁定將使用交換queue.pretty.log.messages 。這兩個綁定都將使用名為local_rabbit綁定local_rabbit

請注意,我們不需要提前創建RabbitMQ交換或隊列。運行該應用程序時,將自動創建兩個交換

為了測試該應用程序,我們可以使用RabbitMQ管理站點來發布消息。在交換queue.log.messages的“ Publish Message面板中,我們需要以JSON格式輸入請求。

5.3。自定義消息轉換

Spring Cloud Stream允許我們將消息轉換應用於特定的內容類型。在上面的示例中,我們要提供純文本,而不是使用JSON格式。

為此,我們將使用MessageConverter將自定義轉換應用於LogMessage

@SpringBootApplication

 @EnableBinding(Processor.class)

 public class MyLoggerServiceApplication {

 //...



 @Bean

 public MessageConverter providesTextPlainMessageConverter() {

 return new TextPlainMessageConverter();

 }



 //...

 }
public class TextPlainMessageConverter extends AbstractMessageConverter {



 public TextPlainMessageConverter() {

 super(new MimeType("text", "plain"));

 }



 @Override

 protected boolean supports(Class<?> clazz) {

 return (LogMessage.class == clazz);

 }



 @Override

 protected Object convertFromInternal(Message<?> message,

 Class<?> targetClass, Object conversionHint) {

 Object payload = message.getPayload();

 String text = payload instanceof String

 ? (String) payload

 : new String((byte[]) payload);

 return new LogMessage(text);

 }

 }

應用這些更改後,返回到“ Publish Message面板,如果將標頭“ contentTypes ”設置為“ text/plain ”,而有效載荷設置為“ Hello World ”,則它應該像以前一樣工作。

5.4。消費群體

當運行我們的應用程序的多個實例時,每當輸入通道中有一條新消息時,都會通知所有訂閱者

大多數時候,我們只需要處理一次消息。 Spring Cloud Stream通過使用者組實現此行為。

要啟用此行為,每個使用者綁定都可以使用spring.cloud.stream.bindings.<CHANNEL>.group屬性指定組名:

spring:

 cloud:

 stream:

 bindings:

 input:

 destination: queue.log.messages

 binder: local_rabbit

 group: logMessageConsumers

 ...

6.消息驅動的微服務

在本部分中,我們介紹了在微服務環境中運行Spring Cloud Stream應用程序所需的所有必需功能。

6.1。擴大

當運行多個應用程序時,重要的是要確保將數據正確分配給各個使用者。為此,Spring Cloud Stream提供了兩個屬性:

  • **spring.cloud.stream.instanceCount**正在運行的應用程序數
  • spring.cloud.stream.instanceIndex —當前應用程序的索引

例如,如果我們已經部署了上述MyLoggerServiceApplication應用程序的兩個實例,則兩個應用程序的spring.cloud.stream.instanceCount屬性應為2, spring.cloud.stream.instanceIndex屬性的屬性應分別為0和1。

如果我們按照本文所述使用Spring Data Flow部署Spring Cloud Stream應用程序,則會自動設置這些屬性。

6.2。分區

域事件可以是Partitioned消息。這在我們擴大存儲規模並提高應用程序性能時會有所幫助。

域事件通常具有分區鍵,因此它與相關消息一起位於同一分區中。

假設我們希望將日誌消息按消息中的第一個字母(即分區鍵)進行分區,並分為兩個分區。

對於以AM開頭的日誌消息,將有一個分區,對於NZ. ,將有另一個分區NZ.可以使用兩個屬性進行配置:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression —對有效負載進行分區的表達式
  • spring.cloud.stream.bindings.output.producer.partitionCount —組數

有時,要分區的表達式過於復雜,無法僅在一行中編寫。對於這些情況,我們可以使用屬性spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass編寫我們的自定義分區策略。

6.3。健康指標

在微服務環境中,我們還需要檢測服務何時關閉或開始出現故障。 Spring Cloud Stream提供了屬性management.health.binders.enabled來啟用活頁夾的運行狀況指示器。

運行應用程序時,我們可以在http://<host>:<port>/health查詢運行http://<host>:<port>/health

7.結論

在本教程中,我們介紹了Spring Cloud Stream的主要概念,並通過RabbitMQ上的一些簡單示例展示瞭如何使用它。關於Spring Cloud Stream的更多信息可以在這裡找到。

可以在GitHub上找到本文的源代碼。