Apache Pulsar 和 Spring Boot 入門
1. 概述
Apache Pulsar是一個分佈式發布者-訂閱者消息系統。雖然 Apache Pulsar 提供的功能與 Apache Kafka 類似,但 Pulsar 旨在克服 Kafka 高延遲、低吞吐量、擴展和異地複制困難等限制。在處理需要實時處理的大量數據時,Apache Pulsar 是一個很好的選擇。
在本教程中,我們將了解如何將 Apache Pulsar 與我們的 Spring Boot 應用程序集成。我們將利用 Pulsar 的 Spring Boot Starter 配置的PulsarTemplate
和PulsarListener
。我們還將了解如何根據我們的要求修改其默認配置。
2.Maven依賴
我們將首先運行一個獨立的 Apache Pulsar 服務器,如 Apache Pulsar 簡介中所述。
接下來,讓我們將spring-pulsar-spring-boot-starter
庫添加到我們的項目中:
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
3.Pulsar PulsarClient
為了與 Pulsar 服務器交互,我們需要配置PulsarClient
。默認情況下, Spring 自動配置一個PulsarClient
連接到localhost:6650
上的 Pulsar 服務器:
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
我們可以更改此配置以在不同的地址上建立連接。
要連接到安全服務器,我們可以使用pulsar+ssl
代替**pulsar.**
我們還可以通過將spring.pulsar.client.*
屬性添加到application.yml
來配置連接超時、身份驗證和內存限制等屬性。
4. 指定自定義對象的架構
我們將為我們的應用程序使用一個簡單的User
類:
public class User {
private String email;
private String firstName;
// standard constructors, getters and setters
}
Spring-Pulsar 自動檢測原始數據類型並生成相關模式。但是,如果我們需要使用自定義 JSON 對象,我們必須為PulsarClient
配置其架構信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON
這裡, message-type
屬性接受消息類的完全限定名稱,而schema-type
提供有關要使用的模式類型的信息。對於復雜對象, schema-type
屬性接受AVRO
或JSON
值。
雖然使用屬性文件指定模式是首選方法,但我們也可以通過 bean 提供此模式:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
}
}
應將此配置添加到生產者應用程序和偵聽器應用程序中。
5. 出版商
要在 Pulsar 主題上發布消息,我們將使用PulsarTemplate
。 PulsarTemplate
實現了PulsarOperations
接口,並提供了以同步和異步形式發布記錄的方法。 send
方法會阻止調用以提供同步操作功能,而sendAsync
方法則提供非阻塞異步操作。
在本教程中,我們將使用同步操作來發布記錄。
5.1.發布消息
Spring Boot 自動配置一個即用型PulsarTemplate
,將記錄發佈到指定主題。
讓我們創建一個將String
消息發佈到隊列的生產者:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}
現在,讓我們嘗試將User
對象發送到新隊列:
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
在上面的代碼片段中,我們使用PulsarTemplate
將User
類的對象發送到 Apache Pulsar 的名為user-topic.
5.2.生產者端定制
PulsarTemplate
接受TypedMessageBuilderCustomizer
來配置傳出消息,並接受ProducerBuilderCustomizer
來自定義生產者的屬性。
我們可以使用TypedMessageBuilderCustomizer
來配置消息延遲、在特定時間發送、禁用複制並提供其他屬性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}
ProducerBuilderCustomizer
可用於添加訪問模式、自定義消息路由器和攔截器,並啟用或禁用分塊和批處理:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}
6. 消費者
將消息發佈到我們的主題後,我們現在將為同一主題建立一個偵聽器。為了啟用監聽主題,我們需要使用@PulsarListener
註解來修飾監聽器方法。
Spring Boot 為偵聽器方法配置所有必需的組件。
我們還需要使用@EnablePulsar
來使用PulsarListener
。
6.1.接收消息
我們首先為前面部分中創建的string-topic
創建一個偵聽器方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}
在這裡,在PulsarListener
註釋中,我們配置了該方法將在topicName
中偵聽的主題,並在subscriptionName
屬性中給出了訂閱名稱。
現在,讓我們為User
類使用的user-topic
創建一個偵聽器方法:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
除了前面的Listener
方法中提供的屬性之外,我們還添加了一個schemaType
屬性,該屬性的值與其生成器中的值相同。
我們還將@EnablePulsar
註釋添加到我們的主類中:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}
6.2.消費端定制
除了訂閱名稱和模式類型之外, PulsarListener
還可以用於配置自動啟動、批處理和確認模式等屬性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在這裡,我們將確認模式設置為Record
,並將確認超時設置為 60 秒。
7. 使用死信主題
如果消息的確認超時或服務器收到nack
,Pulsar 會嘗試重新發送消息一定次數。重試次數用完後,這些未傳遞的消息可以發送到稱為Dead Letter Queues
(DLQ) 的隊列。
此選項僅適用於Shared
訂閱類型。為了為我們的user-topic
隊列配置 DLQ,我們首先創建一個DeadLetterPolicy
bean,它將定義應嘗試重新傳遞的次數以及要用作 DLQ 的隊列的名稱:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
現在,我們將此策略添加到我們之前創建的PulsarListener
中:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在這裡,我們將userTopicListener
配置為使用之前創建的deadLetterPolicy
,並將確認時間配置為 60 秒。
我們可以創建一個單獨的Listener
來處理DQL中的消息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
八、結論
在本教程中,我們了解瞭如何將 Apache Pulsar 與 Spring Boot 應用程序一起使用,以及更改默認配置的一些方法。
與往常一樣,示例實現可以在 GitHub 上找到。