Spring Cloud AWS SQS v3 中的消息確認
1. 概述
訊息確認是訊息傳遞系統中的標準機制,向訊息代理程式發出訊號,表示訊息已被接收,不應再次傳遞。在亞馬遜的SQS (簡單的佇列服務)中,確認是**透過刪除佇列中的消息來**執行。
在本教學中,我們將探討 Spring Cloud AWS SQS v3 提供的三種開箱即用的確認模式: ON_SUCCESS 、 MANUAL和ALWAYS 。
我們將使用事件驅動的場景來說明我們的用例,並利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設定。
2. 依賴關係
我們首先導入Spring Cloud AWS 物料清單,以確保pom.xml中的所有依賴項彼此相容:
<dependencyManagement>
 <dependencies>
 <dependency>
 <groupId>io.awspring.cloud</groupId>
 <artifactId>spring-cloud-aws</artifactId>
 <version>3.1.0</version>
 <type>pom</type>
 <scope>import</scope>
 </dependency>
 </dependencies>
 </dependencyManagement>
我們還將新增 Core 和 SQS 啟動相依性:
<dependency>
 <groupId>io.awspring.cloud</groupId>
 <artifactId>spring-cloud-aws-starter-sqs</artifactId>
 </dependency>
 <dependency>
 <groupId>io.awspring.cloud</groupId>
 <artifactId>spring-cloud-aws-starter</artifactId>
 </dependency>
最後,我們將新增測試所需的依賴項,即LocalStack 和 TestContainers with JUnit 5、用於驗證非同步訊息消耗的等待庫以及用於處理斷言的 AssertJ:
<dependency>
 <groupId>org.testcontainers</groupId>
 <artifactId>localstack</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.testcontainers</groupId>
 <artifactId>junit-jupiter</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.awaitility</groupId>
 <artifactId>awaitility</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.assertj</groupId>
 <artifactId>assertj-core</artifactId>
 <scope>test</scope>
 </dependency>
3.建置本地測試環境
首先,我們將使用 Testcontainers 來設定 LocalStack 環境以進行本地測試:
@Testcontainers
 public class BaseSqsLiveTest {
 private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
 @Container
 static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
 @DynamicPropertySource
 static void overrideProperties(DynamicPropertyRegistry registry) {
 registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
 registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
 registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
 registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
 .toString());
 }
 }
儘管此設定使測試變得簡單且可重複,但請注意,本教程中的程式碼也可用於直接針對 AWS。
4. 設定隊列名稱
預設情況下, Spring Cloud AWS SQS 自動建立任何@SqsListener註解方法中指定的佇列。作為第一個設定步驟,我們將在我們的 application.yaml檔:
events:
 queues:
 order-processing-retry-queue: order_processing_retry_queue
 order-processing-async-queue: order_processing_async_queue
 order-processing-no-retries-queue: order_processing_no_retries_queue
 acknowledgment:
 order-processing-no-retries-queue: ALWAYS
確認屬性ALWAYS也會被我們的偵聽器之一使用。
我們也將一些productIds加入到同一文件中,以便在我們的範例中使用:
product:
 id:
 smartphone: 123e4567-e89b-12d3-a456-426614174000
 wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
 laptop: 123e4567-e89b-12d3-a456-426614174002
 tablet: 123e4567-e89b-12d3-a456-426614174004
為了在我們的應用程式中將這些屬性作為 POJO 獲取,我們將創建兩個 @ ConfigurationProperties類,其中一個用於佇列:
@ConfigurationProperties(prefix = "events.queues")
 public class EventsQueuesProperties {
 private String orderProcessingRetryQueue;
 private String orderProcessingAsyncQueue;
 private String orderProcessingNoRetriesQueue;
 // getters and setters
 }
另一個是產品:
@ConfigurationProperties("product.id")
 public class ProductIdProperties {
 private UUID smartphone;
 private UUID wirelessHeadphones;
 private UUID laptop;
 // getters and setters
 }
最後,我們使用@EnableConfigurationProperties在@Configuration類別中啟用配置屬性:
@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
 @Configuration
 public class OrderProcessingConfiguration {
 }
5. 處理成功的確認
預設確認方式 對於@SqsListeners是ON_SUCCESS 。在此模式下,如果偵聽器方法完成執行而沒有引發錯誤,則訊息將被確認。
為了說明這種行為,我們將建立一個簡單的偵聽器,它將接收 OrderCreatedEvent ,檢查一個 InventoryService ,和 ,如果請求的商品和數量有庫存,則將訂單狀態變更為 PROCESSED 。
5.1.創建服務
讓我們先創建我們的 OrderService ,它將負責更新訂單狀態:
@Service
 public class OrderService {
 Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();
 public void updateOrderStatus(UUID orderId, OrderStatus status) {
 ORDER_STATUS_STORAGE.put(orderId, status);
 }
 public OrderStatus getOrderStatus(UUID orderId) {
 return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
 }
 }
然後,我們將建立InventoryService 。我們將使用Map模擬存儲,並使用ProductIdProperties填充它,它使用application.yaml檔案中的值自動組裝:
@Service
 public class InventoryService implements InitializingBean {
 private ProductIdProperties productIdProperties;
 private Map<UUID, Integer> inventory;
 public InventoryService(ProductIdProperties productIdProperties) {
 this.productIdProperties = productIdProperties;
 }
 @Override
 public void afterPropertiesSet() {
 this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
 productIdProperties.getWirelessHeadphones(), 15,
 productIdProperties.getLaptop(), 5);
 }
 }
InitializingBean介面提供afterPropertiesSet,這是 Spring 在解決該 bean 的所有依賴項(在我們的範例中為ProductIdProperties bean)之後呼叫的生命週期方法。
讓我們新增一個checkInventory方法,該方法驗證庫存是否具有所要求的產品數量。如果產品不存在,它將拋出ProductNotFoundException ,如果產品存在但數量不足,它將拋出OutOfStockException 。在第二種情況下,我們還將模擬隨機補貨,以便在重試幾次後,處理最終會成功:
public void checkInventory(UUID productId, int quantity) {
 Integer stock = inventory.get(productId);
 if (stock < quantity) {
 inventory.put(productId, stock + (int) (Math.random() * 5));
 throw new OutOfStockException(
 "Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
 };
 inventory.put(productId, stock - quantity);
 }
5.2.建立監聽器
我們已經準備好創建我們的第一個監聽器了。我們將使用@Component註解並透過Spring的建構子依賴注入機制注入服務:
@Component
 public class OrderProcessingListeners {
 private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);
 private InventoryService inventoryService;
 private OrderService orderService;
 public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
 this.inventoryService = inventoryService;
 this.orderService = orderService;
 }
 }
接下來我們來寫監聽方法:
@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
 public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
 logger.info("Message received: {}", orderCreatedEvent);
 orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
 inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
 orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
 logger.info("Message processed successfully: {}", orderCreatedEvent);
 }
value value是透過 application.yaml 自動組裝的佇列名稱**application.yaml.**由於ON_SUCCESS是預設的確認模式,因此我們不需要在註解中指定它。
5.3.設定測試類
為了斷言邏輯按預期工作,讓我們建立一個測試類別:
@SpringBootTest
 class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {
 private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);
 @Autowired
 private EventsQueuesProperties eventsQueuesProperties;
 @Autowired
 private ProductIdProperties productIdProperties;
 @Autowired
 private SqsTemplate sqsTemplate;
 @Autowired
 private OrderService orderService;
 @Autowired
 private MessageListenerContainerRegistry registry;
 }
我們也會加入一個名為assertQueueIsEmpty的方法。在其中,我們將使用自動組裝的MessageListenerContainerRegistry來取得容器,然後停止容器以確保它沒有消耗任何訊息。註冊表包含@SqsListener註解所建立的所有容器:
private void assertQueueIsEmpty(String queueName, String containerId) {
 logger.info("Stopping container {}", containerId);
 var container = Objects
 .requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
 container.stop();
 // ...
 }
容器停止後,我們將使用SqsTemplate在佇列中尋找訊息。如果確認成功,則不應傳回任何訊息。我們也將pollTimeout設定為大於可見性逾時的值,這樣,如果訊息尚未刪除,它將在指定的時間間隔內再次傳遞。
這是assertQueueIsEmpty方法的延續:
// ...
 logger.info("Checking for messages in queue {}", queueName);
 var message = sqsTemplate.receive(from -> from.queue(queueName)
 .pollTimeout(Duration.ofSeconds(5)));
 assertThat(message).isEmpty();
 logger.info("No messages found in queue {}", queueName);
5.4.測試
在第一個測試中,我們將向隊列發送一個OrderCreatedEvent ,其中包含數量大於庫存的產品Order 。當異常通過偵聽器方法時,它將向框架發出信號,表示訊息處理失敗,並且應在message visibility時間視窗過去後再次傳遞訊息。
為了加快測試速度,我們在註解中將messageVisibilitySeconds設為 1,但通常此配置是在佇列本身中完成的,預設為 30 秒。
我們將建立事件並使用 Spring Cloud AWS 提供的自動配置的SqsTemplate發送它。然後,我們將使用Awaitility等待訂單狀態變更為PROCESSED ,最後,我們將斷言佇列為空,這表示確認成功:
@Test
 public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
 var orderId = UUID.randomUUID();
 var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
 sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
 Awaitility.await()
 .atMost(Duration.ofMinutes(1))
 .until(() -> orderService.getOrderStatus(orderId)
 .equals(OrderStatus.PROCESSED));
 assertQueueIsEmpty(queueName, "retry-order-processing-container");
 }
請注意,我們將**@SqsListener註解中指定的containerId**傳遞給assertQueueIsEmpty方法。
現在我們可以運行測試了。首先,我們將確保 Docker 正在運行,然後我們將執行測試。在容器初始化日誌之後,我們應該會看到應用程式的日誌訊息:
Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
然後,應該會看到由於缺貨而導致的一個或多個故障:
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10
而且,由於我們添加了補充邏輯,我們最終應該看到訊息處理成功:
Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
最後,我們將確保確認已成功:
INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
 INFO 2699 --- [main] acslAbstractMessageListenerContainer : Container retry-order-processing-container stopped
 INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
 INFO 2699 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue
請注意,測試完成後可能會引發「連線被拒絕」錯誤 - 這是因為 Docker 容器在框架停止輪詢訊息之前停止。我們可以安全地忽略這些錯誤。
6. 手動確認
該框架支援手動確認訊息,這對於我們需要更好地控制確認過程的場景非常有用。
6.1.建立監聽器
為了說明這一點,我們將建立一個非同步場景,其中InventoryService的連線速度較慢,並且我們希望在其完成之前釋放偵聽器執行緒:
@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
 public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
 orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
 CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
 .thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
 .thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
 .thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
 logger.info("Releasing processing thread.");
 }
在此邏輯中,我們使用 Java 的CompletableFuture非同步執行庫存檢查。我們將Acknowledge物件加入到偵聽器方法,並將SqsListenerAcknowledgementMode.MANUAL新增至註解的acknowledgementMode屬性。該屬性是一個String ,接受屬性佔位符和 SpEL。只有當我們將AcknowledgementMode設定為MANUAL時, Acknowledgement物件**才可**用。
請注意,在本例中,我們利用 Spring Boot 自動配置(它提供了合理的預設值)和@SqsListener註解屬性來在確認模式之間進行變更。另一種方法是聲明一個SqsMessageListenerContainerFactory bean,它允許設定更複雜的配置。
6.2.類比慢速連接
現在,讓我們將slowCheckInventory方法加入到InventoryService類別中,使用Thread.sleep模擬慢速連線:
public void slowCheckInventory(UUID productId, int quantity) {
 simulateBusyConnection();
 checkInventory(productId, quantity);
 }
 private void simulateBusyConnection() {
 try {
 Thread.sleep(2000);
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new RuntimeException(e);
 }
 }
6.3.測試
接下來,讓我們來寫測試:
@Test
 public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
 var orderId = UUID.randomUUID();
 var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
 sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
 Awaitility.await()
 .atMost(Duration.ofMinutes(1))
 .until(() -> orderService.getOrderStatus(orderId)
 .equals(OrderStatus.PROCESSED));
 assertQueueIsEmpty(queueName, "async-order-processing-container");
 }
這次,我們請求庫存中的可用數量,因此我們應該不會看到拋出任何錯誤。
執行測試時,我們將看到一條日誌訊息,表示已收到訊息:
INFO 2786 --- [ing-container-1] cbscasalOrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
然後,我們會看到線程釋放訊息:
INFO 2786 --- [ing-container-1] cbscasalOrderProcessingListeners : Releasing processing thread.
這是因為我們正在非同步處理和確認訊息。大約兩秒鐘後,我們應該會看到訊息已被確認的日誌:
INFO 2786 --- [onPool-worker-1] cbscasalOrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
最後,我們將看到停止容器並斷言隊列為空的日誌:
INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
 INFO 2786 --- [main] acslAbstractMessageListenerContainer : Container async-order-processing-container stopped
 INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
 INFO 2786 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue
7. 成功和錯誤的確認
我們將探討的最後一個確認模式是ALWAYS,這會導致框架確認訊息,無論偵聽器方法是否拋出錯誤。
7.1.建立監聽器
讓我們模擬一個銷售活動,在此期間我們的庫存有限,無論發生任何故障,我們都不想重新處理任何訊息。我們將使用我們先前在application.yml中定義的屬性將確認模式設為ALWAYS :
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
 public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
 logger.info("Message received: {}", orderCreatedEvent);
 orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
 inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
 logger.info("Message processed: {}", orderCreatedEvent);
 }
在測試中,我們將建立一個數量大於庫存的訂單:
7.2.測試
@Test
 public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
 var orderId = UUID.randomUUID();
 var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
 sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
 Awaitility.await()
 .atMost(Duration.ofMinutes(1))
 .until(() -> orderService.getOrderStatus(orderId)
 .equals(OrderStatus.RECEIVED));
 assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
 }
現在,即使拋出OutOfStockException ,訊息也會被確認,並且不會嘗試重試該訊息:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
 Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
 INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
 INFO 2835 --- [main] acslAbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
 INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
 INFO 2835 --- [main] asaOrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue
八、結論
在本文中,我們使用事件驅動的場景來展示 Spring Cloud AWS v3 SQS 整合提供的三種確認模式: ON_SUCCESS (預設)、 MANUAL和ALWAYS 。
我們利用自動配置的設定並使用@SqsListener註解屬性在模式之間切換。我們還使用 Testcontainers 和 LocalStack 創建了即時測試來斷言行為。
與往常一樣,本文中使用的完整程式碼可用 在 GitHub 上。與往常一樣,本文中使用的完整程式碼可用 在 GitHub 上。