Spring消費者和生產者

本教程演示瞭如何發送和接收來自Spring Kafka的消息。 首先創建一個能夠發送消息給Kafka主題的Spring Kafka Producer。 接下來,我們創建一個Spring Kafka Consumer,它可以收聽發送給Kafka主題的消息。使用適當的鍵/值序列化器和解串器來配置它們。 最後用一個簡單的Spring Boot應用程序演示應用程序。

下載並安裝Apache Kafka

要下載並安裝Apache Kafka,請閱讀官方文檔( https://kafka.apache.org/quickstart )。 本教程假設服務器使用默認配置啓動,並且沒有更改服務器端口。

Maven依賴

這個項目中,使用Apache Maven來管理項目依賴關係。 確保以下依賴關係在類路徑中。pom.xml 文件的內容如下所示 -

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yiibai.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://www.yiibai.com/kafka</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

Spring Kafka發送消息到主題

使用ProducerKafkaTemplate類發送消息,並提供將數據發送到Kafka主題的高級操作。 提供異步和同步方法,異步方法返回Future

package com.yiibai.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

爲了能成功地發送消息給Kafka主題,我們需要配置KafkaTemplate。 此配置由SenderConfig類處理。

使用ProducerFactory的實現來配置KafkaTemplate,更具體地說是使用DefaultKafkaProducerFactory。可以使用Map <String,Object>來初始化這個生產者工廠,從ProducerConfig類獲取的鍵。

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG指定用於建立到Kafka集羣的初始連接的主機/端口對列表。 客戶端將使用所有服務器,而不管這裏指定哪些服務器用於引導/此列表僅影響用於發現全套服務器的初始主機。 此列表應採用host1:port1host2:port2...的形式。由於這些服務器僅用於初始連接以發現完整集羣成員資格(可能會動態更改),因此此列表不需包含完整集合 的服務器(不過,如果服務器停機,可能需要多個服務器)。

  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG指定實現org.apache.kafka.common.serialization.Serializer接口的鍵的序列化程序類。

  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG指定實現org.apache.kafka.common.serialization.Serializer接口的值的序列化程序類。

有關配置選項的完整列表,請查看ProducerConfig類(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html )。

package com.yiibai.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Spring Kafka監聽來自主題的消息

接下來,我們將演示如何監聽來自Kafka主題的消息。 Receiver類將使用來自Kafka主題的消息。 創建Listen()方法並使用[@KafkaListener](https://github.com/KafkaListener "@KafkaListener")註解對其進行了註釋,該註釋將方法標記爲指定主題上的Kafka消息偵聽器的目標。

package com.yiibai.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }

}

該機制需要在[@Configuration](https://github.com/Configuration "@Configuration")類和偵聽器容器工廠之一上使用[@EnableKafka](https://github.com/EnableKafka "@EnableKafka")註解,該工廠用於配置底層ConcurrentMessageListenerContainer

使用SenderConfig類中使用的相同類型的鍵/值反序列化器是非常重要的。

  • ConsumerConfig.GROUP_ID_CONFIG指定一個唯一的字符串,標識此用戶所屬的用戶組。
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定在Kafka中沒有初始偏移量或當前偏移量不再存在於服務器上(例如,因爲該數據已被刪除)時要執行的操作:
    • earliest: 自動將偏移重置爲最早的偏移量
    • latest: 自動將偏移量重置爲最新的偏移量
    • none: 如果未找到消費者組的前一個偏移量,則向消費者拋出異常
    • anything else: 向消費者拋出異常。

消費者用消費者組名稱標記自己,並且發佈到主題的每個記錄都被傳送到每個訂閱消費者組中的一個消費者實例。 消費者實例可以在單獨的進程中或在單獨的機器上。

如果所有消費者實例具有相同的消費者組,則記錄將有效地在消費者實例上進行負載均衡。 如果所有消費者實例具有不同的消費者組,則每個記錄將被廣播到所有消費者進程。

有關配置選項的完整列表,請查看ConsumerConfig類(https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html )。

package com.yiibai.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

使用application.yml配置應用程序

創建一個在src/main/resources文件夾中的application.yml屬性文件。 這些屬性通過spring引導注入到配置類中。

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.yiibai: DEBUG

運行應用程序

最後,編寫一個簡單的Spring Boot應用程序來演示應用程序。 爲了使這個演示工作,需要在端口9092上運行的本地主機上的Kafka服務器,這是Kafka的默認配置。

在運行這個項目程序之前,需要運行 zookeeper 和 kafka ,如下所示 -

啓動zookeeper服務 -

D:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat D:\software\kafka_2.12-1.0.1\config\zookeeper.properties

啓動kafka服務 -

D:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat D:\software\kafka_2.12-1.0.1\config\server.properties

應用程序的實現 -

package com.yiibai.kafka;

import com.yiibai.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}

當我們運行應用程序時,應該會得到類似下面的輸出。


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 14:40:41.454  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Starting ProducerConsumerApplication on MY-PC with PID 9740 (F:\worksp\spring-kafka\producer-consumer\target\classes started by Administrator in F:\worksp\spring-kafka\producer-consumer)
2018-03-14 14:40:41.458 DEBUG 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 14:40:41.458  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : No active profile set, falling back to default profiles: default
2018-03-14 14:40:47.512  INFO 9740 --- [           main] c.y.kafka.ProducerConsumerApplication    : Started ProducerConsumerApplication in 6.567 seconds (JVM running for 7.084)
2018-03-14 14:40:47.514  INFO 9740 --- [           main] com.yiibai.kafka.producer.Sender         : sending message='Spring Kafka Producer and Consumer Example' to topic='foo.t'
2018-03-14 14:40:49.413  INFO 9740 --- [ntainer#0-0-C-1] com.yiibai.kafka.consumer.Receiver       : received message='Spring Kafka Producer and Consumer Example'