Spring Cloud AWS SQS v3 中的消息確認
1. 概述
訊息確認是訊息傳遞系統中的標準機制,向訊息代理程式發出訊號,表示訊息已被接收,不應再次傳遞。在亞馬遜的SQS (簡單的佇列服務)中,確認是**透過刪除佇列中的消息來**執行。
在本教學中,我們將探討 Spring Cloud AWS SQS v3 提供的三種開箱即用的確認模式: ON_SUCCESS
、 MANUAL
和ALWAYS
。
我們將使用事件驅動的場景來說明我們的用例,並利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設定。
2. 依賴關係
我們首先導入Spring Cloud AWS 物料清單,以確保pom.xml
中的所有依賴項彼此相容:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
我們還將新增 Core 和 SQS 啟動相依性:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
最後,我們將新增測試所需的依賴項,即LocalStack 和 TestContainers with JUnit 5、用於驗證非同步訊息消耗的等待庫以及用於處理斷言的 AssertJ:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
3.建置本地測試環境
首先,我們將使用 Testcontainers 來設定 LocalStack 環境以進行本地測試:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
儘管此設定使測試變得簡單且可重複,但請注意,本教程中的程式碼也可用於直接針對 AWS。
4. 設定隊列名稱
預設情況下, Spring Cloud AWS SQS 自動建立任何@SqsListener
註解方法中指定的佇列。作為第一個設定步驟,我們將在我們的 application.yaml
檔:
events:
queues:
order-processing-retry-queue: order_processing_retry_queue
order-processing-async-queue: order_processing_async_queue
order-processing-no-retries-queue: order_processing_no_retries_queue
acknowledgment:
order-processing-no-retries-queue: ALWAYS
確認屬性ALWAYS
也會被我們的偵聽器之一使用。
我們也將一些productIds
加入到同一文件中,以便在我們的範例中使用:
product:
id:
smartphone: 123e4567-e89b-12d3-a456-426614174000
wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
laptop: 123e4567-e89b-12d3-a456-426614174002
tablet: 123e4567-e89b-12d3-a456-426614174004
為了在我們的應用程式中將這些屬性作為 POJO 獲取,我們將創建兩個 @ ConfigurationProperties
類,其中一個用於佇列:
@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {
private String orderProcessingRetryQueue;
private String orderProcessingAsyncQueue;
private String orderProcessingNoRetriesQueue;
// getters and setters
}
另一個是產品:
@ConfigurationProperties("product.id")
public class ProductIdProperties {
private UUID smartphone;
private UUID wirelessHeadphones;
private UUID laptop;
// getters and setters
}
最後,我們使用@EnableConfigurationProperties
在@Configuration
類別中啟用配置屬性:
@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}
5. 處理成功的確認
預設確認方式 對於@SqsListeners
是ON_SUCCESS
。在此模式下,如果偵聽器方法完成執行而沒有引發錯誤,則訊息將被確認。
為了說明這種行為,我們將建立一個簡單的偵聽器,它將接收 OrderCreatedEvent
,檢查一個 InventoryService
,和 ,如果請求的商品和數量有庫存,則將訂單狀態變更為 PROCESSED
。
5.1.創建服務
讓我們先創建我們的 OrderService
,它將負責更新訂單狀態:
@Service
public class OrderService {
Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();
public void updateOrderStatus(UUID orderId, OrderStatus status) {
ORDER_STATUS_STORAGE.put(orderId, status);
}
public OrderStatus getOrderStatus(UUID orderId) {
return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
}
}
然後,我們將建立InventoryService
。我們將使用Map
模擬存儲,並使用ProductIdProperties
填充它,它使用application.yaml
檔案中的值自動組裝:
@Service
public class InventoryService implements InitializingBean {
private ProductIdProperties productIdProperties;
private Map<UUID, Integer> inventory;
public InventoryService(ProductIdProperties productIdProperties) {
this.productIdProperties = productIdProperties;
}
@Override
public void afterPropertiesSet() {
this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
productIdProperties.getWirelessHeadphones(), 15,
productIdProperties.getLaptop(), 5);
}
}
InitializingBean
介面提供afterPropertiesSet,
這是 Spring 在解決該 bean 的所有依賴項(在我們的範例中為ProductIdProperties
bean)之後呼叫的生命週期方法。
讓我們新增一個checkInventory
方法,該方法驗證庫存是否具有所要求的產品數量。如果產品不存在,它將拋出ProductNotFoundException
,如果產品存在但數量不足,它將拋出OutOfStockException
。在第二種情況下,我們還將模擬隨機補貨,以便在重試幾次後,處理最終會成功:
public void checkInventory(UUID productId, int quantity) {
Integer stock = inventory.get(productId);
if (stock < quantity) {
inventory.put(productId, stock + (int) (Math.random() * 5));
throw new OutOfStockException(
"Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
};
inventory.put(productId, stock - quantity);
}
5.2.建立監聽器
我們已經準備好創建我們的第一個監聽器了。我們將使用@Component
註解並透過Spring的建構子依賴注入機制注入服務:
@Component
public class OrderProcessingListeners {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);
private InventoryService inventoryService;
private OrderService orderService;
public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
this.inventoryService = inventoryService;
this.orderService = orderService;
}
}
接下來我們來寫監聽方法:
@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
logger.info("Message processed successfully: {}", orderCreatedEvent);
}
value value
是透過 application.yaml 自動組裝的佇列名稱**application.yaml.**
由於ON_SUCCESS
是預設的確認模式,因此我們不需要在註解中指定它。
5.3.設定測試類
為了斷言邏輯按預期工作,讓我們建立一個測試類別:
@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);
@Autowired
private EventsQueuesProperties eventsQueuesProperties;
@Autowired
private ProductIdProperties productIdProperties;
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private OrderService orderService;
@Autowired
private MessageListenerContainerRegistry registry;
}
我們也會加入一個名為assertQueueIsEmpty
的方法。在其中,我們將使用自動組裝的MessageListenerContainerRegistry
來取得容器,然後停止容器以確保它沒有消耗任何訊息。註冊表包含@SqsListener
註解所建立的所有容器:
private void assertQueueIsEmpty(String queueName, String containerId) {
logger.info("Stopping container {}", containerId);
var container = Objects
.requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
container.stop();
// ...
}
容器停止後,我們將使用SqsTemplate
在佇列中尋找訊息。如果確認成功,則不應傳回任何訊息。我們也將pollTimeout
設定為大於可見性逾時的值,這樣,如果訊息尚未刪除,它將在指定的時間間隔內再次傳遞。
這是assertQueueIsEmpty
方法的延續:
// ...
logger.info("Checking for messages in queue {}", queueName);
var message = sqsTemplate.receive(from -> from.queue(queueName)
.pollTimeout(Duration.ofSeconds(5)));
assertThat(message).isEmpty();
logger.info("No messages found in queue {}", queueName);
5.4.測試
在第一個測試中,我們將向隊列發送一個OrderCreatedEvent
,其中包含數量大於庫存的產品Order
。當異常通過偵聽器方法時,它將向框架發出信號,表示訊息處理失敗,並且應在message visibility
時間視窗過去後再次傳遞訊息。
為了加快測試速度,我們在註解中將messageVisibilitySeconds
設為 1,但通常此配置是在佇列本身中完成的,預設為 30 秒。
我們將建立事件並使用 Spring Cloud AWS 提供的自動配置的SqsTemplate
發送它。然後,我們將使用Awaitility
等待訂單狀態變更為PROCESSED
,最後,我們將斷言佇列為空,這表示確認成功:
@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "retry-order-processing-container");
}
請注意,我們將**@SqsListener
註解中指定的containerId
**傳遞給assertQueueIsEmpty
方法。
現在我們可以運行測試了。首先,我們將確保 Docker 正在運行,然後我們將執行測試。在容器初始化日誌之後,我們應該會看到應用程式的日誌訊息:
Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
然後,應該會看到由於缺貨而導致的一個或多個故障:
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10
而且,由於我們添加了補充邏輯,我們最終應該看到訊息處理成功:
Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
最後,我們將確保確認已成功:
INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] acslAbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue
請注意,測試完成後可能會引發「連線被拒絕」錯誤 - 這是因為 Docker 容器在框架停止輪詢訊息之前停止。我們可以安全地忽略這些錯誤。
6. 手動確認
該框架支援手動確認訊息,這對於我們需要更好地控制確認過程的場景非常有用。
6.1.建立監聽器
為了說明這一點,我們將建立一個非同步場景,其中InventoryService
的連線速度較慢,並且我們希望在其完成之前釋放偵聽器執行緒:
@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
.thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
.thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
.thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
logger.info("Releasing processing thread.");
}
在此邏輯中,我們使用 Java 的CompletableFuture
非同步執行庫存檢查。我們將Acknowledge
物件加入到偵聽器方法,並將SqsListenerAcknowledgementMode.MANUAL
新增至註解的acknowledgementMode
屬性。該屬性是一個String
,接受屬性佔位符和 SpEL。只有當我們將AcknowledgementMode
設定為MANUAL
時, Acknowledgement
物件**才可**用。
請注意,在本例中,我們利用 Spring Boot 自動配置(它提供了合理的預設值)和@SqsListener
註解屬性來在確認模式之間進行變更。另一種方法是聲明一個SqsMessageListenerContainerFactory
bean,它允許設定更複雜的配置。
6.2.類比慢速連接
現在,讓我們將slowCheckInventory
方法加入到InventoryService
類別中,使用Thread.sleep
模擬慢速連線:
public void slowCheckInventory(UUID productId, int quantity) {
simulateBusyConnection();
checkInventory(productId, quantity);
}
private void simulateBusyConnection() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
6.3.測試
接下來,讓我們來寫測試:
@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "async-order-processing-container");
}
這次,我們請求庫存中的可用數量,因此我們應該不會看到拋出任何錯誤。
執行測試時,我們將看到一條日誌訊息,表示已收到訊息:
INFO 2786 --- [ing-container-1] cbscasalOrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
然後,我們會看到線程釋放訊息:
INFO 2786 --- [ing-container-1] cbscasalOrderProcessingListeners : Releasing processing thread.
這是因為我們正在非同步處理和確認訊息。大約兩秒鐘後,我們應該會看到訊息已被確認的日誌:
INFO 2786 --- [onPool-worker-1] cbscasalOrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
最後,我們將看到停止容器並斷言隊列為空的日誌:
INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] acslAbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue
7. 成功和錯誤的確認
我們將探討的最後一個確認模式是ALWAYS,
這會導致框架確認訊息,無論偵聽器方法是否拋出錯誤。
7.1.建立監聽器
讓我們模擬一個銷售活動,在此期間我們的庫存有限,無論發生任何故障,我們都不想重新處理任何訊息。我們將使用我們先前在application.yml
中定義的屬性將確認模式設為ALWAYS
:
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
logger.info("Message processed: {}", orderCreatedEvent);
}
在測試中,我們將建立一個數量大於庫存的訂單:
7.2.測試
@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.RECEIVED));
assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}
現在,即使拋出OutOfStockException
,訊息也會被確認,並且不會嘗試重試該訊息:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] acslAbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue
八、結論
在本文中,我們使用事件驅動的場景來展示 Spring Cloud AWS v3 SQS 整合提供的三種確認模式: ON_SUCCESS
(預設)、 MANUAL
和ALWAYS
。
我們利用自動配置的設定並使用@SqsListener
註解屬性在模式之間切換。我們還使用 Testcontainers 和 LocalStack 創建了即時測試來斷言行為。
與往常一樣,本文中使用的完整程式碼可用 在 GitHub 上。與往常一樣,本文中使用的完整程式碼可用 在 GitHub 上。