阿帕契卡夫卡簡介
1. 概述
在本教程中,我們將學習 Kafka 的基礎知識——任何人都應該知道的用例和核心概念。然後我們就可以找到並了解關於Kafka更詳細的文章。
2.什麼是卡夫卡?
Kafka是由Apache軟體基金會開發的開源串流處理平台。我們可以將它用作訊息系統來解耦訊息生產者和消費者,但與ActiveMQ等「經典」訊息系統相比,它旨在處理即時資料流,並提供分散式、容錯和高度可擴展的架構用於處理和儲存資料。
因此,我們可以在各種用例中使用它:
- 即時數據處理與分析
- 日誌和事件資料聚合
- 監控和指標收集
- 點擊流資料分析
- 詐欺識別
- 大數據管道中的流處理
3. 設定本地環境
如果我們是第一次接觸 Kafka,我們可能希望本地安裝來體驗它的功能。在 Docker 的幫助下我們可以快速實現這一點。
3.1.安裝卡夫卡
我們下載現有映像並使用以下命令運行容器實例:
docker run -p 9092:9092 -d bashj79/kafka-kraft
這將使所謂的Kafka broker
在主機系統的連接埠 9092 上可用。現在,我們想使用 Kafka 客戶端連接到代理。我們可以使用多個客戶端。
3.2.使用 Kafka CLI
Kafka CLI 是安裝的一部分,可在 Docker 容器中使用。我們可以透過連接到容器的 bash 來使用它。
首先,我們需要使用以下命令找出容器的名稱:
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7653830053fa bashj79/kafka-kraft "/bin/start_kafka.sh" 8 weeks ago Up 2 hours 0.0.0.0:9092->9092/tcp awesome_aryabhata
在此範例中,名稱為awesome_aryabhata
。然後我們使用以下命令連接到 bash:
docker exec -it awesome_aryabhata /bin/bash
例如,現在我們可以建立主題(稍後我們將澄清這個術語)並使用以下命令列出所有現有主題:
cd /opt/kafka/bin
# create topic 'my-first-topic'
sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-first-topic --partitions 1 --replication-factor 1
# list topics
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
# send messages to the topic
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-first-topic
>Hello World
>The weather is fine
>I love Kafka
3.3.使用偏移資源管理器
Offset Explorer(以前稱為:Kafka Tool)是用於管理 Kafka 的 GUI 應用程式。我們可以快速下載並安裝它。然後,我們建立一個連接並指定 Kafka 代理的主機和連接埠:
然後,我們可以探索該架構:
3.4.使用 Apache Kafka 的 UI (Kafka UI)
Apache Kafka 的 UI (Kafka UI)是一個 Web UI,使用 Spring Boot 和 React 實現,並作為 Docker 容器提供,以便使用以下命令進行簡單安裝:
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
然後我們可以使用http://localhost:8080在瀏覽器中開啟 UI 並定義一個集群,如下圖所示:
由於 Kafka 代理程式在與 Kafka UI 後端不同的容器中執行,因此它無法存取localhost:9092
。我們可以使用host.docker.internal:9092
來定址主機系統,但這只是引導 URL。
不幸的是,Kafka 本身將回傳一個回應,導致再次重定向到 localhost:9092,這是行不通的。如果我們不想設定 Kafka(因為這會破壞其他客戶端),我們需要建立一個從 Kafka UI 的容器連接埠 9092 到主機系統連接埠 9092 的連接埠轉送。下圖說明了這些連線:
我們可以設定這個容器內部連接埠轉發,例如使用socat
。我們必須將其安裝在容器(Alpine Linux)中,因此我們需要使用 root 權限連接到容器的 bash。因此,我們需要這些命令,從主機系統的命令列開始:
# Connect to the container's bash (find out the name with 'docker ps')
docker exec -it --user=root <name-of-kafka-ui-container> /bin/sh
# Now, we are connected to the container's bash.
# Let's install 'socat'
apk add socat
# Use socat to create the port forwarding
socat tcp-listen:9092,fork tcp:host.docker.internal:9092
# This will lead to a running process that we don't kill as long as the container's running
不幸的是,我們每次啟動容器時都需要執行socat
。另一種可能性是提供Dockerfile的擴充。
現在,我們可以在 Kafka UI 中指定 localhost:9092 作為引導伺服器,並且應該能夠查看和建立主題,如下所示:
3.5.使用 Kafka Java 用戶端
我們必須將以下 Maven 依賴項新增至我們的專案:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
然後我們可以連接到 Kafka 並消費先前產生的訊息:
// specify connection properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyFirstConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// receive messages that were sent before the consumer started
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create the consumer using props.
try (final Consumer<Long, String> consumer = new KafkaConsumer<>(props)) {
// subscribe to the topic.
final String topic = "my-first-topic";
consumer.subscribe(Arrays.asList(topic));
// poll messages from the topic and print them to the console
consumer
.poll(Duration.ofMinutes(1))
.forEach(System.out::println);
}
當然,Spring 中有 Kafka Client 的整合。
4. 基本概念
4.1.生產者和消費者
我們可以將 Kafka 客戶端區分為消費者和生產者。生產者向 Kafka 發送訊息,而消費者則從 Kafka 接收訊息。他們僅透過主動從 Kafka 輪詢來接收訊息。卡夫卡本身就是以被動的方式行事。這使得每個消費者都有自己的效能,而不會阻塞 Kafka。
當然,可以同時有多個生產者和多個消費者。當然,一個應用程式可以同時包含生產者和消費者。
消費者是Consumer Group
的一部分,Kafka 透過一個簡單的名稱來識別消費者群組。一個消費者群組中只有一個消費者會收到該訊息。這允許在保證僅一次訊息傳遞的情況下擴展消費者。
下圖是多個生產者和消費者與Kafka一起工作的情況:
4.2.留言
訊息(我們也可以將其命名為“ record
”或“ event
”,具體取決於用例)是 Kafka 處理的資料的基本單位。其有效負載可以是任何二進位格式以及純文字、 Avro 、XML 或 JSON 等文字格式。
每個生產者都必須指定一個序列化器來將訊息物件轉換為二進位有效負載格式。每個消費者必須指定對應的反序列化器來將有效負載格式轉換回其 JVM 中的物件。我們將這些元件簡稱為SerDes
。有內建的 SerDes ,但我們也可以實現自訂 SerDes 。
下圖展示了payload的序列化與反序列化過程:
此外,訊息可以具有以下可選屬性:
- 密鑰也可以是任何二進位格式。如果我們使用金鑰,我們還需要 SerDes。 Kafka 使用鍵進行分區(我們將在下一章中更詳細地討論這一點)。
- 時間戳指示訊息的產生時間。 Kafka 使用時間戳記來排序訊息或實施保留策略。
- 我們可以應用標頭將元資料與有效負載相關聯。例如,Spring 預設會新增用於序列化和反序列化的類型標頭。
4.3.主題和分區
主題是生產者發布訊息的邏輯通道或類別。消費者訂閱一個主題以從其消費者群組的上下文中接收訊息。
預設情況下,主題的保留策略為7天,即7天后,Kafka會自動刪除訊息,與是否發送給消費者無關。如果需要的話我們可以進行設定。
主題由分區(至少一個)組成。確切地說,訊息儲存在主題的一個分區中。在一個分區內,訊息會獲得一個順序號( offset
)。這可以確保訊息以與儲存在分區中相同的順序傳遞給消費者。而且,透過儲存消費者群組已經收到的偏移量,Kafka 保證只傳遞一次。
透過處理多個分區,我們可以確定Kafka 可以在消費者進程池上提供排序保證和負載平衡。
一個消費者在訂閱主題時將被指派到一個分割區,例如使用 Java Kafka 用戶端 API,正如我們已經看到的:
String topic = "my-first-topic";
consumer.subscribe(Arrays.asList(topic));
但是,對於消費者來說,可以選擇它想要從中輪詢訊息的分區:
TopicPartition myPartition = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(myPartition));
此變體的缺點是所有群組消費者都必須使用它,因此自動將分區分配給群組消費者將無法與連接到特殊分區的單一消費者結合使用。此外,如果架構發生變化(例如在群組中添加更多消費者),則無法進行重新平衡。
理想情況下,我們有與partitions一樣多的consumer ,這樣每個consumer都可以準確地分配給其中一個partition,如下所示:
如果我們的消費者多於分區,這些消費者將不會從任何分區接收訊息:
如果我們的消費者少於分區,消費者將從多個分區接收訊息,這與最佳負載平衡相衝突:
生產者不一定只向一個分區發送訊息。每個產生的訊息都會自動分配到一個分區,遵循以下規則:
- 生產者可以指定分區作為訊息的一部分。如果這樣做,這具有最高優先級
- 如果訊息有密鑰,則透過計算密鑰的雜湊值來完成分區。具有相同雜湊值的金鑰將儲存在同一分區中。理想情況下,我們至少擁有與分區一樣多的哈希值
- 否則,
Sticky Partitioner
將訊息分發到分區
同樣,將訊息儲存到同一分區將保留訊息順序,而將訊息儲存到不同分區將導致無序但並行處理。
如果預設分區不符合我們的期望,我們可以簡單地實作一個自訂分區器。因此,我們實作Partitioner介面並在生產者初始化時註冊它:
Properties producerProperties = new Properties();
// ...
producerProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyCustomPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
下圖展示了生產者和消費者以及他們與分區的連結:
每個生產者都有自己的分區器,因此如果我們想確保訊息在主題內一致地分區,我們必須確保所有生產者的分區器以相同的方式運作,或者我們應該只與單一生產者一起工作。
分區會依照訊息到達 Kafka 代理的順序儲存訊息。通常,生產者不會將每個訊息作為單一請求發送,而是在一批中發送多個訊息。如果我們需要確保訊息的順序以及在一個分區內只傳遞一次,我們需要事務感知的生產者和消費者。
4.4.叢集和分區副本
正如我們所發現的,Kafka 使用主題分區來允許平行訊息傳遞和消費者的負載平衡。但 Kafka 本身必須具有可擴展性和容錯性。因此,我們通常不使用單一 Kafka Broker,而是使用多個 Broker 組成的Cluster
。這些代理的行為並不完全相同,但它們中的每一個都被分配了特殊的任務,如果一個代理發生故障,群集的其餘部分可以承擔這些任務。
為了理解這一點,我們需要擴大對主題的理解。建立主題時,我們不僅指定分區的數量,還指定使用同步共同管理分區的代理的數量。我們稱之為Replication Factor
。例如,使用 Kafka CLI,我們可以建立一個具有 6 個分區的主題,每個分區在 3 個代理程式上同步:
sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-replicated-topic --partitions 6 --replication-factor 3
例如,複製因子為 3 表示群集對於最多兩個副本故障具有彈性( N-1 resiliency
)。我們必須確保我們至少擁有與指定的複製因子一樣多的代理。否則,Kafka 不會建立主題,直到代理數量增加。
為了提高效率,分區的複製僅發生在一個方向。 Kafka 透過將其中一個代理聲明為Partition Leader
來實現這一點。生產者僅向分區領導者發送訊息,然後領導者與其他代理商進行同步。消費者也將向分區領導者輪詢,因為增加的消費者群組的偏移量也必須同步。
分區領先被分配給多個經紀人。 Kafka 嘗試為不同的分區找到不同的代理。讓我們來看一個具有四個代理和兩個分區且複製因子為三的範例:
Broker 1 是分區 1 的領導者,代理 4 是分區 2 的領導者。因此,每個用戶端在從這些分區發送或輪詢訊息時都會連接到這些代理。為了獲取有關分區領導者和其他可用代理(元資料)的信息,有一種特殊的引導機制。總而言之,我們可以說每個代理都可以提供叢集的元數據,因此客戶端可以初始化與每個代理的連接,然後重定向到分區領導者。這就是為什麼我們可以指定多個代理作為引導伺服器。
如果一個分區領導代理發生故障,Kafka 將聲明仍在工作的代理之一作為新的分區領導者。然後,所有客戶端都必須連接到新的領導者。在我們的範例中,如果 Broker 1 發生故障,Broker 2 將成為分區 1 的新領導者。然後,連接到 Broker 1 的客戶端必須切換到 Broker 2。
Kafka 使用 Kraft(早期版本中:Zookeeper)來編排叢集內的所有代理程式。
4.4.將所有內容放在一起
如果我們將生產者和消費者與三個代理組成的叢集放在一起,這些代理程式管理具有三個分區和複製因子 3 的單一主題,我們將得到以下架構:
5. 生態系統
我們已經知道,可以使用多個客戶端(例如 CLI、整合到 Spring 應用程式的基於 Java 的客戶端)以及多個 GUI 工具來連接 Kafka。當然,還有其他程式語言(例如C/C++ 、 Python或Javascript )的客戶端 API,但這些不屬於 Kafka 專案的一部分。
在這些 API 之上,還有更多用於特殊用途的 API。
5.1.卡夫卡連線API
Kafka Connect是用來與第三方系統交換資料的API。現有的連接器例如用於 AWS S3、JDBC,甚至用於在不同的 Kafka 叢集之間交換資料。當然,我們也可以編寫自訂連接器。
5.2.卡夫卡流 API
Kafka Streams 是用於實作流處理應用程式的 API,這些應用程式從 Kafka 主題獲取輸入,並將結果儲存在另一個 Kafka 主題中。
5.3. KSQL
KSQL 是一個建構在 Kafka Streams 之上的類似 SQL 的介面。它不需要我們開發Java程式碼,但我們可以宣告類似SQL的語法來定義與Kafka交換的訊息的流處理。為此,我們使用連接到 Kafka 叢集的 ksqlDB。我們可以使用 CLI 或 Java 用戶端應用程式存取 ksqlDB。
5.4.卡夫卡 REST 代理
Kafka REST 代理程式為 Kafka 叢集提供 RESTful 介面。這樣,我們就不需要任何 Kafka 用戶端,並避免使用原生 Kafka 協定。它允許 Web 前端與 Kafka 連接,並可使用 API 網關或防火牆等網路元件。
5.5. Kubernetes 的 Kafka 運算子 (Strimzi)
Strimzi是一個開源項目,提供了一種在 Kubernetes 和 OpenShift 平台上運行 Kafka 的方法。它引入了自訂 Kubernetes 資源,讓您可以更輕鬆地以 Kubernetes 原生方式聲明和管理 Kafka 相關資源。它遵循操作員模式,即操作員自動執行 Kafka 叢集的配置、擴展、滾動更新和監控等任務。
六,結論
在本文中,我們了解到 Kafka 是為高可擴展性和容錯能力而設計的。生產者收集訊息並批量發送,主題被劃分為分區以允許並行訊息傳遞和消費者的負載平衡,並且透過多個代理完成複製以確保容錯。
與往常一樣,所有程式碼實作都可以在 GitHub 上取得。