Spring Cloud AWS 3.0簡介-SQS集成
1. 概述
Spring Cloud AWS是一個旨在簡化與 AWS 服務互動的專案。 Simple Queue Service (SQS) 是一種 AWS 解決方案,用於以可擴展的方式傳送和接收非同步訊息。
在本教程中,我們將重新引入Spring Cloud AWS SQS 集成,該集成已針對 Spring Cloud AWS 3.0 完全重寫。
該框架提供了熟悉的 Spring 抽象來處理 SQS 佇列,例如SqsTemplate
和@SqsListener
註解。
我們將透過發送和接收訊息的範例來示範事件驅動的場景,並展示使用 Testcontainers(用於管理一次性 Docker 容器的工具)和LocalStack (在本地模擬類似 AWS 的環境進行測試)設定整合測試的策略我們的邏輯。
2. 依賴關係
Spring Cloud AWS 物料清單 (BOM)確保項目之間的版本相容。它聲明了許多依賴項的版本,包括 Spring Boot,並且應該用來代替 Spring Boot 自己的 BOM 。
以下是將其匯入到我們的pom.xml
檔案中的方法:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
我們需要的主要依賴項是SQS Starter ,它包含該專案的所有 SQS 相關類別。 SQS 整合不依賴 Spring Boot,可以在任何標準 Java 應用程式中獨立使用:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
對於 Spring Boot 應用程式(例如我們在本教程中建立的應用程式),我們應該新增專案的Core Starter ,因為它允許我們利用 Spring Boot 的 SQS 自動配置和 AWS 配置(例如憑證和區域):
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
3.建置本地測試環境
在本節中,我們將逐步使用 Testcontainers 設定 LocalStack 環境,以在本機環境中測試我們的程式碼。請注意,本教學中的範例**也可以直接針對 AWS 執行**。
3.1.依賴關係
為了使用 JUnit 5 運行LocalStack 和 TestContainers ,我們需要兩個額外的依賴:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
我們還包括等待庫來幫助我們斷言非同步訊息消耗:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.2.配置
現在,我們將建立一個具有管理容器邏輯的類,該類別可以由我們的測試套件繼承。我們將其命名為BaseSqsIntegrationTests
。對於擴展此類別的每個測試套件,Testcontainers 將建立並啟動一個新容器,這對於將每個套件的資料相互隔離至關重要。
@SpringBootTest
註解對於初始化 Spring Context 是必需的,@ @Testcontainers
註解將 Testcontainers 註解與 JUnit 的執行時間相關聯,以便容器在測試套件運行時啟動,並在測試完成後停止:
@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
// Our test configuration will be added here
}
現在讓我們聲明LocalStackContainer
。 @Container
註解對於框架自動管理容器的生命週期也是必要的:
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
最後,我們將 Spring Cloud AWS 框架用於自動配置的屬性與 LocalStack 綁定。我們將在運行時獲取容器端口和主機,因為 Testcontainers 將為我們提供一個隨機端口,這對於並行測試非常有用。我們可以使用@DynamicPropertySource
註解來實現:
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
// ...other AWS services endpoints can be added here
}
這就是我們使用 LocalStack、Testcontainers 和 Spring Cloud AWS 實現 Spring Boot 測試所需的一切。在執行測試之前,我們還需要確保 Docker 引擎在本機環境中運作。
4. 設定隊列名稱
我們可以利用 Spring Boot 的application.yml
屬性機制來設定佇列名稱。
在本教程中,我們將建立三個隊列:
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
讓我們建立一個 POJO 來表示這些屬性:
@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {
private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;
// getters and setters
}
最後,我們需要在@Configuration
註解類別或主 Spring Application 類別中使用@EnableConfigurationProperties
註釋,讓 Spring Boot 知道我們要使用application.yml
屬性填入它:
@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}
}
現在,我們準備在需要佇列名稱時注入值本身或 POJO。
預設情況下,如果沒有找到佇列,Spring Cloud AWS SQS 會建立佇列,這有助於我們快速設定開發環境。在生產中,應用程式不應該有建立佇列的權限,因此如果找不到佇列,應用程式將無法啟動。如果找不到佇列,框架還可以配置為明確失敗。
5. 發送和接收訊息
使用 Spring Cloud AWS 可以透過多種方式向 SQS 發送訊息和從 SQS 接收訊息。在這裡,我們將介紹最常見的,使用SqsTemplate
發送訊息和@SqsListener
註解接收訊息。
5.1.設想
在我們的場景中,我們將模擬一個事件驅動的應用程序,該應用程式透過將相關資訊保存在其本地存儲庫中來響應UserCreatedEvent
。
讓我們建立一個User
實體:
public record User(String id, String name, String email) {
}
讓我們建立一個簡單的記憶體中UserRepository
:
@Repository
public class UserRepository {
private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();
public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}
public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}
public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}
}
最後,讓我們建立一個UserCreatedEvent
Java Record 類別:
public record UserCreatedEvent(String id, String username, String email) {
}
5.2.設定
為了測試我們的場景,我們將建立一個SpringCloudAwsSQSLiveTest
類,該類別擴展我們先前建立的BaseSqsIntegrationTest
檔案。我們將自動組裝三個依賴項:由框架自動配置的SqsTemplate
、 UserRepository
(以便我們可以斷言訊息處理正常)以及帶有佇列名稱的EventQueuesProperties
POJO:
public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private EventQueuesProperties eventQueuesProperties;
// ...
}
為了包含我們的偵聽器,讓我們建立一個UserEventListeners
類別並將其聲明為 Spring @Component
:
@Component
public class UserEventListeners {
private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);
public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";
private final UserRepository userRepository;
public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}
// Our listeners will be added here
}
5.3. String
有效負載
在第一個範例中,我們將發送一條帶有String
有效負載的訊息,在偵聽器中接收它,並將其保存到我們的儲存庫中。然後,我們將輪詢儲存庫以確保我們的應用程式正確保存資料。
首先,讓我們在測試類別中建立一個用於發送訊息的測試:
@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);
// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}
我們應該會看到類似以下內容的日誌:
INFO [ main] cbscasqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert
然後,請注意,測試失敗,因為我們還沒有該佇列的偵聽器。
讓我們設定偵聽器以在偵聽器類別中使用此佇列中的消息並使測試通過:
@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}
現在,當我們運行測試時,我們應該在日誌中看到結果:
INFO [ntContainer#0-1] cbscloud.aws.sqs.UserEventListeners : Received message: Albert
測試通過了。
請注意,我們使用 Spring 的屬性解析功能從我們先前建立的application.yml
中取得佇列名稱。
5.4. POJO 和記錄有效負載
現在我們已經發送和接收了String
有效負載,讓我們使用 Java Record(我們之前建立的UserCreatedEvent
設定一個場景。
首先,讓我們來寫一個失敗的測試:
@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));
// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
在測試失敗之前我們應該會看到類似這樣的日誌:
INFO [ main] cbscasqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, [email protected]]
現在,讓我們建立對應的監聽器以使測試通過:
@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}
我們將看到輸出表明已收到訊息並且測試通過:
INFO [ntContainer#1-1] cbscloud.aws.sqs.UserEventListeners : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, [email protected]]
該框架將自動配置 Spring 上下文中可用的任何ObjectMapper
bean 來處理訊息的序列化和反序列化。我們可以配置自己的ObjectMapper
並透過多種方式自訂序列化,但這超出了本教學的範圍。
5.5. Spring 訊息和標題
在最後一個場景中,我們將發送帶有自訂標頭的記錄,並以 Spring Message
實例的形式接收訊息,以及我們添加的自訂標頭和方法簽名中的標準 SQS 標頭。此框架會自動將所有 SQS 訊息屬性轉換為訊息標頭,包括使用者提供的任何屬性。
讓我們先建立失敗的測試:
@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));
// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
測試失敗前應產生與此類似的日誌:
INFO [ main] cbscasqs.SpringCloudAwsSQSLiveTest : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]] and custom headers: {eventType=UserCreatedEvent}
現在,我們添加相應的監聽器以使測試通過:
@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}
當我們重新運行測試時,我們將看到輸出,表示成功:
INFO [ntContainer#2-1] cbscloud.aws.sqs.UserEventListeners : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]], headers=...
在此範例中,我們收到一條Message
,其中包含反序列化的UserCreatedEvent
記錄作為有效負載和兩個標頭。為了確保整個專案的一致性,我們應該使用SqsHeader
類別常數來檢索 SQS 標準標頭。
六,結論
在本文中,我們使用事件驅動場景來了解使用 Spring Cloud AWS SQS 3.0 發送和接收訊息的不同範例。
我們使用 LocalStack 和 TestContainers 設定本地環境,並配置框架以使用正確的本地配置進行整合測試。
與往常一樣,本教程中使用的完整程式碼可以 在 GitHub 上找到。