小熊貓簡介
1. 概述
在本教程中,我們將討論一個名為Redpanda的強大事件流平台。這是對事實上的行業串流媒體平台 Kafka 的競爭,有趣的是,它也與 Kafka API 相容。
我們將了解 Redpanda 的關鍵元件、功能和用例,創建用於將訊息發佈到 Redpanda 主題的 Java 程序,然後從中讀取訊息。
2.Redpanda 與 Kafka
由於 Redpanda 的製作者聲稱自己是 Kafka 的競爭對手,讓我們在幾個重要因素上對它們進行比較:
特徵 | 小熊貓 | 卡夫卡 |
---|---|---|
開發者經驗 |
包括一個易於安裝的二進位包
不依賴JVM和第三方工具
|
依賴 Zookeeper 或 KRaft
對於安裝,開發人員需要更多專業知識
|
| 表現 |由於其每核心線程編程模型,比 Kafka 快 10 倍
用 C++ 編寫
每個核心可以處理 1 GB/秒的寫入
支援自動內核調整
p99999 延遲為 16ms
|
Kafka 是很久以前開發的,因此沒有針對運行多核心的新時代 CPU 進行最佳化。
用Java編寫
p99999 延遲為 1.8 秒
|
| 成本 |比Kafka低6倍
|
需要更多基礎設施來支援類似的性能
|
| 連接器 |Redpanda Cloud 提供了一些開箱即用的託管連接器
|
它非常成熟並且支援許多開箱即用的連接器
|
| 社區支持 |就接受度而言,與Kafka相比還有很長的路要走
-
|
它在各個行業都有廣泛的採用,因此有一個非常成熟的社區
|
3.Redpanda架構
Redpanda 的架構不僅簡單而且非常容易掌握。有趣的是,它有一個易於安裝的二進制安裝包。這使開發人員能夠快速領先,這也是其受歡迎的原因。此外,它還提供了一個具有極高吞吐量的高效能串流平台。
3.1.關鍵部件和特性
讓我們深入了解 Redpanda 的關鍵組件和功能,這些組件和功能使其極其強大和高效能:
控制平面支援 Kafka API,用於管理代理、建立訊息主題、發布和消費訊息等等。因此,依賴 Kafka 的遺留系統可以輕鬆遷移到 Redpanda。但是,有一組不同的管理 API 用於管理和配置 Redpanda 叢集。
Redpanda 支援分層儲存。這意味著我們可以將其配置為將其資料日誌從本地快取卸載或存檔到雲端中更便宜的物件儲存。此外,根據消費者的需求,資料會即時從遠端物件儲存移回本地快取。
Redpanda 有一個Raft 共識演算法實作層,可以跨節點複製主題分區資料。此功能可防止發生故障時資料遺失。自然也就保證了較高的資料安全性和容錯性。
Redpanda 擁有強大的身份驗證和授權支援。它可以使用 SASL、OAuth、OpenID Connect (OIDC)、基本身份驗證、Kerberos 等方法對外部用戶和應用程式進行身份驗證。此外,它還可以透過基於角色的存取控制 (RBAC) 機制對其資源進行細粒度的存取控制。
模式對於定義 Redpanda 經紀人、消費者和生產者之間交換的資料至關重要。因此,叢集有一個架構註冊表。架構註冊表 API 幫助註冊和修改架構。
HTTP 代理程式 (pandaproxy) API提供了一種與 Redpanda 互動的便捷方式,以進行基本資料操作,例如列出主題和代理程式、獲取事件、生成事件等等。
最後, **Redpanda 為其監控提供了指標端點**。這些可以在 Prometheus(監控工具)上進行配置,以提取重要指標並將其顯示在Grafana 儀表板上。
3.2.單一二進位安裝包
Redpanda 的安裝包包含一個二進位文件,因此它的安裝比 Kafka 簡單得多。與 Kafka 不同,它不依賴 JVM 或 Zookeeper 等叢集管理器。由於這些因素,操作 Redpanda 非常容易。
它是用 C++ 開發的,具有引人注目的每核線程程式設計模型,有助於最佳地利用 CPU 核心、記憶體和網路。因此,其部署的硬體成本顯著降低。該模型還可以實現低延遲和高吞吐量。
Redpanda 的叢集由多個節點組成。每個節點可以是資料平面或控制平面。所有這些節點都需要安裝一個具有適當配置的二進位套件。如果節點具有高階運算能力,則它們可以同時扮演這兩種角色,而不會出現效能瓶頸。
3.3.管理工具
Redpanda 提供了兩種管理工具,一個Web 控制台和一個稱為Redpanda Keeper (RPK)的 CLI 。控制台是一個用戶友好的 Web 應用程序,集群管理員可以使用。
RPK 主要用於低階集群管理和調優。但是,控制台提供了資料流的可見性以及故障排除和管理叢集的功能。
4. 部署
Redpanda支援自架和Redpanda雲端部署。
在自託管部署中,客戶可以將 Redpanda 叢集部署在其私人資料中心內或公有雲的 VPC 中。它可以部署在實體機或虛擬機器以及 Kubernetes 上。根據經驗,每個代理都應該有其專用節點。目前支援RHEL/CentOS和Ubuntu作業系統。
此外,AWS Simple Storage Service (S3)、Azure Blob Storage (ABS) 和 Google Cloud Storage (GCS) 可用於支援分層儲存。
有趣的是,客戶也可以選擇Redpanda Cloud來提供託管服務。他們可以將整個叢集完全放在 Redpanda Cloud 上,也可以選擇擁有在其私人資料中心或公有雲帳戶中運行的資料平面。控制平面保留在 Redpanda Cloud 上,監控、配置和升級都在其中完成。
5. 關鍵用例
與 Kafka 不同,Redpanda 對於開發人員來說是一個極其強大的串流媒體平台,因為它的架構簡單且易於安裝。讓我們沿著同樣的思路快速瀏覽一下用例:
一般來說,串流媒體平台的參與者是:
- 來源系統產生提要
- 源可以監視事件、指標、通知等
- 集群中的代理管理主題
- 生產者從來源系統讀取提要並將其發佈到主題
- 消費者不斷對訂閱的主題進行民意調查
- 目標系統接收來自消費者的轉換後的訊息
Redpanda 保證將來自監控工具、合規性和安全平台、物聯網設備等各種來源的即時資訊傳送到目標系統,平均延遲降低 10 倍。
它支援消費者和生產者模型來處理來自各種來源的即時提要或事件。生產者是從來源系統讀取資料並將其發佈到 Redpanda 叢集中的主題的應用程式。叢集中的broker高度可靠、容錯,保證訊息的傳遞。
消費者應用程式訂閱集群中的主題。最終,他們從主題中讀取數據,並在進一步轉換數據後,將其發送到各種目標系統,例如分析平台、NoSQL 資料庫、關聯式資料庫或其他流平台。
在微服務架構中,Redpanda 透過促進微服務之間的非同步通訊來幫助解耦微服務。
因此,它可以在各行業的發展中發揮重要作用:
- 用於事件和日誌處理、報告、故障排除和自動修復的可觀察性平台
- 即時合規和詐欺偵測系統
- 即時分析儀表板和應用程式
6. 使用 Kafka API 實作 Redpanda 用戶端
值得注意的是,Redpanda 支援 Kafka API。因此,我們將使用 Kafka 用戶端編寫可以與 Redpanda Stream 互動的程式。
對於我們的範例,我們使用 Java Testcontainers 在 Windows 桌面上部署單節點 Redpanda。
此外,我們將探索涵蓋主題創建、訊息發布和訊息消費的基本程序。這僅用於演示目的,因此,我們不會深入研究 Kafka API 概念。
6.1.先決條件
在開始之前,讓我們匯入 Kafka 客戶端庫所需的Maven 依賴項:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
6.2.創建主題
為了在 Redpanda 上建立主題,我們首先實例化 Kafka 客戶端庫中的AdminClient
類別:
AdminClient createAdminClient() {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
return KafkaAdminClient.create(adminProps);
}
為了設定AdminClient
,我們取得了代理 URL 並將其傳遞給其靜態create()
方法。
現在,讓我們看看如何建立主題:
void createTopic(String topicName) {
try (AdminClient adminClient = createAdminClient()) {
NewTopic topic = new NewTopic(topicName, 1, (short) 1);
adminClient.createTopics(Collections.singleton(topic));
} catch (Exception e) {
LOGGER.error("Error occurred during topic creation:", e);
}
}
AdminClient
類別的createTopics()
方法採用NewTopic
物件作為建立主題的參數。
最後,讓我們來看看createTopic()
方法的實際操作:
@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
String topic = "test-topic";
createTopic(topic);
try(AdminClient adminClient = createAdminClient()) {
assertTrue(adminClient.listTopics()
.names()
.get()
.contains(topic));
}
}
程式在Redpanda上成功創建了主題test-topic
。我們也使用AdminClient
類別的listTopics()
方法來驗證代理程式中主題是否存在。
6.3.向主題發布訊息
可以理解的是,生產者應用程式最基本的要求是將訊息發佈到主題。為此,我們將使用KafkaProducer
:
KafkaProducer<String, String> createProducer() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<String, String>(producerProps);
}
我們透過向KafkaProducer
建構函式提供代理 URL 和StringSerializer
類別等基本屬性來實例化生產者。
現在,讓我們使用生產者將訊息發佈到主題:
void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
producer.send(record).get();
}
創建ProducerRecord
物件後,我們將其傳遞給KafkaProducer
物件中的send()
方法來發布訊息。 send()
方法是非同步操作的,因此,我們呼叫get()
方法以確保在訊息發布之前處於阻塞狀態。
最後,現在讓我們發布一條訊息:
@Test
void givenTopic_whenPublishMsg_thenSuccess() {
try (final KafkaProducer<String, String> producer = createProducer()) {
assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer));
}
}
首先,我們透過呼叫方法 createProducer() 來建立KafkaProducer
物件createProducer().
然後我們發布消息“Hello Redpanda!”
透過呼叫我們之前介紹過的方法publishMessage()
到主題baeldung-topic
。
6.4.使用主題中的消息
下一步,我們先建立一個KafkaConsumer
,然後才能使用流中的消息:
KafkaConsumer<String, String> createConsumer() {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<String, String>(consumerProps);
}
我們透過向KafkaConsumer
建構函式提供代理 URL、 StringDeSerializer
類別等基本屬性來實例化使用者。此外,我們確保消費者將從偏移量 0(「最早」)開始消費訊息。
繼續,讓我們使用一些訊息:
@Test
void givenTopic_whenConsumeMessage_thenSuccess() {
try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
if(records.count() == 0) {
continue;
}
assertTrue(records.count() >= 1);
break;
}
}
}
此方法在建立KafkaConsumer
物件後訂閱一個主題。然後,它每 1000 毫秒輪詢一次以從中讀取訊息。在這裡,為了演示,我們將退出循環,但在現實世界中,應用程式會不斷輪詢訊息,然後進一步處理它們。
七、結論
在本教程中,我們探索了 Redpanda Streaming 平台。從概念上講,它與 Apache Kafka 類似,但更容易安裝、監控和管理。此外,透過更少的運算和記憶體資源,它可以實現極高的效能和高容錯能力。
然而,與 Kafka 相比,Redpanda 在產業採用方面仍有相當大的距離。此外,Redpanda 的社區支持不如 Kafka 強烈。
最後,應用程式可以輕鬆地從 Kafka 遷移到 Redpanda,因為它與 Kafka API 相容。
與往常一樣,本文中使用的程式碼可以在 GitHub 上找到。