使用 PostgreSQL 作為消息代理
一、簡介
在本教程中,我們將介紹如何使用 PostgreSQL 的LISTEN/NOTIFY
命令來實現一個簡單的消息代理機制。
2. PostgreSQL 的LISTEN/NOTIFY
機制的快速介紹
簡而言之,這些命令允許連接的客戶端通過常規 PostgreSQL 連接交換消息。客戶端使用NOTIFY
命令向channel
發送通知以及可選的字符串有效負載。
channel
可以是任何有效的 SQL 標識符,它的工作方式類似於傳統消息系統中的主題。這意味著有效負載將被發送到該特定channel
的所有活動偵聽器。當沒有有效負載時,偵聽器只會收到一個空通知。
**要開始接收通知,客戶端使用LISTEN
命令**,該命令將通道名稱作為其單個參數。此命令立即返回,從而允許客戶端使用同一連接繼續執行其他任務。
通知機制有一些重要的屬性:
-
Channel
名稱在數據庫中是唯一的 - 客戶不需要特殊資助即可使用
LISTEN/NOTIFY
- 當在事務中使用
NOTIFY
時,客戶端僅在事務成功完成時才會收到通知
此外,如果在事務中使用相同的有效負載將多個NOTIFY
命令發送到同一通道,則客戶端將收到單個通知。
3. PostgreSQL 作為消息代理的案例
考慮到 PostgreSQL 通知的屬性,我們想知道什麼時候使用它而不是 RabbitMQ 或類似的成熟消息代理才是可行的選擇。像往常一樣,有一些權衡。一般來說,選擇後者意味著:
- 更複雜——消息代理是另一個必須監控、升級等的組件
- 處理分佈式事務帶來的故障模式
通知機制不會遇到這些問題:
- 功能已經到位,假設我們使用 PostgreSQL 作為主數據庫
- 無分佈式事務
當然,也有限制:
- 它是一種專有機制,需要永遠擁抱 PostgreSQL(或者,至少在進行重大重構之前)
- 不直接支持持久訂閱者。在客戶端開始收聽消息之前發送的通知將丟失
即使有這些限制,這種機制也有一些潛在的應用:
- “模塊化單體”式應用程序中的通知總線
- 分佈式緩存失效
- 輕量級消息代理,使用普通數據庫表作為隊列
- 事件溯源架構
4. 在 Spring Boot 應用程序中使用LISTEN/NOTIFY
現在我們對LISTEN/NOTIFY
機制有了基本的了解,讓我們繼續使用它構建一個簡單的 Spring Boot 測試應用程序。我們將創建一個簡單的 API,允許我們提交買/賣訂單。有效負載包括我們願意買賣的工具符號、價格和數量。我們還將添加一個 API,允許我們在給定標識符的情況下查詢訂單。
到目前為止,沒有什麼特別的。但這裡有一個問題:我們希望在將訂單查詢插入數據庫後立即從緩存中開始提供訂單查詢服務。當然,我們可以進行緩存直寫,但在需要擴展服務的分佈式場景中,我們還需要分佈式緩存。
這就是通知機制派上用場的地方:我們將在每次插入時發送一個NOTIFY
,客戶端將使用LISTEN
將訂單預加載到它們各自的本地緩存中。
4.1.項目依賴
我們的示例應用程序需要 WebMVC SpringBoot 應用程序的常規依賴集以及 PostgreSQL 驅動程序:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<version>2.7.12</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
Maven Central 提供了[spring-boot-starter-web](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web)
,[spring-boot-starter-data-jdbc](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-jdbc) ,
和[postgresql](https://mvnrepository.com/artifact/org.postgresql/postgresql)
的最新版本。
4.2.通知服務
由於通知機制是 PostgreSQL 特有的,我們將把它的一般行為封裝在一個類中: NotifierService
。通過這樣做,我們避免了這些細節洩漏到應用程序的其他部分。這也簡化了單元測試,因為我們可以將此服務替換為模擬版本以實現不同的場景。
NotifierService
有兩個職責。首先,它提供了一個外觀來發送與訂單相關的通知:
public class NotifierService {
private static final String ORDERS_CHANNEL = "orders";
private final JdbcTemplate tpl;
@Transactional
public void notifyOrderCreated(Order order) {
tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
}
// ... other methods omitted
}
其次,它有一個用於應用程序用於接收通知的Runnable
實例的工廠方法。該工廠採用PGNotification
對象的Consumer
,它具有檢索與通知關聯的通道和有效負載的方法:
public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
return () -> {
tpl.execute((Connection c) -> {
c.createStatement().execute("LISTEN " + ORDERS_CHANNEL);
PGConnection pgconn = c.unwrap(PGConnection.class);
while(!Thread.currentThread().isInterrupted()) {
PGNotification[] nts = pgconn.getNotifications(10000);
if ( nts == null || nts.length == 0 ) {
continue;
}
for( PGNotification nt : nts) {
consumer.accept(nt);
}
}
return 0;
});
};
}
在這裡,為簡單起見,我們選擇提供原始PGNotification
。在現實世界中,我們通常會處理多個域實體,我們可以使用泛型或類似技術擴展此類,以避免代碼重複。
關於創建的Runnable
的一些注意事項:
- 與數據庫相關的邏輯使用提供的
JdbcTemplate
的execute()
方法。這確保了正確的連接處理/清理並簡化了錯誤處理 - 回調一直運行,直到當前線程被中斷或某些運行時錯誤導致它返回。
注意使用PGConnection
而不是標準的 JDBC Connection
。我們需要它來直接訪問getNotifications()
方法,該方法返回一個或多個排隊的通知。
getNotifications()
有兩個變體。當不帶參數調用時,它會輪詢任何未決通知並返回它們。如果沒有,則返回 null。第二個變體接受一個整數,該整數對應於等待通知直到返回 null 的最長時間。最後,如果我們將 0(零)作為超時值傳遞, getNotifications()
將阻塞直到新通知到達。
在應用程序初始化期間,我們在Configuration class
中使用CommandLineRunner
bean,它Thread actually to start receiving notifications
:
@Configuration
public class ListenerConfiguration {
@Bean
CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
return (args) -> {
Runnable listener = notifier.createNotificationHandler(handler);
Thread t = new Thread(listener, "order-listener");
t.start();
};
}
}
4.3.連接處理
雖然在技術上可行,但使用同一連接處理通知和常規查詢並不方便。必須將對getNotification()
的調用與控制流一起分散,導致代碼難以閱讀和維護。
相反,標準做法是運行一個或多個專用線程來處理通知。每個線程都有自己的連接,會一直保持打開狀態。如果這些連接是由諸如 Hikari 或 DBCP 之類的池創建的,這可能會造成問題。
為了避免這些問題,我們的示例創建了一個專用的DriverDataSource
,然後我們用它來創建NotifierService
所需的JdbcTemplate
:
@Configuration
public class NotifierConfiguration {
@Bean
NotifierService notifier(DataSourceProperties props) {
DriverDataSource ds = new DriverDataSource(
props.determineUrl(),
props.determineDriverClassName(),
new Properties(),
props.determineUsername(),
props.determinePassword());
JdbcTemplate tpl = new JdbcTemplate(ds);
return new NotifierService(tpl);
}
}
請注意,我們共享用於創建 Spring 管理的主DataSource.
但是,我們不會將此專用DataSource
公開為 bean,這會禁用 Spring Boot 的自動配置功能。
4.4.通知處理程序
緩存邏輯的最後一部分是NotificationHandler
類,它實現了Consumer<Notification>
接口。此類的作用是處理單個通知並使用Order
實例填充已配置的Cache
:
@Component
public class NotificationHandler implements Consumer<PGNotification> {
private final OrdersService orders;
@Override
public void accept(PGNotification t) {
Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
// ... log messages omitted
}
}
該實現使用getName()
和getParameter()
從通知中檢索渠道名稱和訂單標識符。在這裡,我們可以假設通知將始終是預期的通知。這不是出於懶惰,而是源於NotifierService
構造將在其上調用此處理程序的Runnable
方式。
實際邏輯很簡單:我們使用OrderRepository
從數據庫中獲取Order
並將其添加到緩存中:
@Service
public class OrdersService {
private final OrdersRepository repo;
// ... other private fields omitted
@Transactional(readOnly = true)
public Optional<Order> findById(Long id) {
Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
if (!o.isEmpty()) {
log.info("findById: cache hit, id={}",id);
return o;
}
log.info("findById: cache miss, id={}",id);
o = repo.findById(id);
if ( o.isEmpty()) {
return o;
}
ordersCache.put(id, o.get());
return o;
}
}
5. 測試
要查看通知機制的運行情況,最好的方法是啟動兩個或多個測試應用程序實例,每個實例都配置為偵聽不同的端口。我們還需要一個兩個實例都將連接到的工作 PostgreSQL 實例。請參考application.properties
文件並使用您的 PostgreSQL 實例連接詳細信息對其進行修改。
接下來,要啟動我們的測試環境,我們將打開兩個 shell 並使用 Maven 來運行應用程序。項目的pom.xml
包含一個額外的配置文件instance1
,它將在不同的端口上啟動應用程序:
# On first shell:
$ mvn spring-boot:run
... many messages (omitted)
[ restartedMain] osbwembedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
[ restartedMain] cbmessaging.postgresql.Application : Started Application in 2.615 seconds (JVM running for 2.944)
[ restartedMain] cbmpconfig.ListenerConfiguration : Starting order listener thread...
[ order-listener] cbmpservice.NotifierService : notificationHandler: sending LISTEN command...
## On second shell
... many messages (omitted)
[ restartedMain] osbwembedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
[ restartedMain] cbmessaging.postgresql.Application : Started Application in 1.984 seconds (JVM running for 2.274)
[ restartedMain] cbmpconfig.ListenerConfiguration : Starting order listener thread...
[ order-listener] cbmpservice.NotifierService : notificationHandler: sending LISTEN command...
一段時間後,我們應該會在每個應用程序上看到一條日誌消息,通知我們應用程序已準備好接收請求。現在,讓我們在另一個 shell 上使用curl
創建我們的第一個Order
:
$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}
在端口 8080 上運行的應用程序實例將打印一些消息。我們還將看到 8081 實例日誌顯示它收到了通知:
[ order-listener] cbmpservice.NotificationHandler : Notification received: pid=5141, name=orders, param=30
[ order-listener] cbmpostgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] cbmpservice.NotificationHandler : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)
這是該機制按預期工作的證據。
最後,我們可以再次使用curl
來查詢在instance1
上創建的訂單:
curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}
正如預期的那樣,我們獲得了Order
詳細信息。此外,應用程序日誌還顯示此信息來自緩存:
[nio-8081-exec-1] cbmpostgresql.service.OrdersService : findById: cache hit, id=30
六,結論
在本文中,我們介紹了 PostgreSQL 的NOTIFY/LISTEN
機制,以及我們如何使用它來實現一個沒有額外組件的輕量級消息代理。
像往常一樣,所有代碼都可以在 GitHub 上找到。