使用 SASL/PLAIN 驗證保護 Kafka
1. 簡介
在本文中,我們將學習如何在 Kafka 服務中實作 SASL/PLAIN 驗證機制。我們也將使用 Spring Kafka 提供的支援實作客戶端身份驗證。
Kafka 支援多種身份驗證選項,提供增強的安全性和相容性。這些選項包括 SASL、SSL 和委託令牌身份驗證。
簡單身份驗證和安全層 (SASL) 是一個身份驗證框架,允許輕鬆整合其他身份驗證機制,例如 GSSAPI、OAuthBearer、SCRAM 和 PLAIN 。
SASL/PLAIN 驗證並不安全!這是因為使用者憑證會以明文形式在網路上曝光。不過,由於配置要求較低,它對於本地開發來說仍然很有用。
需要注意的是,除非與 SSL/TLS 結合使用,否則不應在生產環境中使用 SASL/PLAIN 驗證。當 SSL 與 SASL/PLAIN 驗證結合使用時(在 Kafka 中稱為 SASL-SSL),它會加密流量,包括客戶端和伺服器之間的敏感憑證。
2. 使用 SASL/PLAIN 驗證實作 Kafka
假設我們需要在 Docker 環境中建置一個支援 SASL/PLAIN 驗證的 Kafka 服務。
為此,我們將利用 JAAS 配置來新增 SASL/PLAIN 所需的使用者憑證。
2.1. 設定 Kafka 憑證
為了在 Kafka 中配置使用者憑證,我們將使用PlainLoginModule
安全實作。
讓我們包含一個kafka_server_jaas.conf
檔案來設定admin
和user1
憑證:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_user1="user1-secret";
};
在上面的程式碼中,我們定義了admin
和user1
用戶,分別用於 Kafka 的 Broker 間身份驗證和外部客戶端身份驗證。 user1 user1
定義如下user_<username>
一行程式碼即可定義,並附 secret。
2.2. 設定 Zookeeper 憑證
由於我們在 Kafka 服務中包含了客戶端使用者憑證,因此我們也將使用 SASL/PLAIN 驗證來保護 Zookeeper 服務。保護 Zookeeper 服務的安全也是一種很好的做法。
讓我們包含一個zookeeper_jaas.conf
檔案來設定zookeeper
使用者憑證:
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeeper-secret"
user_zookeeper="zookeeper-secret";
};
在上面的配置中,我們使用 Zookeeper 特定的安全實作DigestLoginModule
而不是 Kafka 的PlainLoginModule
,以提高相容性。
此外,我們將在先前建立的kafka_server_jaas.conf
檔案中包含zookeeper憑證:
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zookeeper"
password="zookeeper-secret";
};
Kafka 服務使用上述Client
端憑證向 Zookeeper 服務進行驗證。
2.3. 使用 Zookeeper 設定 Kafka 服務
我們可以使用 Docker Compose 檔案設定我們的 Kafka 和 Zookeeper 服務。
首先,我們將實作一個 Zookeeper 服務並包含zookeeper_jaas.conf
檔:
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.6
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
volumes:
- ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
ports:
- 2181
接下來,我們將實作一個使用 SASL/PLAIN 認證的 Kafka 服務:
kafka:
image: confluentinc/cp-kafka:7.6.6
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
ports:
- "9092:9092"
在上面的程式碼中,我們包含了先前建立的kafka_server_jaas.conf
檔案來設定 SASL/PLAIN 使用者。
我們應該注意, KAFKA_ADVERTISED_LISTENERS
屬性是 Kafka 用戶端將發送訊息和監聽的端點。
最後,我們將使用docker compose
命令運行整個 Docker 設定:
docker compose up --build
我們會在Docker控制台中得到類似的日誌:
kafka-1 | [2025-06-19 14:32:00,441] INFO Session establishment complete on server zookeeper/172.18.0.2:2181, session id = 0x10000004c150001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
kafka-1 | [2025-06-19 14:32:00,445] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
zookeeper-1 | [2025-06-19 14:32:00,461] INFO Successfully authenticated client: authenticationID=zookeeper; authorizationID=zookeeper. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
我們確認 Kafka 和 Zookeeper 服務整合沒有任何錯誤。
3. 使用 Spring 實作 Kafka 用戶端
我們將使用 Spring Kafka 實作來實現生產者和消費者服務。
3.1. Maven 依賴項
首先,我們將包含Spring Kafka依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
接下來,我們將實作一個生產者服務來發送訊息。
3.2. Kafka 生產者
讓我們使用KafkaTemplate
類別實作 Kafka 生產者服務:
public void sendMessage(String message, String topic) {
LOGGER.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.whenComplete((result, ex) -> {
if (ex == null) {
LOGGER.info("Message sent to topic: {}", message);
} else {
LOGGER.error("Failed to send message", ex);
}
});
}
在上面的程式碼中,我們使用KafkaTemplate
的send
方法來發送訊息。
3.3. Kafka消費者
我們將使用 Spring Kafka 的KafkaListener
和ConsumerRecord
類別來實作消費者服務。
讓我們用@KafkaListener
註解實作一個消費者方法:
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
LOGGER.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
在上面的程式碼中,我們接收一條訊息並將其添加到messages
列表中。
3.4. 使用 Kafka 設定 Spring 應用程式
接下來,我們將建立一個application.yml
檔案並包含一些與 Spring Kafka 相關的屬性:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
現在,讓我們運行應用程式並驗證設定:
kafka-1 | [2025-06-19 14:38:33,188] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /192.168.65.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
如預期的那樣,客戶端應用程式無法與 Kafka 伺服器進行身份驗證。
3.5. 使用 JAAS 設定客戶端
為了解決上述錯誤,我們將使用spring.kafka.properties
配置來提供 SASL/PLAIN 設定。
現在,我們將包含一些與user1
相關的附加配置,並將sasl.mechanism
屬性設為PLAIN
:
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
sasl.mechanism: PLAIN
sasl.jaas.config: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="user1-secret";
security:
protocol: "SASL_PLAINTEXT"
在上面的程式碼中,我們將匹配的username
和password
作為sasl.jaas.config
屬性的一部分。
有時,我們會遇到由於 SASL 配置缺失或不正確而導致的常見錯誤。例如,如果sasl.mechanism
屬性為PLAINTEXT
而非PLAIN
,則會收到下列錯誤:
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
... 25 common frames omitted
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAINTEXT
當sasl.mechanism
屬性被錯誤地命名為security.mechanism
時,我們會收到不同的錯誤:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
讓我們透過整個設定來驗證 Kafka 應用程式.
4.測試
我們將使用 Testcontainers 框架來測試 Kafka 用戶端應用程式。
首先,我們將使用docker-compose.yml
建立一個DockerComposeContainer
物件:
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>("src/test/resources/sasl-plaintext/docker-compose.yml")
.withExposedService("kafka", "9092", Wait.forListeningPort());
接下來,讓我們實作一個測試方法來驗證消費者:
@Test
void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
String message = UUID.randomUUID().toString();
kafkaProducer.sendMessage(message, "test-topic");
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
最後,我們將運行測試案例並驗證輸出:
16:56:44.525 [kafka-producer-network-thread | producer-1] INFO cbsaslplaintext.KafkaProducer - Message sent to topic: 82e8a804-0269-40a2-b8ed-c509e6951011
16:56:48.566 INFO cbsaslplaintext.KafkaConsumer - Received payload: ConsumerRecord(topic = test-topic, ... key = key, value = 82e8a804-0269-40a2-b8ed-c509e6951011
從以上日誌我們可以看到消費者服務已經成功接收到訊息。
5. 結論
在本教學中,我們學習如何在 Docker 環境中使用 JAAS 配置在 Kafka 服務中設定 SASL/PLAIN 驗證。
我們也實作了生產者/消費者服務,並使用類似的 JAAS 配置配置了身分驗證。最後,我們透過使用 Docker TestContainer 發送和接收訊息來測試整個設定。
與往常一樣,範例程式碼可以在 GitHub 上找到。