消費者確認和發布者透過 RabbitMQ 進行確認
1. 概述
在本教程中,我們將學習如何確保透過發布者確認將訊息發佈到 RabbitMQ 代理程式。然後,我們將了解如何告訴代理我們成功使用了帶有消費者確認的訊息。
2. 場景
在簡單的應用程式中,我們在使用 RabbitMQ 時經常忽略明確的確認機制,而是依賴基本的訊息發佈到佇列以及消費時的自動訊息確認。然而,儘管 RabbitMQ 具有強大的基礎設施,錯誤仍然可能發生,因此需要一種方法來仔細檢查訊息傳遞到代理並確認成功的訊息消費。這就是發布者確認和消費者確認發揮作用的地方,提供了安全網。
3.等待發布者確認
即使我們的應用程式沒有錯誤,發布的消息也可能會遺失。例如,它可能會由於不明的網路錯誤而在運輸過程中遺失。為了避免這種情況, AMQP 提供了事務語義來保證訊息不會遺失。然而,這需要付出巨大的代價。由於事務量很大,處理訊息的時間可能會顯著增加,尤其是在處理大量訊息時。
相反,我們將採用confirm
模式,儘管會帶來一些開銷,但它比事務更快。此模式指示客戶端和代理程式發起訊息計數。隨後,用戶端使用經紀人發回的交付標籤以及相應的編號來驗證此計數。此過程可確保訊息的安全存儲,以便隨後分發給消費者。
要進入confirm mode
,我們需要在channel
上呼叫一次:
channel.confirmSelect();
確認可能需要時間,特別是對於持久隊列,因為存在 IO 延遲。因此,RabbitMQ 非同步等待確認,但提供了在我們的應用程式中使用的同步方法:
-
Channel.waitForConfirms()
—
阻止執行,直到自上次呼叫以來的所有訊息均被代理 ACK(確認)或 NACK(拒絕)。 -
Channel.waitForConfirms(timeout) —
這與上面相同,但我們可以將等待限制為毫秒值。否則,我們將得到一個TimeoutException
。 -
Channel.waitForConfirmsOrDie() —
如果自上次呼叫以來有任何訊息已被 NACK,則此函數會引發異常。如果我們不能容忍任何訊息遺失,這很有用。 -
Channel.waitForConfirmsOrDie(timeout)
—
與上面相同,但有超時。
3.1.發布商設定
我們先從一個常規的類別來發布訊息開始。我們只會收到一個要連接的channel
和queue
:
class UuidPublisher {
private Channel channel;
private String queue;
public UuidPublisher(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
}
然後,我們將新增一個用於發布String
訊息的方法:
public void send(String message) throws IOException {
channel.basicPublish("", queue, null, message.getBytes());
}
當我們以這種方式發送訊息時,我們有在傳輸過程中丟失訊息的風險,因此讓我們添加一些代碼以確保代理安全地接收我們的訊息。
3.2.在頻道上啟動確認模式
我們將首先修改建構函式以在最後呼叫channel
上的confirmSelect()
。這是必要的,因此我們可以在channel
上使用“等待”方法:
public UuidPublisher(Channel channel, String queue) throws IOException {
// ...
this.channel.confirmSelect();
}
如果我們嘗試等待確認而不進入confirm
模式,我們將得到一個IllegalStateException
。然後,我們將選擇同步wait()
方法之一,並在使用send()
方法發布訊息後呼叫它。讓我們等待超時,這樣我們就可以確保我們永遠不會等待:
public boolean send(String message) throws Exception {
channel.basicPublish("", queue, null, message.getBytes());
return channel.waitForConfirms(1000);
}
傳回true
意味著代理程式成功接收到訊息。如果我們要發送幾條訊息,這很有效。
3.3.大量確認已發布的消息
由於確認訊息需要時間,因此我們不應在每次發布後等待確認。相反,我們應該在等待確認之前發送一堆。讓我們修改我們的方法以接收訊息列表,並僅在發送所有訊息後等待:
public void sendAllOrDie(List<String> messages) throws Exception {
for (String message : messages) {
channel.basicPublish("", queue, null, message.getBytes());
}
channel.waitForConfirmsOrDie(1000);
}
這次,我們使用waitForConfirmsOrDie()
因為waitForConfirms()
的false
回傳意味著代理 NACK 了未知數量的訊息。雖然這確保瞭如果任何訊息被 NACK 處理,我們都會收到異常,但我們無法判斷哪一則訊息失敗了。
4.利用確認模式保證批量發布
當使用確認模式時,也可以在我們的channel
上註冊一個ConfirmListener
。此偵聽器採用兩個回呼處理程序:一個用於成功交付,另一個用於代理失敗。這樣,我們就可以實現一種機制來確保不會留下任何訊息。我們將從一個將此偵聽器新增至我們的channel
的方法開始:
private void createConfirmListener() {
this.channel.addConfirmListener(
(tag, multiple) -> {
// ...
},
(tag, multiple) -> {
// ...
}
);
}
在回呼中, tag
參數是指訊息的順序傳遞標籤,而multiple
表示是否確認各個訊息。在這種情況下, tag
參數將指向最新確認的標籤。相反,如果最後一個回調是 NACK,則所有傳遞標記大於最新 NACK 回調tag
的訊息也會得到確認。
為了協調這些回調,我們將在 ConcurrentSkipListMap 中保留未確認的訊息。我們將把待處理的訊息放在那裡,使用其標籤號碼作為鍵。這樣,我們可以呼叫headMap()
並取得所有先前的訊息的視圖,直到我們現在收到的tag
:
private ConcurrentNavigableMap<Long, PendingMessage> pendingDelivery = new ConcurrentSkipListMap<>();
已確認訊息的回調將從我們的地圖中刪除最多tag
的所有訊息:
(tag, multiple) -> {
ConcurrentNavigableMap<Long, PendingMessage> confirmed = pendingDelivery.headMap(tag, true);
confirmed.clear();
}
如果multiple
為false
,則headMap()
將包含單一項目,否則將包含多個項目。因此,我們不需要檢查是否收到多個訊息的確認。
4.1.實作拒絕訊息重試機制
我們將為被拒絕的訊息的回調實現重試機制。此外,我們將設定最大重試次數,以避免永遠重試的情況。讓我們從一個類別開始,該類別將保存當前訊息的嘗試次數以及一個增加此計數器的簡單方法:
public class PendingMessage {
private int tries;
private String body;
public PendingMessage(String body) {
this.body = body;
}
public int incrementTries() {
return ++this.tries;
}
// standard getters
}
現在,讓我們用它來實現我們的回調。我們首先查看被拒絕的訊息,然後刪除任何超過最大嘗試次數的項目:
(tag, multiple) -> {
ConcurrentNavigableMap<Long, PendingMessage> failed = pendingDelivery.headMap(tag, true);
failed.values().removeIf(pending -> {
return pending.incrementTries() >= MAX_TRIES;
});
// ...
}
然後,如果仍有待處理的訊息,我們會再次發送它們。這次,如果我們的應用程式中發生意外錯誤,我們還將刪除該訊息:
if (!pendingDelivery.isEmpty()) {
pendingDelivery.values().removeIf(message -> {
try {
channel.basicPublish("", queue, null, message.getBody().getBytes());
return false;
} catch (IOException e) {
return true;
}
});
}
4.2.把它們放在一起
最後,我們可以建立一個新方法,批量發送訊息,但可以偵測被拒絕的訊息並嘗試再次發送它們。我們必須在我們的頻道上呼叫getNextPublishSeqNo()
來找出我們的訊息標籤:
public void sendOrRetry(List<String> messages) throws IOException {
createConfirmListener();
for (String message : messages) {
long tag = channel.getNextPublishSeqNo();
pendingDelivery.put(tag, new PendingMessage(message));
channel.basicPublish("", queue, null, message.getBytes());
}
}
我們在發布訊息之前創建監聽器;否則,我們將不會收到確認訊息。這將創建一個接收回調的循環,直到我們成功發送或重試所有訊息。
5. 發送消費者交付確認
在研究手動確認之前,讓我們先來看一個沒有手動確認的範例。使用自動確認時,一旦代理商將訊息傳送給消費者,訊息就被視為已成功傳遞。讓我們來看一個簡單的例子:
public class UuidConsumer {
private String queue;
private Channel channel;
// all-args constructor
public void consume() throws IOException {
channel.basicConsume(queue, true, (consumerTag, delivery) -> {
// processing...
}, cancelledTag -> {
// logging...
});
}
}
當透過autoAck
參數將true
傳遞給basicConsume()
時,自動確認已啟動。儘管快速且簡單,但這是不安全的,因為代理在我們處理訊息之前會丟棄該訊息。因此,最安全的選擇是停用它並在channel
上使用basickAck()
發送手動確認,確保訊息在退出佇列之前成功處理:
channel.basicConsume(queue, false, (consumerTag, delivery) -> {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// processing...
channel.basicAck(deliveryTag, false);
}, cancelledTag -> {
// logging...
});
以最簡單的形式,我們在處理每個訊息後對其進行確認。我們使用收到的相同送貨標籤來確認消費。最重要的是,為了發出單獨的確認訊號,我們必須將false
傳遞給basicAck()
。這可能會很慢,所以讓我們看看如何改進它。
5.1.定義Channel
上的基本 QoS
通常,RabbitMQ 會在訊息可用時立即推播訊息。我們將在我們的頻道上設定基本的服務品質設定以避免這種情況。因此,讓我們在建構函式中包含一個batchSize
參數並將其傳遞給channel
上的basicQos()
,這樣就只預取了這個數量的消息:
public class UuidConsumer {
// ...
private int batchSize;
public UuidConsumer(Channel channel, String queue, int batchSize) throws IOException {
// ...
this.batchSize = batchSize;
channel.basicQos(batchSize);
}
}
這有助於在我們盡力處理的同時,讓其他消費者可以獲取訊息。
5.2.定義確認策略
我們可以透過在每次達到批次大小時發送一個 ACK 來提高效能,而不是向我們處理的每個訊息發送 ACK。為了更完整的場景,我們包含一個簡單的處理方法。如果我們可以將訊息解析為 UUID,我們將認為該訊息已處理:
private boolean process(String message) {
try {
UUID.fromString(message);
return true;
} catch (IllegalArgumentException e) {
return false;
}
}
現在,讓我們使用發送批量確認的基本框架來修改我們的consume()
方法:
channel.basicConsume(queue, false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
if (!process(message)) {
// ...
} else if (deliveryTag % batchSize == 0) {
// ...
} else {
// ...
}
}
如果無法處理訊息,我們將 NACK 訊息,並檢查是否達到了 ACK 待處理訊息的批次大小。否則,我們將儲存待處理 ACK 的傳遞標記,以便在稍後的迭代中發送它。我們將其儲存在一個類別變數中:
private AtomicLong pendingTag = new AtomicLong();
5.3.拒絕訊息
如果我們不想要或無法處理訊息,我們會拒絕它們;當拒絕時,我們可以重新排隊。例如,如果我們超出容量並希望另一個消費者接受它而不是告訴代理丟棄它,則重新排隊很有用。為此我們有兩種方法:
-
channel.basicReject(deliveryTag, requeue)
—
拒絕單一訊息,可以選擇重新排隊或丟棄。 -
channel.basicNack(deliveryTag, multiple, requeue)
—
與上方相同,但可以選擇批次拒絕。將true
傳遞給multiple
將拒絕自上次 ACK 到目前傳遞標記的每條訊息。
由於我們要單獨拒絕郵件,因此我們將使用第一個選項。如果有待處理的 ACK,我們將發送它並重置變數。最後,我們拒絕該訊息:
if (!process(message, deliveryTag)) {
if (pendingTag.get() != 0) {
channel.basicAck(pendingTag.get(), true);
pendingTag.set(0);
}
channel.basicReject(deliveryTag, false);
}
5.4.大量確認訊息
由於交貨標籤是連續的,因此我們可以使用模運算子來檢查是否已達到批量大小。如果有,我們發送一個 ACK 並重置pendingTag
。這次,將true
傳遞給「 multiple”
參數是至關重要的,這樣代理就知道我們已經成功處理了所有訊息,包括目前的傳遞標籤:
else if (deliveryTag % batchSize == 0) {
channel.basicAck(deliveryTag, true);
pendingTag.set(0);
} else {
pendingTag.set(deliveryTag);
}
否則,我們只需設定pendingTag
以在另一個迭代中檢查它。此外,為相同標籤發送多個確認將導致 RabbitMQ 出現「 PRECONDITION_FAILED – unknown delivery tag
」錯誤。
需要注意的是,當發送帶有multiple
標誌的 ACK 時,我們必須考慮由於沒有更多訊息需要處理而永遠無法達到批量大小的情況。一種選擇是保留一個觀察程序線程,定期檢查是否有待發送的 ACK。
六,結論
在本文中,我們探討了 RabbitMQ 中發布者確認和消費者確認的功能,這對於確保分散式系統中的資料安全性和穩健性至關重要。
發布者確認使我們能夠驗證訊息是否成功傳輸到 RabbitMQ 代理,從而降低了訊息遺失的風險。消費者確認透過確認訊息消費來實現受控和彈性的訊息處理。
透過實際的程式碼範例,我們了解如何有效地實現這些功能,為建立可靠的訊息系統提供了基礎。
與往常一樣,原始碼可以在 GitHub 上取得。