Kafka 配置中的 bootstrap-server
1. 概述
每當我們實現 Kafka 生產者或消費者(例如,使用 Spring)時,我們需要配置的內容之一是“ bootstrap.servers
”屬性。
在本教程中,我們將了解此設置的含義及其用途。
2.Kafka拓撲
Kafka 的拓撲是為了可擴展性和高可用性而設計的。這就是為什麼有一個服務器(代理)集群來處理代理之間複製的主題分區。每個分區都有一個代理作為領導者,其他代理作為追隨者。
生產者將消息發送到分區領導者,然後分區領導者將記錄傳播到每個副本。消費者通常也會連接到分區領導者,因為消費消息會改變狀態(消費者偏移量)。
副本的數量就是複制因子。建議使用值3
,因為它可以在性能和容錯之間提供適當的平衡,並且通常云提供商會提供三個數據中心(可用區)來部署到作為區域的一部分。
作為示例,下圖顯示了一個由四個代理組成的集群,提供一個具有兩個分區和復制因子為3
的主題:
當一個分區領導者崩潰時,Kafka 選擇另一個代理作為新的分區領導者。然後,消費者和生產者(“客戶”)也必須切換到新的領導者。因此,如果Broker 1
崩潰,情況可能會變成這樣:
3. 引導
正如我們所看到的,整個集群是動態的,客戶端需要了解拓撲的當前狀態才能連接到正確的分區領導者以發送和接收消息。**這就是引導程序發揮作用的地方。**
“ bootstrap-servers
”配置是一個“ hostname:port
”對的列表,用於尋址一個或多個(甚至所有)代理。客戶端通過執行以下步驟來使用此列表:
- 從列表中選擇第一個經紀人
- 向代理髮送請求以獲取集群元數據,其中包含有關主題、分區和每個分區的領導代理的信息(每個代理都可以提供此元數據)
- 連接到所選主題分區的領導者代理
當然,在列表中指定多個代理是有意義的,因為如果第一個代理不可用,客戶端可以選擇第二個代理進行引導。
Kafka 使用 Kraft(來自早期的 Zookeeper)來管理所有此類編排。
4. 樣品
假設我們在開發環境中使用帶有 Kafka 和 Kraft (如bashj79/kafka-kraft )的簡單 Docker 映像。我們可以使用以下命令安裝此 Docker 映像:
docker run -p 9092:9092 -d bashj79/kafka-kraft
這會在容器內和主機上的端口9092
上運行一個可用的 Kafka 實例。
4.1.使用 Kafka CLI
連接 Kafka 的一種方法是使用 Kafka CLI ,它在 Kafka 安裝中可用。首先,我們創建一個名為samples
的主題。在容器的 Bash 中,我們可以運行以下命令:
$ cd /opt/kafka/bin
$ sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic samples --partitions 1 --replication-factor 1
如果我們想開始使用該主題,我們需要再次指定引導服務器:
$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092,another-host.com:29092 --topic samples
我們還可以將集群元數據視為一種虛擬文件系統。我們使用kafka-metadata-shell
腳本連接到元數據:
$ sh kafka-metadata-shell.sh --snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000167.log
4.2.使用Java
在 Java 應用程序中,我們可以使用Kafka 客戶端:
static Consumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,another-host.com:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"MySampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Arrays.asList("samples"));
return consumer;
}
通過 Spring Boot 和 Spring 的Kafka 集成,我們可以簡單地配置application.properties
:
spring.kafka.bootstrap-servers=localhost:9092,another-host.com:29092
5. 結論
在本文中,我們了解到Kafka是一個由多個代理組成的分佈式系統,這些代理複製主題分區以確保高可用性、可擴展性和容錯性。
客戶端需要從一個代理檢索元數據,以找到要連接的當前分區領導者。該代理是引導服務器,我們通常提供引導服務器列表,以便在主代理無法訪問時為客戶端提供替代方案。
與往常一樣,所有代碼實現都可以在 GitHub 上獲得。