Spring Cloud數據流的流處理入門

1.簡介

Spring Cloud Data Flow是可組合數據微服務的雲原生編程和操作模型。

借助Spring Cloud Data Flow ,開發人員可以為常見的用例(例如數據提取,實時分析和數據導入/導出)創建和編排數據管道。

該數據管道有兩種形式,即流數據管道和批處理數據管道。

在第一種情況下,通過消息傳遞中間件消耗或產生了無限制的數據量。在第二種情況下,短期任務將處理一組有限的數據,然後終止。

本文將重點介紹流處理。

2. 架構概述

這些類型的體系結構的關鍵組件是ApplicationsData Flow Server和目標運行時。

除了這些關鍵組件之外,我們在體系結構中通常還具有Data Flow Shellmessage broker

讓我們更詳細地查看所有這些組件。

2.1。應用領域

通常,流數據管道包括使用來自外部系統的事件,數據處理和多語言持久性。這些階段在Spring Cloud術語中通常稱為SourceProcessorSink

  • 來源:是使用事件的應用程序
  • 處理器:Source消費數據,對其進行一些處理,然後將處理後的數據發送到管道中的下一個應用程序
  • 接收器:SourceProcessor進行消耗,並將數據寫入所需的持久層

這些應用程序可以通過兩種方式打包:

  • 在Maven存儲庫,文件,http或任何其他Spring資源實現中託管的Spring Boot uber-jar(本文將使用此方法)
  • 碼頭工人

Spring Cloud Data Flow團隊已經提供了許多通用用例的源,處理器和接收器應用程序(例如jdbc,hdfs,http,路由器),並可供使用。

2.2。運行

此外,需要運行時才能使這些應用程序執行。支持的運行時為:

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • 用於開發的本地服務器(本文將使用)

2.3。數據流服務器

負責將應用程序部署到運行時的組件是Data Flow Server 。每個目標運行時都有一個Data Flow Server可執行jar。

Data Flow Server負責解釋:

  • 流DSL,它描述通過多個應用程序的邏輯數據流。
  • 部署清單,描述應用程序到運行時的映射。

2.4。數據流Shell

數據流外殼程序是數據流服務器的客戶端。該外殼程序使我們能夠執行與服務器交互所需的DSL命令。

例如,描述從http源到jdbc接收器的數據流的DSL將寫為“ http | jdbc”。 DSL中的這些名稱已在Data Flow Server中註冊,並映射到可以託管在Maven或Docker存儲庫中的應用程序工件。

Spring還提供了一個名為Flo的圖形界面,用於創建和監視流數據管道。但是,其用法不在本文討論範圍之內。

2.5。消息代理

正如在上一節的示例中所看到的,我們已經在數據流的定義中使用了管道符號。管道符號表示兩個應用程序之間通過消息傳遞中間件的通信。

這意味著我們需要在目標環境中啟動並運行消息代理。

支持的兩個消息傳遞中間件代理是:

  • Apache Kafka
  • RabbitMQ

因此,現在我們已經對架構組件進行了概述–是時候構建我們的第一個流處理管道了。

3.安裝Message Broker

如我們所見,管道中的應用程序需要消息中間件進行通信。就本文而言,我們將使用RabbitMQ

有關安裝的完整詳細信息,您可以按照官方網站上的說明進行操作。

4.本地數據流服務器

為了加快生成應用程序的過程,我們將使用Spring Initializr ;有了它的幫助,我們可以在幾分鐘內獲得我們的Spring Boot應用程序。

進入網站後,只需選擇一個Group和一個Artifact名稱。

完成此操作後,單擊按鈕Generate Project來開始Maven工件的下載。

Spring Cloud數據流的流處理入門

下載完成後,解壓縮該項目並將其作為Maven項目導入您選擇的IDE中。

讓我們向項目添加一個Maven依賴項。由於我們需要Dataflow Local Server庫,因此我們添加spring-cloud-starter-dataflow-server-local依賴項:

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>

 </dependency>

現在我們需要使用@EnableDataFlowServer註釋對Spring Boot主類進行註釋:

@EnableDataFlowServer

 @SpringBootApplication

 public class SpringDataFlowServerApplication {



 public static void main(String[] args) {

 SpringApplication.run(

 SpringDataFlowServerApplication.class, args);

 }

 }

就這樣。我們的Local Data Flow Server可以執行了:

mvn spring-boot:run

該應用程序將在端口9393上啟動。

5.數據流Shell

同樣,轉到Spring Initializr,然後選擇一個Group and Artifact名稱。

下載並導入項目後,讓我們添加spring-cloud-dataflow-shell依賴項:

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-dataflow-shell</artifactId>

 </dependency>

現在我們需要在Spring Boot主類中添加@EnableDataFlowShell批註:

@EnableDataFlowShell

 @SpringBootApplication

 public class SpringDataFlowShellApplication {



 public static void main(String[] args) {

 SpringApplication.run(SpringDataFlowShellApplication.class, args);

 }

 }

我們現在可以運行外殼程序:

mvn spring-boot:run

Shell運行之後,我們可以在提示符下鍵入help命令,以查看我們可以執行的命令的完整列表。

6.源應用程序

同樣,在Initializr上,我們現在將創建一個簡單的應用程序,並添加一個稱為spring-cloud-starter-stream-rabbitStream Rabbit依賴項

<dependency>

 <groupId>org.springframework.cloud</groupId>

 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

 </dependency>

然後,我們將@EnableBinding(Source.class)批註添加到Spring Boot主類中:

@EnableBinding(Source.class)

 @SpringBootApplication

 public class SpringDataFlowTimeSourceApplication {



 public static void main(String[] args) {

 SpringApplication.run(

 SpringDataFlowTimeSourceApplication.class, args);

 }

 }

現在我們需要定義必須處理的數據源。該來源可能是任何潛在的無盡工作負載(物聯網傳感器數據,24/7事件處理,在線交易數據提取)。

在我們的示例應用程序中,我們使用Poller每10秒產生一個事件(為簡單起見,新的時間戳記)。

@InboundChannelAdapter批註使用返回值作為消息的有效負載將消息發送到源的輸出通道:

@Bean

 @InboundChannelAdapter(

 value = Source.OUTPUT,

 poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")

 )

 public MessageSource<Long> timeMessageSource() {

 return () -> MessageBuilder.withPayload(new Date().getTime()).build();

 }

我們的數據源已準備就緒。

7.處理器應用程序

接下來,我們將創建一個應用程序並添加Stream Rabbit依賴項。

然後,我們將@EnableBinding(Processor.class)批註添加到Spring Boot主類中:

@EnableBinding(Processor.class)

 @SpringBootApplication

 public class SpringDataFlowTimeProcessorApplication {



 public static void main(String[] args) {

 SpringApplication.run(

 SpringDataFlowTimeProcessorApplication.class, args);

 }

 }

接下來,我們需要定義一種方法來處理來自源應用程序的數據。

要定義一個轉換器,我們需要使用@Transformer註釋來註釋此方法:

@Transformer(inputChannel = Processor.INPUT,

 outputChannel = Processor.OUTPUT)

 public Object transform(Long timestamp) {



 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");

 String date = dateFormat.format(timestamp);

 return date;

 }

它將時間戳從“輸入”通道轉換為格式化日期,該日期將發送到“輸出”通道。

8.接收器應用程序

創建的最後一個應用程序是Sink應用程序。

同樣,轉到Spring Initializr並選擇一個Group ,一個Artifact名稱。下載項目後,讓我們添加Stream Rabbit依賴項。

然後將@EnableBinding(Sink.class)批註添加到Spring Boot主類中:

@EnableBinding(Sink.class)

 @SpringBootApplication

 public class SpringDataFlowLoggingSinkApplication {



 public static void main(String[] args) {


 SpringApplication.run(

 SpringDataFlowLoggingSinkApplication.class, args);

 }

 }

現在,我們需要一種方法來攔截來自處理器應用程序的消息。

為此,我們需要在方法中添加@StreamListener(Sink.INPUT)批註:

@StreamListener(Sink.INPUT)

 public void loggerSink(String date) {

 logger.info("Received: " + date);

 }

該方法僅將轉換為格式化日期的時間戳打印到日誌文件中。

9.註冊一個流應用

Spring Cloud Data Flow Shell允許我們使用app register命令在App Registry中註冊Stream App。

我們必須提供唯一的名稱,應用程序類型和可以解析為應用程序工件的URI。對於類型,指定“ source ”,“ processor ”或“ sink ”。

在為URI提供maven方案時,格式應符合以下要求:

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

要註冊先前創建的SourceProcessorSink應用程序,請轉到Spring Cloud Data Flow Shell並從提示符處發出以下命令:

app register --name time-source --type source

 --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT



 app register --name time-processor --type processor

 --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT



 app register --name logging-sink --type sink

 --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

10.創建和部署流

要創建新的流定義,請轉到Spring Cloud Data Flow Shell並執行以下shell命令:

stream create --name time-to-log

 --definition 'time-source | time-processor | logging-sink'

這定義了一個名為流time-to-log基礎上,DSL表達'time-source | time-processor | logging-sink'

然後,要部署流,請執行以下shell命令:

stream deploy --name time-to-log

Data Flow Servertime-sourcetime-processorlogging-sink解析為Maven坐標,並使用它們來啟動流的time-sourcetime-processorlogging-sink應用程序。

如果流被正確部署,你會在看到Data Flow Server日誌,該模塊已經啟動並連接在一起:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0

 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink

 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0

 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor

 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0

 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11.查看結果

在此示例中,源僅每秒發送一次當前時間戳作為消息,處理器對其進行格式化,日誌接收器使用日誌記錄框架輸出格式化的時間戳。

日誌文件位於Data Flow Server日誌輸出中顯示的目錄內,如上所示。要查看結果,我們可以拖尾日誌:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log

 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01

 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11

 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12.結論

在本文中,我們已經看到瞭如何通過使用Spring Cloud Data Flow構建用於流處理的數據管道。

此外,我們看到了流中SourceProcessorSink應用程序的作用,以及如何通過使用Data Flow ShellData Flow Server插入和綁定此模塊。

示例代碼可以在GitHub項目中找到。