使用 Spring Integration 接收 PostreSQL 推送通知
一、簡介
在本教程中,我們將展示如何在基於 Spring Integration 的應用程序中使用 PostgreSQL 的NOTIFY/LISTEN
功能。
2. 快速回顧
PostgreSQL 提供了一種輕量級消息通知機制,允許客戶端使用常規數據庫連接相互發送通知。該機制使用兩個非標準 SQL 語句NOTIFY
和LISTEN
,因此得名。
我們已經在之前的教程中更詳細地介紹了此機制,因此我們將假設您具備如何使用它的基本知識。在這裡,我們將介紹一個更具體的用例:如何使用此機制來實現SubscribableChannel
。
3. 依賴關係
對於本教程,我們只需要核心 Spring 集成庫和 PostgreSQL JDBC 驅動程序:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.1.13.RELEASE</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.8</version>
</dependency>
spring-integration-core
和postgresql
的最新版本可在 Maven Central 上找到。
4. 什麼是SubscribableChannel
?
Spring Integration 的SubscribableChannel
接口是MessageChannel
擴展,支持向訂閱者異步發送消息。它向其父級添加了兩個附加方法:
-
subscribe(MessageHandler handler)
-
unsubscribe(MessageHandler handler)
這些方法允許客戶端註冊/註銷MessageHandler
實例來處理接收到的消息。
儘管它與反應式消息調度有相似之處,但有一個根本的區別:這裡我們有一個基於推送的模型,而在反應式世界中是基於拉動的。這意味著沒有隱式流量控制,並且由消費者實施任何緩衝/丟棄策略來處理過多的流量。
Spring Integration 開箱即用地附帶了此接口的簡單實現PublishSubscribeChannel
,但此實現僅適用於同一個 VM 實例。使用NOTIFY/LISTEN
機制,我們將不再有這個限制。
5. SubscribableChannel
實現
我們的實現將基於 Spring 集成核心中一個現成的基類: AbstractSubscribableChannel
。儘管並非絕對必要,但選擇這種方法有一些好處:
- 管理:對於生產很重要,公開關鍵指標以監控系統的運行狀況,並幫助解決性能問題
- 攔截器:允許客戶端代碼添加
ChannelInterceptors
,可以在處理消息之前/之後檢查消息
實現本身由兩個主要部分組成:消息傳遞和調度。
5.1.消息傳遞
此功能對應於通道的生產者端。消息生產者通常會使用MessageChannel
接口上可用的標準send()
方法,或者使用通過用戶友好的接口包裝通道的MesssageGateway
。
由於我們正在利用AbstractSubscribableChannel
,而 AbstractSubscribableChannel 又擴展了AbstractMessageChannel
,因此我們所要做的就是實現doSend()
方法。在這裡,我們將使用NOTIFY
將消息發送到 PostgreSQL,然後 PostgreSQL 會將其傳遞給已為同一通道發出LISTEN
命令的任何客戶端。
@Override
protected boolean doSend(Message<?> message, long timeout) {
try {
String msg = prepareNotifyPayload(message);
try (Connection c = ds.getConnection()) {
c.createStatement().execute("NOTIFY " + channelName + ", '" + msg + "'");
}
return true;
} catch (Exception ex) {
throw new MessageDeliveryException(message, "Unable to deliver message: " + ex.getMessage(), ex);
}
}
我們使用通過構造函數傳遞的DataSource
和通道名稱來分別獲取數據庫連接和通知的通道名稱。
prepareNotifyPayload()
方法將傳入的Message
對象轉換為適合用作通知負載的 JSON 字符串:
protected String prepareNotifyPayload(Message<?> message) throws JsonProcessingException {
Map<String, Object> rawMap = new HashMap<>();
rawMap.putAll(message.getHeaders());
JsonNode headerData = om.valueToTree(rawMap);
JsonNode bodyData = om.valueToTree(message.getPayload());
ObjectNode msg = om.getNodeFactory().objectNode();
msg.set(HEADER_FIELD, headerData);
msg.set(BODY_FIELD, bodyData);
return om.writeValueAsString(msg);
}
這種方法有一個重要的限制:默認情況下,PostgreSQL 將通知有效負載的大小限制為大約 8 KB。一般來說,這對於僅發出事件信號的消息來說已經足夠了,但在消息可能攜帶文件的完整內容的集成場景中顯然是不夠的。
在這些情況下,更好的方法是將“大”數據部分存儲在某些共享存儲(例如數據庫表或 S3 存儲桶)中,並在消息中發送對其的引用。
5.2.消息發送
此代碼負責接收來自 PostgreSQL 的異步通知並將其分派給訂閱者。由於在沒有訂閱者時監聽通知是沒有意義的,因此只有當有人調用subscribe()
時,該實現才會啟動負責此任務的後台線程:
@Override
public boolean subscribe(MessageHandler handler) {
boolean r = super.subscribe(handler);
if (r && super.getSubscriberCount() == 1) {
startListenerThread();
}
return r;
}
同樣,當沒有更多訂閱者時,我們將停止監聽器:
@Override
public boolean unsubscribe(MessageHandler handle) {
boolean r = super.unsubscribe(handle);
if (r && super.getSubscriberCount() == 0) {
stopListenerThread();
}
return r;
}
後台偵聽器線程將發出初始 LISTEN 語句,然後循環接收到的通知:
@Override
public void run() {
startLatch.countDown();
try (Statement st = conn.createStatement()) {
st.execute("LISTEN " + channelName);
PGConnection pgConn = conn.unwrap(PGConnection.class);
while (!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgConn.getNotifications();
for (PGNotification n : nts) {
Message<?> msg = convertNotification(n);
getDispatcher().dispatch(msg);
}
}
} catch (SQLException sex) {
// ... exception handling omitted
} finally {
stopLatch.countDown();
}
}
對於每個收到的通知,我們首先將其轉換為Message,
然後將實際傳遞委託給配置的調度程序。它負責處理與此任務相關的所有事務,例如調用攔截器、更新指標等。
6. 集成示例
現在我們已經實現了,讓我們用一個簡單的集成場景來測試它。我們將使用此通道來傳遞BUY/SELL
Order
消息,並且在接收端,我們將有一個訂閱者來接收這些訂單並保持每個交易品種的交易餘額。
首先,讓我們為SubscribableChannel
創建一個@Bean
:
@Bean
static SubscribableChannel orders(@Value("${db.url}") String url, @Value("${db.username}") String username,
@Value("${db.password}") String password) {
SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true);
Supplier<Connection> connectionSupplier = () -> {
try {
return ds.getConnection();
}
catch(SQLException ex) {
throw new RuntimeException(ex);
}
};
PGSimpleDataSource pgds = new PGSimpleDataSource();
pgds.setUrl(url);
pgds.setUser(username);
pgds.setPassword(password);
return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper());
}
請注意,我們使用提供的數據庫URL
和憑據來創建兩個DataSource
對象。第一個是SingleConnectionDataSource
,我們作為所需連接提供者的源,用於接收通知。第二個數據源用於發送通知並使用 PostgreSQL 本機實現。
其次,我們創建一個@ServiceActivator
方法來接收訂單:
@ServiceActivator(inputChannel = "orderProcessor")
void processOrder(Order order){
BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice());
if (order.getOrderType() == OrderType.SELL) {
orderTotal = orderTotal.negate();
}
BigDecimal sum = orderSummary.get(order.getSymbol());
if (sum == null) {
sum = orderTotal;
} else {
sum = sum.add(orderTotal);
}
orderSummary.put(order.getSymbol(), sum);
orderSemaphore.release();
}
我們使用信號量來跟踪收到的消息。它的主要目的是使測試變得更容易,因為我們可以使用它來同步運行測試的主線程與在後台接收和處理的消息。
最後,我們還需要一個@Transformer
將接收到的消息負載(即JsonNode
實例)轉換為Order
對象:
@Transformer(inputChannel = "orders", outputChannel = "orderProcessor" )
Order validatedOrders(Message<?> orderMessage) throws JsonProcessingException {
ObjectNode on = (ObjectNode)orderMessage.getPayload();
Order order = om.treeToValue(on, Order.class);
return order;
}
請注意,轉換後的消息將進入orderProcessor
通道,該通道將由 Spring Integration 自動創建。除非我們用這個名稱明確定義一個通道,否則實際的通道將是一個DirectChannel
,它只是將生產者和消費者聯繫在一起。
或者,我們可以將此通道定義為QueueChannel
或類似通道。這將提供一個緩衝區來存儲消息,從而使我們的系統能夠應對任何臨時的消息激增。
7. 測試
最後,讓我們編寫一個集成測試來查看所有這些如何協同工作:
@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class})
public class PostgresqlPubSubExampleLiveTest {
@Autowired
PostgresqlPubSubExample processor;
@Autowired
OrdersGateway ordersGateway;
@Test
void whenPublishOrder_thenSuccess() throws Exception {
Order o = new Order(1l,"BAEL", OrderType.BUY, BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
ordersGateway.publish(o);
assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue();
BigDecimal total = processor.getTotalBySymbol("BAEL");
assertThat(total).isEqualTo(BigDecimal.valueOf(10));
}
}
我們利用 Spring 的測試工具來實例化所有必需的 bean,這樣我們就可以使用包裝我們的通道的MessageGateway
發送訂單。一旦我們發送消息,測試就會在查詢總訂單值之前使用awaitNextMessage()
輔助方法。
請注意,由於這是一個集成測試,我們必須有一個正在運行的 PostgreSQL 實例可用以及使用它所需的憑據。
8. Spring Integration 6 用戶注意事項
從版本 6 開始,Spring Integration 附帶了實現SubscribableChannel
PostgresSubscribableChannel
類。然而,該版本需要 Spring 6,因此意味著使用 Java 17 作為開發應用程序的基準。
這個新實現對有效負載大小沒有與本教程中的代碼相同的限制,但需要在數據庫上創建一個表來存儲它。然而,由於 Java 8 和 11 仍然佔現有應用程序的很大一部分,因此這裡描述的技術仍然適用一段時間。
9. 結論
在本教程中,我們展示瞭如何利用 PostgreSQL 上可用的NOTIFY/LISTEN
機制在 Spring Integration 應用程序中實現異步消息傳遞。
與往常一樣,完整的代碼可以在 GitHub 上找到。