使用 Spring Boot 和 Dapr 實現靈活的發布/訂閱訊息傳遞
1. 簡介
在本文中,我們將了解什麼是 Dapr,它如何與 Spring Boot 集成,以及如何創建不與特定代理耦合的發布/訂閱系統。我們將介紹一個叫車場景,其中用戶請求乘車並且司機訂閱這些請求。最終,我們將實現無需 Dapr CLI 或外部基礎設施即可運行的測試。
2. 使用 Dapr 實現不可知基礎設施
分散式系統常常面臨常見但複雜的挑戰。我們通常使用特定於供應商的庫、基礎設施工具和手動整合工作來解決這些問題。
Dapr (分散式應用程式運行時)提供了一組 API 和建構塊來應對這些挑戰,抽象化了基礎設施,以便我們可以專注於業務邏輯。這些原則適用於其他問題,例如呼叫其他服務(透過服務呼叫 API)、持久狀態(透過狀態管理 API)或檢索機密(透過機密 API)。
這種解耦使得應用程式更易於測試、更易於跨環境移植、並且更能適應基礎設施的變化。在本文中,我們將重點介紹pub/sub API ,以在實踐中說明這些好處。
2.1.使用 Dapr 橋接 Spring Messaging
Spring Boot 具有強烈的主觀整合模型,特別是在訊息傳遞方面。許多開發人員已經熟悉 Spring 抽象,例如KafkaTemplate
、 RabbitTemplate
及其監聽器對應部分。雖然這些簡化了經紀人的集成,但它們仍然與特定技術緊密耦合。
dapr-spring-boot-starter專案不僅僅是另一個 API,還提供了無縫整合。它使用熟悉命名的接口,例如DaprMessagingTemplate
和@Topic
。這些使得利用 Dapr 的分散式訊息功能變得容易,而無需了解底層基礎設施的詳細資訊。
更具體地說,透過包含 Dapr Spring Boot 啟動器,我們不需要包含任何特定的代理依賴項。這使得無需更改任何代碼即可交換代理。無需更改應用程式程式碼,即可在元件層級配置特定於提供者的功能。例如,我們可以包含 特定於 Kafka 的設定來利用消費者群體等本機功能。
2.2.擁有基礎設施彈性,無需鎖定
Dapr 將應用程式程式碼與基礎設施分開。例如,無論我們在後台使用 Kafka、RabbitMQ、Redis Streams 還是 Azure Service Bus,我們的 Spring Boot 應用程式都會透過 HTTP 或 gRPC 與 Dapr sidecar 進行通信,而 Dapr 會處理與實際代理程式的整合。
最重要的是,我們可以在沒有完整基礎設施的情況下進行本地測試,正如我們將使用 Testcontainers 所看到的那樣。 dapr-spring-boot-starter-test模組將 Dapr sidecar 作為測試生命週期的一部分啟動,無需學習Dapr CLI或 Kubernetes。
3.設定 Spring Boot 項目
我們將模擬一個叫車應用程式來演示 Dapr 如何與 Spring Boot 整合。用戶將向我們的 API 端點發送乘車請求,該端點會向訂閱的司機發布訊息。然後司機可以選擇是否接受行程。
讓我們先新增所需的依賴項。我們需要spring-boot-starter-web作為我們的 REST 端點,以及dapr-spring-boot-starter用於 Spring Boot 整合:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter</artifactId>
</dependency>
為了進行測試,我們還將添加dapr-spring-boot-starter-test以支援 Testcontainers,並添加RabbitMQ容器作為我們的訊息代理程式:
<dependency>
<groupId>io.dapr.spring</groupId>
<artifactId>dapr-spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.20.6</version>
<scope>test</scope>
</dependency>
3.1.創建模型
這個 POJO 代表一次乘車請求:
public class RideRequest {
private String passengerId;
private String location;
private String destination;
// default getters and setters
}
它不需要對訊息進行特殊的註釋。
4. 使用DaprMessagingTemplate
實作發佈器
DaprMessagingTemplate
與 Spring 的其他訊息傳遞範本類似,但不需要特定的代理作為依賴項。讓我們先在application.properties
中定義訊息傳遞元件的名稱:
dapr.pubsub.name=ride-hailing
然後,我們將使用DaprPubSubProperties
類別來引用此屬性並使用我們的RideRequest
作為訊息類型。這樣就完成了發送訊息所需的設定:
@Configuration
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprMessagingConfig {
@Bean
public DaprMessagingTemplate<RideRequest> messagingTemplate(
DaprClient client, DaprPubSubProperties config) {
return new DaprMessagingTemplate<>(client, config.getName(), false);
}
}
4.1.使用端點接收訊息
接下來,我們將建立一個控制器來接收乘車請求,並使用 Dapr 模板將其轉發到「ride-requests」主題。我們可以將控制器映射到我們想要的任何路徑:
@RestController
@RequestMapping("/passenger")
public class PassengerRestController {
@Autowired
private DaprMessagingTemplate<RideRequest> messaging;
@PostMapping("/request-ride")
public String requestRide(@RequestBody RideRequest request) {
messaging.send("ride-requests", request);
return "waiting for drivers";
}
}
請注意,我們的訊息正文不需要任何轉換或配置,因為 Dapr 會自動處理它。
5. 建立和配置訂閱者
在我們的範例中,司機充當訂閱者,接收乘車請求並決定是否接受。我們將使用 Dapr 的@Topic
註解來實現這一點,將傳入訊息綁定到控制器方法。
5.1.使用@Topic
實作控制器
使用@Topic
註解時,我們必須同時包含元件和主題名稱。每當Dapr sidecar (由測試容器自動處理)從代理轉發訊息時,它都會呼叫此端點:
@RestController
@RequestMapping("driver")
public class DriverRestController {
// ...
@PostMapping("ride-request")
@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
// ...
}
}
請注意,有效負載包裝在 Dapr 自動建立的CloudEvent
物件中。這對於基於CloudEvent
元資料的路由或過濾等進階場景很有幫助,但對於基本的發布/訂閱來說不是必要的。
5.2.配置訂閱者行為
我們的訂閱者代表接受或拒絕乘車的司機。為了說明,我們將使用簡單的模式邏輯來確定騎乘是否可以接受。讓我們將其添加到我們的application.properties
中,以便我們可以在啟動應用程式時輕鬆更改其值:
driver.acceptance.criteria=East Side
接下來,我們將這個值注入到控制器中的一個變數中,同時注入用於計數接受/拒絕的驅動器的變數:
int drivesAccepted;
int drivesRejected;
@Value("${driver.acceptance.criteria}")
String criteria;
public int getDrivesAccepted() {
return drivesAccepted;
}
public int getDrivesRejected() {
return drivesRejected;
}
我們將在編寫測試時使用這些來檢查控制器的行為。
5.3.處理CloudEvent
最後,我們將從CloudEvent
中檢索有效負載並決定磁碟機是否可以接受:
@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
RideRequest request = event.getData();
if (request.getDestination().contains(criteria)) {
drivesAccepted++;
} else {
drivesRejected++;
throw new UnsupportedOperationException("drive rejected");
}
}
由於我們不能直接拒絕訊息,因此我們拋出異常來觸發訊息的重新排隊。對於 RabbitMQ,這需要requeueInFailure
配置,我們將在建立測試容器時設定該配置。
6. 使用 Testcontainers 測試發布者
為了驗證我們的發布者是否正確發送訊息,我們將使用 Testcontainers 編寫整合測試。這使我們能夠啟動 Dapr sidecar 和 RabbitMQ 實例,而無需依賴外部工具或 Dapr CLI。
6.1.設定測試配置
對於我們的測試屬性,除了驗收標準之外,我們還將包括訊息傳遞元件名稱和 Dapr 容器的專用伺服器連接埠。
此外,我們需要選擇一個固定端口,以便我們的元件可以在同一個網路中相互定位:
driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60601
我們將透過設定伺服器連接埠號碼和指定元件之間共享的網路來開始設定。我們還將包含DaprPubSubProperties
以便稍後取得我們的 RabbitMQ 元件的名稱:
@TestConfiguration(proxyBeanMethods = false)
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprTestContainersConfig {
@Value("${server.port}")
private int serverPort;
@Bean
public Network daprNetwork() {
return Network.newNetwork();
}
// ...
}
6.2.配置容器
讓我們建立暴露預設連接埠 5672 的 RabbitMQ 容器:
@Bean
public RabbitMQContainer rabbitMQContainer(Network daprNetwork) {
return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"))
.withExposedPorts(5672)
.withNetworkAliases("rabbitmq")
.withNetwork(daprNetwork);
}
最後,我們將添加一個 Dapr 容器來包裝所有內容,並使用@ServiceConnection
註解來簡化配置:
@Bean
@ServiceConnection
public DaprContainer daprContainer(
Network daprNetwork, RabbitMQContainer rabbitMQ, DaprPubSubProperties pubSub) {
Map<String, String> rabbitMqConfig = new HashMap<>();
rabbitMqConfig.put("connectionString", "amqp://guest:guest@rabbitmq:5672");
rabbitMqConfig.put("user", "guest");
rabbitMqConfig.put("password", "guest");
rabbitMqConfig.put("requeueInFailure", "true");
return new DaprContainer("daprio/daprd:1.14.4")
.withAppName("dapr-pubsub")
.withNetwork(daprNetwork)
.withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
.withAppPort(serverPort)
.withAppChannelAddress("host.testcontainers.internal")
.dependsOn(rabbitMQ);
}
除了樣板之外,關鍵配置還包括:
-
requeueInFailure
:我們將啟用此選項,因為我們無法直接NACK訊息。當我們有多個訂閱者實例時,這允許其他客戶端接收被其他客戶端拒絕的訊息。 -
withComponent(…”pubsub.rabbitmq”)
:我們將使用 RabbitMQ 實現,因此我們在此處指定它。 Dapr 支援許多訊息代理,包括雲端供應商管理的服務,如Google PubSub 、 Amazon SQS/SNS和Azure Event Hub 。 -
withAppChannelAddress
:我們將包括此功能以啟動主機對容器的存取。如果沒有它,測試可能會在等待 Dapr 回應時掛起。
我們還可以使用日誌配置啟動 Dapr 容器,使其更易於調試。為此,我們設定了withDaprLogLevel
和withLogConsumer
選項:
.withDaprLogLevel(DaprLogLevel.INFO)
.withLogConsumer(outputFrame -> logger.info(outputFrame.getUtf8String()))
6.3.創建測試應用程式
現在我們準備在測試包中建立測試應用程式:
@SpringBootApplication
public class DaprPublisherTestApp {
public static void main(String[] args) {
SpringApplication.from(DaprPublisherApp::main)
.with(DaprTestContainersConfig.class)
.run(args);
}
}
我們將引用我們的主要應用程式類別以避免重複任何配置,例如DaprMessagingConfig
類別。我們還需要將DriverRestController
複製到測試資料夾以進行整合測試。
6.4.創建整合測試
我們需要引用我們的測試應用程式、設定和DaprAutoConfiguration
類別。然後,注入我們的控制器來檢查我們的控制變量,並注入 Dapr 容器以了解我們的應用程式何時準備好接收訊息:
@SpringBootTest(
classes = {
DaprPublisherTestApp.class,
DaprTestContainersConfig.class,
DaprAutoConfiguration.class },
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprPublisherIntegrationTest {
@Autowired
DriverRestController controller;
@Autowired
DaprContainer daprContainer;
@Value("${server.port}")
int serverPort;
@Value("${driver.acceptance.criteria}")
String criteria;
// ...
}
由於我們需要驗證我們的容器是否已正確啟動,我們可以等待「應用程式已訂閱以下主題」訊息。這有助於確保我們的測試僅在我們的容器準備好接受訊息時才開始。我們也會定義 API 的基本 URI,以便使用RestAssured
進行呼叫:
@BeforeEach
void setUp() {
RestAssured.baseURI = "http://localhost:" + serverPort;
org.testcontainers.Testcontainers.exposeHostPorts(serverPort);
Wait.forLogMessage(".*app is subscribed to the following topics.*", 1)
.waitUntilReady(daprContainer);
}
我們的第一個測試涉及發布符合驅動程式接受標準的驅動請求並檢查已接受的磁碟機數量。當這個數字增加時,我們可以斷言訂閱者已處理該訊息:
@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
int drivesAccepted = controller.getDrivesAccepted();
given()
.contentType(ContentType.JSON)
.body("""
{
"passengerId": "1",
"location": "Point A",
"destination": "%s Point B"
}
""".formatted(criteria))
.when()
.post("/passenger/request-ride")
.then()
.statusCode(200);
await()
.atMost(Duration.ofSeconds(5))
.until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}
相反,我們的第二個測試涉及發布我們的驅動程式應該拒絕的驅動請求:
@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
int drivesRejected = controller.getDrivesRejected();
given().contentType(ContentType.JSON)
.body("""
{
"passengerId": "2",
"location": "Point B",
"destination": "West Side A"
}
""")
.when()
.post("/passenger/request-ride")
.then()
.statusCode(200);
await()
.atMost(Duration.ofSeconds(5))
.until(controller::getDrivesRejected, greaterThan(drivesRejected));
}
這次,我們測試被拒絕的驅動器數量是否增加了。此外,由於訊息在發生錯誤時會重新排隊,我們會驗證變數是否大於其初始值,因為我們無法確定它已被處理了多少次。
7. 使用 Testcontainers 測試訂閱伺服器
現在讓我們測試一下我們的訂閱者行為。我們將創建一個類似於發布者的設置,重點驗證訂閱者如何處理傳入訊息。
7.1.設定環境
首先,我們將包含類似的測試屬性,僅更改伺服器連接埠:
driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60602
我們將把DaprMessagingConfig
類別複製到我們的測試包中,以便我們可以在整合測試中使用它。我們還需要將DaprTestContainersConfig
複製到我們的測試資料夾,因為我們需要相同的容器。
7.2.建立整合測試
與我們先前的整合測試一樣,我們需要連接容器、控制器、伺服器連接埠、驅動程式驗收標準,並等待容器在@Setup
期間準備就緒。我們還需要包含 Dapr 訊息範本來向我們的訂閱者發送訊息:
@SpringBootTest(
classes = {
DaprSubscriberTestApp.class,
DaprTestContainersConfig.class,
DaprMessagingConfig.class,
DaprAutoConfiguration.class },
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprSubscriberIntegrationTest {
@Autowired
DaprMessagingTemplate<RideRequest> messaging;
@Autowired
DriverRestController controller;
@Autowired
DaprContainer daprContainer;
@Value("${server.port}")
int serverPort;
@Value("${driver.acceptance.criteria}")
String criteria;
// test setup...
}
7.3.實施測試場景
我們的第一個測試涉及發送可接受的驅動器並檢查我們的控制器是否正確接收它:
@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
int drivesAccepted = controller.getDrivesAccepted();
RideRequest ride = new RideRequest(
"1", "Point A", String.format("%s Point B", criteria));
messaging.send("ride-requests", ride);
await().atMost(Duration.ofSeconds(5))
.until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}
我們的第二項測試包括發送不可接受的驅動器並檢查我們的控制器是否正確拒絕它:
@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
int drivesRejected = controller.getDrivesRejected();
RideRequest request = new RideRequest("2", "Point B", "West Side Point A");
messaging.send("ride-requests", request);
await().atMost(Duration.ofSeconds(5))
.until(controller::getDrivesRejected, greaterThan(drivesRejected));
}
透過我們的訂閱者測試,我們已經驗證了 Dapr 正確地將訊息從代理路由到我們的 Spring Boot 應用程序,並且訂閱者的行為按預期工作。
8. 結論
在本文中,我們使用 Spring Boot 和 Dapr 建立了一個鬆散耦合的發布/訂閱訊息系統。透過利用 Dapr 對訊息代理的抽象及其 Spring Boot 集成,我們簡化了訊息傳遞邏輯,而無需依賴特定的基礎設施。我們也示範如何使用 Testcontainers 在本地運行和測試整個設置,從而實現開發過程中的快速回饋循環。
與往常一樣,原始碼可在 GitHub 上取得。