Java中的進程間通訊方法
一、簡介
我們之前研究過進程間通訊 (IPC),並了解了不同方法之間的一些效能比較。在本文中,我們將了解如何在 Java 應用程式中實作其中一些方法。
2.什麼是進程間通訊?
進程間通信,簡稱IPC,是一種不同進程之間進行通訊的機制。這可以是形成同一應用程式的各種進程,可以是在同一台電腦上運行的不同進程,也可以是分佈在互聯網上的其他進程。
例如,某些 Web 瀏覽器將每個標籤作為不同的作業系統進程運行。這樣做是為了使它們彼此隔離,但確實需要選項卡進程和主瀏覽器進程之間有一定程度的 IPC,以保持一切正常工作。
我們在這裡看到的一切都將以訊息傳遞的形式出現。 Java 缺乏對共享記憶體機制的標準支持,儘管一些第三方函式庫可以促進這一點。因此,我們將考慮向消費流程發送訊息的生產流程。
3. 基於檔案的IPC
我們可以在標準 Java 中實現的最簡單的 IPC 形式就是使用本機檔案系統上的檔案。一個行程可以寫入文件,而另一個行程可以讀取相同文件。任何進程使用進程邊界以外的檔案系統執行的任何操作都可以被同一台電腦上的所有其他進程看到。
3.1.共享文件
我們可以從讓兩個行程讀取和寫入同一個檔案開始。我們的生產進程將寫入檔案系統上的文件,稍後,我們的消費進程將從同一文件中讀取。
我們確實需要小心,寫入檔案和讀取檔案不要重疊。在許多電腦上,檔案系統操作不是原子的,因此如果寫入和讀取同時發生,則消費進程可能會收到損壞的訊息。然而,如果我們能夠保證這一點(例如,使用檔案系統鎖定),那麼共用檔案就是促進 IPC 的直接方法。
3.2.共享目錄
共享單一眾所周知的文件的一個步驟是共享整個目錄。我們的生產應用程式可以在每次需要時將新檔案寫入目錄,而我們的消費應用程式可以檢測新檔案的存在並對其做出反應。
Java 在 NIO2 中有WatchService
API,我們可以使用它來實現這一點。我們的消費進程可以使用它來監視我們的目標目錄,每當它通知我們已經建立了一個新檔案時,我們就可以對其做出反應:
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = Paths.get("pathToDir");
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
WatchKey key;
while ((key = watchService.take()) != null) {
for (WatchEvent<?> event : key.pollEvents()) {
// React to new file.
}
key.reset();
}
完成此操作後,我們的生產程序需要在此目錄中建立適當的文件,並且消費進程將檢測並處理它們。
但請記住,大多數檔案系統操作都不是原子的。我們必須保證只有當文件完全寫入時才會觸發文件創建事件。這通常是透過將檔案寫入臨時目錄,然後在完成後將其移至目標目錄來完成的。
在大多數檔案系統上,「移動檔案」或「重新命名檔案」操作只要發生在同一檔案系統中就被視為原子操作。
3.3.命名管道
到目前為止,我們已經使用完整的文件在進程之間傳遞訊息。這要求生產進程在消費進程讀取該檔案之前已寫入整個檔案。
命名管道是我們可以在這裡使用的一種特殊類型的檔案。命名管道是檔案系統上的條目,但背後沒有任何儲存。相反,它們充當寫入和讀取過程之間的管道。
我們首先讓我們的消費進程打開命名管道進行讀取。因為這個命名管道在檔案系統上作為檔案呈現,所以我們使用標準檔案 IO 機制來執行此操作:
BufferedReader reader = new BufferedReader(new FileReader(file));
String line;
while ((line = reader.readLine()) != null) {
// Process read line
}
然後,寫入此命名管道的所有內容都將立即被此消耗進程讀取。這意味著我們的生產過程需要正常打開這個文件並寫入。
不幸的是,我們沒有一種機制可以在 Java 中建立這些命名管道。相反,我們需要使用標準作業系統命令來建立檔案系統條目,然後我們的程式才能使用它。我們具體如何做到這一點因作業系統而異。例如,在 Linux 上,我們將使用mkfifo
指令:
$ mkfifo /tmp/ipc-namedpipe
然後,我們可以在消費和生產過程中使用/tmp/ipc-namedpipe
。
4.基於網路的IPC
我們所看到的一切都圍繞著共享同一檔案系統的兩個進程。這意味著它們需要在同一台電腦上運行。然而,在某些情況下,我們希望我們的進程能夠相互通信,而不管它們運行在哪台電腦上。
我們可以透過使用基於網路的 IPC 來實現這一點。本質上,這只是在一個進程中運行網頁伺服器,在另一個進程中運行網路用戶端。
4.1.簡單插座
實現基於網路的 IPC 最明顯的例子是使用簡單的網路套接字。我們可以使用 JDK 中的套接字支持,也可以依賴 Netty 或 Grizzly 等函式庫。
我們的消費進程將運行一個偵聽已知位址的網路伺服器。然後,它可以像任何網頁伺服器一樣處理傳入連線和訊息:
try (ServerSocket serverSocket = new ServerSocket(1234)) {
Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
// Process read line
}
}
然後,生產進程可以向其發送網路訊息以方便我們的 IPC:
try (Socket clientSocket = new Socket(host, port)) {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
out.println(msg);
}
值得注意的是,與基於文件的 IPC 相比,我們可以更輕鬆地雙向發送訊息。
4.2.吉米克斯
使用網路套接字效果很好,但是有很多複雜性需要我們自己管理。作為替代方案,我們也可以使用 JMX。從技術上講,這仍然使用基於網路的 IPC,但它將網路從我們手中抽象化出來,因此我們僅在 MBean 方面進行工作。
和以前一樣,我們需要一個在我們的消費進程上運行的伺服器。然而,這個伺服器現在是我們來自 JVM 的標準MBeanServer
,而不是我們自己做的任何事情。
我們首先需要定義 MBean 本身:
public interface IPCTestMBean {
void sendMessage(String message);
}
class IPCTest implements IPCTestMBean {
@Override
public void sendMessage(String message) {
// Process message
}
}
然後,我們可以將其提供給 JVM 中的MBeanServer
:
ObjectName objectName = new ObjectName("com.baeldung.ipc:type=basic,name=test");
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(new IPCTest(), objectName);
至此,我們的消費者已經準備好了。
然後,我們可以使用JMXConnectorFactory
實例從我們的生產系統向該伺服器發送訊息:
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1234/jmxrmi");
try (JMXConnector jmxc = JMXConnectorFactory.connect(url, null)) {
ObjectName objectName = new ObjectName("com.baeldung.ipc:type=basic,name=test");
IPCTestMBean mbeanProxy = JMX.newMBeanProxy(jmxc.getMBeanServerConnection(), objectName, IPCTestMBean.class, true);
mbeanProxy.sendMessage("Hello");
}
請注意,為此,我們需要使用一些額外的 JVM 參數來運行我們的使用者,以在眾所周知的連接埠上公開 JMX :
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=1234
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
然後,我們需要在客戶端的 URL 中使用它,以便它可以連接到正確的伺服器。
5. 訊息傳遞基礎設施
到目前為止我們所看到的都是一種相對簡單的IPC 手段。在某個時刻,這也會停止工作。例如,它假設只有一個進程在消費訊息,或者生產者確切地知道要與哪個消費者交談。
如果我們需要超越這一點,我們可以使用 JMS、AMPQ 或 Kafka 等與專用訊息傳遞基礎架構整合。
顯然,這比我們在這裡討論的規模要大得多——這將允許一整套生產和消費系統在彼此之間傳遞訊息。但是,如果我們需要這種規模,那麼這些選項確實存在。
六,結論
我們已經了解了進程之間 IPC 的幾種不同方式以及我們如何自己實現它們。這涵蓋了從共享個人文件到企業級規模的一系列規模。
下次您需要多個進程相互通訊時,為什麼不考慮其中一些選項呢?
與往常一樣,本文中的所有程式碼都可以在 GitHub 上找到。