Spring Boot教學
Spring Boot是什麼?
Spring Boot簡介
Spring Boot主要目標
Spring Boot快速入門
新項目爲什麼需要Spring Boot?
Spring Boot引導過程
Spring Boot核心和限制
Spring Boot Tomcat部署
Spring Boot優點和缺點
Spring Boot構建系統
Spring Boot入門
Spring Boot代碼結構
Spring Boot安裝
Spring Boot Bean和依賴注入
Spring Boot應用程序開發入門
Spring Boot運行器(Runner)
Spring Boot JSP應用實例
Spring Boot應用程序屬性
Spring Boot將WAR文件部署到Tomcat
Spring Boot日誌
Spring Boot Hello World(Thymeleaf)示例
Spring Boot構建RESTful Web服務
Spring Boot非web應用程序實例
Spring Boot異常處理
Spring Boot @ConfigurationProperties實例
Spring Boot攔截器
Spring Boot SLF4J日誌實例
Spring Boot Servlet過濾器
Spring Boot Ajax實例
Spring Boot Tomcat端口號
Spring Boot文件上傳示例(Ajax和REST)
Spring Boot Rest模板
Spring Boot文件上傳示例
Spring Boot文件處理
Spring Boot服務組件
Spring Boot Thymeleaf示例
Spring Boot使用RESTful Web服務
Spring Boot CORS支持
Spring Boot國際化
Spring Boot調度
Spring Boot啓用HTTPS
Spring Boot Eureka服務器
Spring Boost Eureka服務註冊
Spring Boot Zuul代理服務器和路由
Spring Boot雲配置服務器
Spring Boot雲配置客戶端
Spring Boot Actuator
Spring Boot管理服務器
Spring Boot管理客戶端
Spring Boot啓用Swagger2
Spring Boot創建Docker鏡像
Spring Boot跟蹤微服務日誌
Spring Boot Flyway數據庫
Spring Boot發送電子郵件
Spring Boot Hystrix
Spring Boot Web Socket
Spring Boot批量服務
Spring Boot Apache Kafka
Spring Boot單元測試用例
Spring Boot Rest控制器單元測試
Spring Boot數據庫源(連接數據庫)
Spring Boot保護Web應用程序

Spring Boot Apache Kafka

本教程演示瞭如何從Spring Kafka發送和接收消息。 首先創建一個Spring Kafka Producer,它能夠將消息發送到Kafka主題。 接下來創建一個Spring Kafka Consumer,它能夠收聽發送給Kafka的消息。使用適當的鍵/值序列化器和反序列化器來配置它們。 最後,使用簡單的Spring Boot應用程序演示應用程序。

下載並安裝Apache Kafka

要下載並安裝Apache Kafka,請閱讀此處的官方文檔。本教程假定使用默認配置啓動服務器,並且不更改任何服務器端口。

注意:在使用 Kafka 之前,需要安裝好

項目設置

  • Spring Kafka:2.1.4.RELEASE
  • Spring Boot:2.0.0.RELEASE
  • Apache Kafka:kafka_2.11-1.0.0
  • Maven:3.5

項目結構

請參考以下項目結構來構建項目。

Maven依賴

在這個項目中,使用Apache Maven來管理項目依賴項。確保以下依賴項存在於類路徑上。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://www.yiibai.com/spring-boot/</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Spring Kafka將消息發送到主題

這個項目是從發送消息開始,使用KafkaTemplate類來包裝Producer並提供高級操作以將數據發送到Kafka主題。 提供異步和同步方法,異步方法返回Future

package com.yiibai.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

使用ProducerFactory的實現來配置KafkaTemplate,更具體地說是DefaultKafkaProducerFactory。可以使用Map <String,Object>初始化這個生產者工廠。使用從ProducerConfig類中獲取鍵。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG指定用於建立與Kafka羣集的初始連接的主機/端口對列表。客戶端將使用所有服務器,而不管此處指定哪些服務器進行引導/此列表僅影響用於發現整套服務器的初始主機。此列表應採用host1:port1,host2:port2,....的形式。由於這些服務器僅用於初始連接以發現完整的集羣成員資格(可能會動態更改),因此此列表不需要包含完整集 服務器(但是,如果服務器關閉,可能需要多個服務器)。
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG指定用於實現org.apache.kafka.common.serialization.Serializer接口的鍵的序列化程序類。
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG指定用於實現org.apache.kafka.common.serialization.Serializer接口的值的序列化程序類。

有關配置選項的完整列表,請查看ProducerConfig類。

package com.yiibai.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Spring Kafka監聽來自主題的消息

接下來,將演示如何從Kafka主題中收聽消息。 Receiver類將使用Kafka主題消息。創建一個Listen()方法並使用[@KafkaListener](https://github.com/KafkaListener "@KafkaListener")註釋對其進行了註釋,該註釋將該方法標記爲指定主題上的Kafka消息偵聽器的目標。

package com.yiibai.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }

}

此機制需要在其中一個[@Configuration](https://github.com/Configuration "@Configuration")類和偵聽器容器工廠上使用[@EnableKafka](https://github.com/EnableKafka "@EnableKafka")註釋,該工廠用於配置基礎ConcurrentMessageListenerContainer。使用SenderConfig類中相同類型的鍵/值反序列化器。

  • ConsumerConfig.GROUP_ID_CONFIG指定一個唯一字符串,用於標識此使用者所屬的組。
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定當Kafka中沒有初始偏移量或服務器上當前偏移量不再存在時要執行的操作(例如,因爲該數據已被刪除):
    • earliest: 自動將偏移重置爲最早的偏移量
    • latest: 自動將偏移重置爲最新的偏移量
    • none: 如果沒有找到消費者組的先前偏移量,則向消費者拋出異常
    • anything else: 向消費者拋出異常。

消費者使用消費者組名稱標記自己,並且發佈到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者實例。 消費者實例可以在單獨的進程中,也可以在不同的機器。
如果所有使用者實例具有相同的使用者組,則記錄將有效地在使用者實例上進行負載平衡。 如果所有消費者實例具有不同的消費者組,則每個記錄將被廣播到所有消費者進程。

有關配置選項的完整列表,請查看ConsumerConfig類。

package com.yiibai.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

使用 application.yml 配置應用程序

需要創建了一個application.yml 屬性文件,該文件位於src/main/resources 文件夾中。 這些屬性通過spring boot在配置類中注入。

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

運行應用程序

現在,編寫一個簡單的Spring Boot應用程序來演示應用程序。 爲了使這個演示工作,需要前先在端口9092上運行localhost的Kafka服務器(Kafka的默認配置)。

package com.yiibai.kafka;

import com.yiibai.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}

使用 Maven 命令構建項目:

mvn clean install

看到構建成功後,執行以下Java命令,運行Jar程序:

java -jar target\producer-consumer-1.0.0-SNAPSHOT.jar

當運行應用程序時,應該會得到類似以下的結果:

Spring