檢查 Apache Kafka 服務器是否正在運行的指南
一、概述
使用 Apache Kafka 的客戶端應用程序通常屬於兩類,即生產者和消費者。生產者和消費者都要求底層 Kafka 服務器啟動並運行,然後才能分別開始生產和消費工作。
在本文中,我們將學習一些確定 Kafka 服務器是否正在運行的策略。
2. 使用 Zookeeper 命令
找出是否存在活動代理的最快方法之一是使用 Zookeeper 的dump
命令。 dump
命令是可用於管理 Zookeeper 服務器的4LW命令之一**。**
讓我們繼續使用nc
命令通過正在偵聽 2181 端口的 Zookeeper 服務器發送轉儲命令:
$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0
在執行命令時,我們會看到在 Zookeeper 服務器上註冊的臨時代理 ID 列表。如果不存在臨時 ID,則沒有任何代理節點正在運行。
此外,重要的是要注意,需要在zookeeper.properties
或zoo.cfg
配置文件中通常可用的配置中明確允許dump
命令:
lw.commands.whitelist=dump
或者,我們也可以使用 Zookeeper API 來查找活動代理列表。
3. 使用 Apache Kafka 的AdminClient
如果我們的生產者或消費者是 Java 應用程序,那麼我們可以使用 Apache Kafka 的AdminClient
類來確定 Kafka 服務器是否已啟動。
讓我們定義KafkaAdminClient
類來包裝AdminClient
類的實例,以便我們可以快速測試我們的代碼:
public class KafkaAdminClient {
private final AdminClient client;
public KafkaAdminClient(String bootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
props.put("request.timeout.ms", 3000);
props.put("connections.max.idle.ms", 5000);
this.client = AdminClient.create(props);
}
}
接下來,讓我們在KafkaAdminClient
類中定義verifyConnection()
方法來驗證客戶端是否可以連接到正在運行的代理服務器:
public boolean verifyConnection() throws ExecutionException, InterruptedException {
Collection<Node> nodes = this.client.describeCluster()
.nodes()
.get();
return nodes != null && nodes.size() > 0;
}
最後,讓我們通過連接到正在運行的 Kafka 集群來測試我們的代碼:
@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
boolean alive = kafkaAdminClient.verifyConnection();
assertThat(alive).isTrue();
}
4. 使用kcat
實用程序
我們可以使用kcat
(以前的kafkacat
)命令來查看是否有正在運行的 Kafka 代理節點。為此,讓我們使用-L
選項來顯示現有主題的元數據:
$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at 192.168.1.53:9092 (controller)
1 topics:
topic "demo-topic" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
接下來,讓我們在代理節點關閉時執行相同的命令:
$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)
對於這種情況,我們會收到“連接被拒絕”錯誤,因為沒有正在運行的代理節點。此外,我們必須注意,通過使用-m
選項將請求超時限制為 1 秒,我們能夠快速失敗。
5. 使用 UI 工具
對於不需要自動檢查的實驗性 POC 項目,我們可以依靠諸如Offset Explorer之類的 UI 工具。但是,如果我們要驗證企業級 Kafka 客戶端的代理節點狀態,不建議使用這種方法。
讓我們使用 Offset Explorer 使用 Zookeeper 主機和端口詳細信息連接到 Kafka 集群:
我們可以在左側窗格中看到正在運行的代理列表。而已。我們只需單擊一下按鈕即可獲得它。
六,結論
在本教程中,我們探索了一些使用 Zookeeper 命令、Apache 的AdminClient
和kcat
實用程序的命令行方法,然後使用基於 UI 的方法來確定 Kafka 服務器是否已啟動。
與往常一樣,本教程的完整源代碼可在 GitHub 上獲得。