HiveMQ MQTT 用戶端簡介
1. 引言
在建立使用 MQTT 的 Java 應用程式時,我們需要一個易於使用且適合生產環境的客戶端程式庫。由 HiveMQ 團隊開發和維護的 HiveMQ MQTT 用戶端是一個現代化的 Java 函式庫,它同時支援 MQTT 3.1.1 和 MQTT 5。
此客戶端提供多種 API 風格,包括阻塞式、非同步式和響應式,以支援不同的程式設計模型。此外,它還提供了一個流暢的、基於建構器的 API,用於在與 MQTT 代理程式互動時配置連接、訂閱和發布操作。
在本教程中,我們將示範該庫的基本用法。具體來說,我們將連接到公共 MQTT 代理,訂閱一個主題,發布一條訊息,並使用整合測試驗證訊息的送達情況。
2. 項目設定
在建立客戶端之前,我們需要將 HiveMQ MQTT 用戶端相依性新增至我們的專案:
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.12</version>
</dependency>
最新版本的庫模組可在Maven Central上找到。
有了依賴關係,我們就可以建立一個 MQTT 用戶端,並連接到broker.hivemq.com:1883上公開可用的 MQTT 代理。
3. 創建客戶
為了說明 HiveMQ MQTT 用戶端的不同 API 風格,我們將建立兩個客戶端。我們將使用一個非同步客戶端來訂閱主題並接收訊息,以及一個阻塞客戶端來同步發布訊息。
3.1 建立非同步客戶端
首先,我們從一個非同步客戶端開始,它訂閱主題並以非阻塞、事件驅動的方式處理傳入的訊息。這種客戶端類型允許我們註冊回調函數,這些回調函數會在訊息到達時被呼叫。
我們將使用 Fluent Builder API 來創建它:
Mqtt5AsyncClient subscriber = Mqtt5Client.builder()
.identifier("baeldung-sub-" + UUID.randomUUID())
.serverHost(PUBLIC_BROKER_HOST)
.serverPort(PUBLIC_BROKER_PORT)
.buildAsync();
透過設定唯一的客戶端識別碼和 MQTT 代理位址, buildAsync()會建立一個非同步 MQTT 5 用戶端實例。稍後,我們將使用此客戶端連接到代理並訂閱一個主題。
3.2 建立阻塞客戶端
相反,我們將使用阻塞式客戶端來發布訊息。阻塞式 API 同步執行操作,這在我們不需要回應傳入訊息時非常有效。
同樣地,我們將使用建構器 API 建立阻塞客戶端:
Mqtt5BlockingClient publisher = Mqtt5Client.builder()
.identifier("baeldung-pub-" + UUID.randomUUID())
.serverHost(PUBLIC_BROKER_HOST)
.serverPort(PUBLIC_BROKER_PORT)
.buildBlocking();
配置好唯一的客戶端識別碼和代理位址後,呼叫buildBlocking()函數會建立一個 MQTT 5 客戶端實例。此客戶端提供用於連接到代理和發布訊息的同步方法。
4. 連結、訂閱和發布
現在我們有了兩個客戶端,我們可以連接到公開可用的代理,訂閱一個主題,並發布一條訊息。
4.1 連接到經紀商
首先,我們將連接兩個客戶端:
subscriber.connect()
.join();
publisher.connect();
非同步客戶端在連線時傳回CompletableFuture ,因此我們將使用join().另一方面,阻塞客戶端同步連接,並且僅在連接建立後才返回。
4.2. 訂閱主題
接下來,我們將在非同步客戶端上註冊一個處理程序來處理傳入的發布訊息。我們將使用一個閂鎖來讓我們的測試等待,直到訂閱者收到訊息為止:
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> receivedMessage = new AtomicReference<>();
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
receivedMessage.set(message);
latch.countDown();
});
處理程序定義了訊息處理邏輯。當訊息到達時,回呼函數會提取有效負載並通知測試執行緒繼續執行。
有了處理程序,我們就可以訂閱我們的主題了:
subscriber.subscribeWith()
.topicFilter(topic)
.send()
.join();
這會告訴經紀人客戶想要接收哪個主題的訊息,並等待確認。
4.3 發布訊息
最後,我們將使用阻塞客戶端發布一則訊息:
String payload = "Hello from Baeldung";
publisher.publishWith()
.topic(topic)
.payload(payload.getBytes(StandardCharsets.UTF_8))
.send();
此時,訊息代理人會將已發佈的訊息轉發給該主題的所有訂閱者。在我們的例子中,非同步客戶端上註冊的回呼函數會接收該訊息並儲存其有效負載。
5. 驗證訊息送達
最後,為了驗證訂閱者是否收到了已發布的消息,我們將在測試中新增斷言。訂閱者回呼運行後,測試會檢查接收到的有效負載是否與發布的值相符:
assertTrue(latch.await(2, TimeUnit.SECONDS));
assertEquals(payload, receivedMessage.get());
驗證完成後,我們將斷開兩個客戶端的連接,關閉與代理的連接並釋放資源:
publisher.disconnect();
subscriber.disconnect()
.join();
因此,斷開客戶端連線可以釋放網路資源並完成客戶端生命週期。
6. 結論
在本文中,我們介紹了 HiveMQ MQTT 用戶端,並示範如何連接到公共 MQTT 代理、訂閱主題、發布訊息以及使用整合測試驗證訊息傳遞。
根據不同的使用場景,開發者可以選擇阻塞式 API 用於同步工作流程,非同步 API 用於事件驅動型應用,或響應式 API 用於串流處理。這種靈活性使得 HiveMQ MQTT 用戶端適用於各種 Java 應用。
與往常一樣,完整的原始程式碼可在 GitHub 上找到。