Apache Pulsar 和 Spring Boot 入門
1. 概述
Apache Pulsar是一個分佈式發布者-訂閱者消息系統。雖然 Apache Pulsar 提供的功能與 Apache Kafka 類似,但 Pulsar 旨在克服 Kafka 高延遲、低吞吐量、擴展和異地複制困難等限制。在處理需要實時處理的大量數據時,Apache Pulsar 是一個很好的選擇。
在本教程中,我們將了解如何將 Apache Pulsar 與我們的 Spring Boot 應用程序集成。我們將利用 Pulsar 的 Spring Boot Starter 配置的PulsarTemplate和PulsarListener 。我們還將了解如何根據我們的要求修改其默認配置。
2.Maven依賴
我們將首先運行一個獨立的 Apache Pulsar 服務器,如 Apache Pulsar 簡介中所述。
接下來,讓我們將spring-pulsar-spring-boot-starter庫添加到我們的項目中:
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
3.Pulsar PulsarClient
為了與 Pulsar 服務器交互,我們需要配置PulsarClient 。默認情況下, Spring 自動配置一個PulsarClient連接到localhost:6650上的 Pulsar 服務器:
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
我們可以更改此配置以在不同的地址上建立連接。
要連接到安全服務器,我們可以使用pulsar+ssl代替**pulsar.**我們還可以通過將spring.pulsar.client.*屬性添加到application.yml來配置連接超時、身份驗證和內存限制等屬性。
4. 指定自定義對象的架構
我們將為我們的應用程序使用一個簡單的User類:
public class User {
private String email;
private String firstName;
// standard constructors, getters and setters
}
Spring-Pulsar 自動檢測原始數據類型並生成相關模式。但是,如果我們需要使用自定義 JSON 對象,我們必須為PulsarClient配置其架構信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON
這裡, message-type屬性接受消息類的完全限定名稱,而schema-type提供有關要使用的模式類型的信息。對於復雜對象, schema-type屬性接受AVRO或JSON值。
雖然使用屬性文件指定模式是首選方法,但我們也可以通過 bean 提供此模式:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
}
}
應將此配置添加到生產者應用程序和偵聽器應用程序中。
5. 出版商
要在 Pulsar 主題上發布消息,我們將使用PulsarTemplate 。 PulsarTemplate實現了PulsarOperations接口,並提供了以同步和異步形式發布記錄的方法。 send方法會阻止調用以提供同步操作功能,而sendAsync方法則提供非阻塞異步操作。
在本教程中,我們將使用同步操作來發布記錄。
5.1.發布消息
Spring Boot 自動配置一個即用型PulsarTemplate ,將記錄發佈到指定主題。
讓我們創建一個將String消息發佈到隊列的生產者:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}
現在,讓我們嘗試將User對象發送到新隊列:
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
在上面的代碼片段中,我們使用PulsarTemplate將User類的對象發送到 Apache Pulsar 的名為user-topic.
5.2.生產者端定制
PulsarTemplate接受TypedMessageBuilderCustomizer來配置傳出消息,並接受ProducerBuilderCustomizer來自定義生產者的屬性。
我們可以使用TypedMessageBuilderCustomizer來配置消息延遲、在特定時間發送、禁用複制並提供其他屬性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}
ProducerBuilderCustomizer可用於添加訪問模式、自定義消息路由器和攔截器,並啟用或禁用分塊和批處理:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}
6. 消費者
將消息發佈到我們的主題後,我們現在將為同一主題建立一個偵聽器。為了啟用監聽主題,我們需要使用@PulsarListener註解來修飾監聽器方法。
Spring Boot 為偵聽器方法配置所有必需的組件。
我們還需要使用@EnablePulsar來使用PulsarListener 。
6.1.接收消息
我們首先為前面部分中創建的string-topic創建一個偵聽器方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}
在這裡,在PulsarListener註釋中,我們配置了該方法將在topicName中偵聽的主題,並在subscriptionName屬性中給出了訂閱名稱。
現在,讓我們為User類使用的user-topic創建一個偵聽器方法:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
除了前面的Listener方法中提供的屬性之外,我們還添加了一個schemaType屬性,該屬性的值與其生成器中的值相同。
我們還將@EnablePulsar註釋添加到我們的主類中:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}
6.2.消費端定制
除了訂閱名稱和模式類型之外, PulsarListener還可以用於配置自動啟動、批處理和確認模式等屬性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在這裡,我們將確認模式設置為Record ,並將確認超時設置為 60 秒。
7. 使用死信主題
如果消息的確認超時或服務器收到nack ,Pulsar 會嘗試重新發送消息一定次數。重試次數用完後,這些未傳遞的消息可以發送到稱為Dead Letter Queues (DLQ) 的隊列。
此選項僅適用於Shared訂閱類型。為了為我們的user-topic隊列配置 DLQ,我們首先創建一個DeadLetterPolicy bean,它將定義應嘗試重新傳遞的次數以及要用作 DLQ 的隊列的名稱:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
現在,我們將此策略添加到我們之前創建的PulsarListener中:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在這裡,我們將userTopicListener配置為使用之前創建的deadLetterPolicy ,並將確認時間配置為 60 秒。
我們可以創建一個單獨的Listener來處理DQL中的消息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
八、結論
在本教程中,我們了解瞭如何將 Apache Pulsar 與 Spring Boot 應用程序一起使用,以及更改默認配置的一些方法。
與往常一樣,示例實現可以在 GitHub 上找到。