使用 PostgreSQL 在 Java 中實現事件驅動的 LISTEN/NOTIFY 支持
1. 簡介
在本文中,我們將了解 PostgreSQL 中的[LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)
和[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html)
指令。我們將了解它們是什麼、如何使用它們以及如何在應用程式中使用它們。
2.什麼是LISTEN
和NOTIFY
?
PostgreSQL 支援使用LISTEN
和NOTIFY
指令在伺服器和連線的用戶端之間進行非同步通訊。這些 PostgreSQL 特有的擴充功能使我們能夠將資料庫用作一個簡單的訊息系統,從而允許我們從資料庫中產生客戶端可以回應的事件。這可用於許多用途,例如即時儀表板、快取失效、資料審計等等。
2.1. 監聽通知
我們使用LISTEN
指令來註冊接收事件的興趣。指令會取得我們要監聽的頻道名稱:
postgres=# LISTEN my_channel;
LISTEN
一旦我們完成此操作,我們的連接就可以接收此通道上發生的事件的非同步通知。
每個對這些通知感興趣的連接都會收到它們,因此系統實際上是廣播訊息,而不是將它們發送給單一接收者。這意味著我們可以使用此機制輕鬆地將資料庫中正在發生的事件告知每個客戶端。
請注意,如果我們使用psql
,則不會自動接收通知。相反,我們需要再次執行LISTEN
命令,這樣我們就會顯示自上次執行以來發出的所有通知:
postgres=# LISTEN my_channel;
LISTEN
.....
postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, World!" received from server process with PID 66.
在這裡,我們可以看到某個連結引發了帶有有效載荷「Hello, world!」的事件,而我們的監聽連線已經收到了通知。
雖然我們可以註冊的監聽器數量沒有上限,但每個監聽器必須保持其資料庫連線開啟才能接收通知,因此最大連線數限制實際上起到了限制的作用。此外,每個監聽器都會使用一定程度的資源,因此過多的監聽器可能會導致效能問題。
2.2. 發出通知
現在我們知道如何監聽事件,接下來我們需要能夠觸發這些事件。我們可以使用NOTIFY
指令來觸發事件。該命令需要傳入頻道名稱和要傳送的訊息:
postgres=# NOTIFY my_channel, 'Hello, World!';
NOTIFY
當執行此命令時,所有先前執行過對應LISTEN
命令的連線都可以接收此事件,正如我們之前所看到的。
我們的有效載荷是可選的,但如果我們沒有提供或提供NULL
,系統將視為提供了空字串。它的最大大小為 8,000 位元組。如果我們嘗試傳送超過該值的內容,則會收到錯誤,並且不會通知任何監聽器。
通知會參與事務。這意味著,如果我們在活動事務期間發出通知,系統將不會發送通知,直到事務提交為止。這也意味著,如果事務回滾,系統將根本不會發送通知。
2.3. 動態消息
NOTIFY
指令要求發送的訊息必須精確指定。我們無法動態產生該訊息,包括簡單的字串連線:
postgres=# NOTIFY my_channel, 'Hello, ' || 'World';
ERROR: syntax error at or near "||"
LINE 1: NOTIFY my_channel, 'Hello, ' || 'World!';
但是,我們可以使用pg_notify
函數來產生通知。它可以接收任何形式的訊息:
postgres=# SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
pg_notify
-----------
(1 row)
在這種情況下,頻道名稱必須以字串形式提供,有效負載也必須以單獨的字串形式提供。我們可以根據需要以任何方式建構這些字串,包括使用 SQL 語句的結果。
2.4. 從觸發器引發事件
我們可以透過執行適當的語句自行觸發事件,也可以讓資料庫自動觸發事件。例如,我們可以註冊在適當時間執行的觸發器函數,這些函數也可以產生以下通知:
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('table_change', TG_TABLE_NAME);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER table_change
AFTER INSERT OR UPDATE OR DELETE ON table_name
FOR EACH ROW EXECUTE PROCEDURE notify_table_change();
完成此操作後,如果在table_name
表中建立、更新或刪除行,此觸發器將自動在table_change
通道上發送帶有更改的表名稱的通知。
3. 使用 JDBC 發出通知
我們可以從 JDBC 發出通知,就像我們已經看到的一樣。
首先,我們需要連接資料庫。目前我們可以使用官方驅動程式。讓我們將它們添加到我們的構建中:
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.6</version>
</dependency>
然後我們可以正常建立連線:
Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "mysecretpassword");
一旦連線成功,我們就可以使用NOTIFY
指令或pg_notify()
函數發出通知:
try (Statement statement = connection.createStatement()) {
statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
}
正如我們之前看到的,如果我們想要做的不僅僅是一個空字串,例如使用綁定參數,那麼我們需要使用pg_notify
:
try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
statement.setString(1, "my_channel");
statement.setString(2, "Hello, pg_notify!");
statement.execute();
}
兩種方法的工作方式相同,系統會按照預期傳遞通知:
postgres=# postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, NOTIFY!" received from server process with PID 390.
Asynchronous notification "my_channel" with payload "Hello, pg_notify!" received from server process with PID 390.
在這裡,我們可以看到我們的監聽會話成功接收了 Java 程式碼發出的兩個通知。
4. 使用官方 JDBC 驅動程式
雖然使用 JDBC 發出通知很簡單,但監聽通知卻比較複雜。從資料庫接收非同步訊息並不是 JDBC 規範的正式組成部分,因此我們需要藉助驅動程式特定的功能。
我們需要做的第一件事是執行LISTEN
語句:
try (Statement statement = connection.createStatement()) {
statement.execute("LISTEN my_channel");
}
但是,為了接收通知本身,我們需要在原始PGConnection
物件上使用getNotifications()
方法。這意味著我們首先需要確保獲取到正確的連接類型:
PGConnection pgConnection = connection.unwrap(org.postgresql.PGConnection.class);
然後,我們呼叫getNotifications()
來取得已收到的所有通知。我們需要循環執行此操作,並以適當的頻率輪詢資料庫:
while (!Thread.currentThread().isInterrupted()) {
PGNotification[] notifications = pgConnection.getNotifications(1000);
if (notifications != null) {
// React to notifications
}
}
收到通知後,我們可以按照自己喜歡的方式做出反應。但是,直到下次呼叫getNotifications()
時,我們才會收到更多通知,因此,如果我們需要有效地對更多通知做出反應,請記住不要停止此循環。
我們有三種不同的getNotifications()
呼叫方式。最簡單的方法是不帶參數,這樣它會立即傳回所有未完成的通知。然而,這不是推薦的解決方案。還有一個版本,它會設定一個超時時間(以毫秒為單位),執行緒會阻塞該時間:
PGNotification[] notifications = pgConnection.getNotifications(100);
在這種情況下,通話將在逾時或任何通知可用後返回(以先發生者為準)。
如果我們在呼叫此版本時將超時值設為0
,那麼它將永遠阻塞。這實際上意味著我們只會在收到任何通知時才返回。如果我們在專用執行緒上執行此方法,那麼這將使管理更加輕鬆,因為我們不再需要進行任何空閒等待。
5. 使用 PGJDBC-NG 進行監聽
如果我們想在不輪詢資料庫的情況下接收通知,可以使用一些替代驅動程式來實現。 PGJDBC -NG 驅動程式與 PostgreSQL 相容,同時提供一些更進階的功能,包括註冊通知回呼的功能。
在使用它們之前,我們需要將它們添加到我們的構建中:
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.8.9</version>
</dependency>
然後,我們可以像平常一樣建立連接,只是這次我們使用jdbc:pgsql
類型的 URL,而不是jdbc:postgresql
。例如:
Connection connection = DriverManager.getConnection("jdbc:pgsql://localhost:5432/postgres", "postgres", "mysecretpassword");
我們仍然需要在連線上執行LISTEN
命令,與之前完全相同。不過,這次我們可以註冊一個監聽器,以便在通知發生時接收回呼。為此,我們需要實作PGNotificationListener
介面:
class Listener implements PGNotificationListener {
@Override
public void notification(int processId, String channelName, String payload) {
LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
channelName, payload, processId);
}
}
然後我們可以使用我們的連線註冊一個實例:
PGConnection pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
pgConnection.addNotificationListener(new Listener());
此時,只要連線處於活動狀態,我們就會在發出通知時自動收到通知,而無需輪詢資料庫:
10:34:03.104 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, NOTIFY!', PID=844
10:34:03.106 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, pg_notify!', PID=844
這不僅讓我們更容易管理,而且對我們的應用程式也更有高效,因為我們不再需要輪詢資料庫等待某些事情發生。
6. 結論
本文簡要介紹了 PostgreSQL 中的LISTEN
和NOTIFY
指令,以及如何在 JDBC 連線中使用它們。下次您需要從資料庫觸發事件時,不妨嘗試一下。
與往常一樣,本文中的所有範例都可以在 GitHub 上找到。