JeroMQ簡介
一、簡介
在本文中,我們將了解JeroMQ ,它是ZeroMQ的純 Java 實現。我們將看看它是什麼以及它可以在我們的應用程序中為我們做什麼。
2.ZeroMQ是什麼?
ZeroMQ 是一種消息傳遞基礎設施,不需要設置任何實際的基礎設施服務。我們不需要像 ActiveMQ 或 Kafka 等其他實現那樣需要單獨的消息代理。相反,我們應用程序中的 ZeroMQ 依賴項能夠為我們完成所有這些工作。
那麼,我們能用這個做什麼呢?我們可以實現我們通常想要的所有標準消息傳遞模式:
- 請求/響應
- 發布/訂閱
- 同步與異步
- 和別的
2.1.插座
ZeroMQ 使用套接字的概念。它們在概念上與我們在低級網絡編程中使用的套接字非常相似。
所有套接字都有一個類型,我們將在本文中看到其中一些類型。然後,它們要么偵聽來自其他套接字的連接,要么打開與其他套接字的連接。一旦一對套接字連接起來,我們就可以在它們之間發送消息了。請注意,只有某些套接字組合可以一起使用,具體取決於我們想要實現的目標。
JeroMQ 還支持套接字之間的幾種不同的傳輸機制。例如,常見的包括:
-
tcp://<host>:<port>
– 使用 TCP/IP 網絡在套接字之間發送消息。這可以允許套接字位於不同的進程和不同的主機上,但確實帶來了網絡所具有的一些可靠性問題。 -
ipc://<endpoint>
– 這使用系統相關的機制在套接字之間發送消息。這允許套接字是不同的進程,但它們必須位於同一主機上,並且進程可以通信可能存在其他系統限制。 -
inproc://<name>
– 這允許同一進程中的套接字之間進行通信。具體來說,它們必須位於同一 JeroMQ 上下文中。
運輸的具體選擇將取決於我們的需求。根據具體的傳輸和套接字類型,我們還可以使用它與其他 ZeroMQ 實現進行通信,包括其他語言。
3. 入門
JeroMQ 是 ZeroMQ 的純 Java 實現。那麼讓我們快速了解一下在我們的應用程序中使用它。
3.1.依賴關係
我們需要的第一件事是添加依賴項:
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.3</version>
</dependency>
我們可以在Maven Central Repository中找到最新版本。
3.2. JeroMQ 上下文
在使用 JeroMQ 執行任何操作之前,我們需要設置一個上下文。這是ZContext
類的一個實例,負責管理一切。
創建上下文沒有什麼特別的——我們可以簡單地使用new ZContext()
。我們還必須確保使用close()
方法正確關閉它。這可確保我們正確釋放任何網絡資源。
我們正在使用的實例必須至少與我們正在執行的任何操作一樣長,因此我們需要確保它是在應用程序開始時創建的,並且直到結束才關閉。
如果我們正在編寫標準 Java 應用程序,則可以簡單地使用 try-with-resources 模式。如果我們使用 Spring 之類的東西,那麼我們可以將其設置為配置有 destroy 方法的 bean。以及我們正在使用的框架所需的其他模式。
3.3.創建套接字
一旦我們有了上下文,我們就可以用它來創建套接字。這些套接字是我們所有消息傳遞的基礎。
我們使用ZContext.createSocket()
方法創建一個套接字,並提供我們想要使用的套接字類型。完成此操作後,我們通常需要調用ZMQ.Socket.bind()
來偵聽連接,或ZMQ.Socket.connect()
來打開與另一個套接字的連接。
此時,我們已經準備好使用我們的套接字了。消息使用send()
等方法發送,使用recv().
完成後我們可以關閉套接字以斷開連接。我們可以通過顯式調用Socket.close()
或通過關閉ZContext
來完成此操作,然後,從它創建的所有套接字都會自動關閉。
請注意,套接字不是線程安全的。我們可以在線程之間傳遞它們,但重要的是一次只有一個線程訪問它們。
4. 請求/響應消息傳遞
讓我們從簡單的請求/響應設置開始。我們首先需要的是服務器。這是監聽傳入連接的部分:
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.REP);
socket.bind("tcp://*:5555");
byte[] reply = socket.recv();
// Do something here.
String response = "world";
socket.send(response.getBytes(ZMQ.CHARSET), 0);
}
這裡我們創建了一個REP
類型的新套接字——Reply 的縮寫。我們可以指示它在進入循環之前開始偵聽給定地址,在循環中我們從套接字接收下一條消息,對其執行某些操作,然後發迴響應。
接下來,我們需要一個客戶。這是打開與服務器的連接的一側。它也是必鬚髮送初始請求的一方 - 我們的服務器只能回复它收到的請求:
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.REQ);
socket.connect("tcp://localhost:5555");
String request = "Hello";
socket.send(request.getBytes(ZMQ.CHARSET), 0);
byte[] reply = socket.recv();
}
和以前一樣,我們創建一個新的套接字。只是這一次,它是REQ
類型——Request 的縮寫。然後,我們指示它在發送消息和接收響應之前連接到某個地方的另一個套接字。
REQ
和REP
端之間的主要區別在於何時允許它們發送消息。 REQ
端可以隨時發送消息,而REP
端只能在收到消息後才發送消息——因此有請求和響應。
4.1.多個客戶端
我們在這裡看到瞭如何讓單個客戶端向單個服務器發送消息。但是如果我們想要有多個客戶端怎麼辦?
好消息是,它確實有效。 JeroMQ 將允許任意數量的客戶端連接到同一服務器地址,並且它將為我們處理所有網絡需求。
然而,這是如何運作的呢?我們的服務器中沒有任何內容表明將響應發送到哪個客戶端。這是因為我們不需要它。 JeroMQ 為我們跟踪這一切。當服務器調用send()
時,消息將發送到我們上次收到消息的客戶端。這使得我們的代碼不需要關心任何這些。
缺點是我們的處理必須完全是單線程的。由於其工作原理,我們需要完成一條消息的所有處理並在收到下一條消息之前發送回复。對於某些場景來說,這很好,但通常這會成為一個很大的瓶頸。
4.2.異步處理
相反,如果我們希望能夠異步處理傳入請求並無序發送響應怎麼辦?我們無法通過 REQ/REP 設置輕鬆做到這一點,因為每個響應都直接發送到最後收到的請求。
相反,我們可以使用不同類型的套接字 – ROUTER
來完成此操作。這與REP
的工作原理非常相似,只不過我們有責任指示消息的收件人是誰。
讓我們看一下服務器組件:
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
String identity = broker.recvStr();
broker.recv(); // Envelope delimiter
String message = broker.recvStr(0);
// Do something here.
broker.sendMore(identity);
broker.sendMore("");
broker.send("Hello back");
}
這看起來很相似,但又不完全相同。我們將套接字類型設置為ROUTER
而不是REP
。這種套接字類型允許服務器通過了解特定客戶端的身份來將消息路由到特定客戶端。
當我們在這裡接收消息時,實際上接收到了三種不同的數據。首先,我們接收客戶端的身份,然後是信封分隔符,然後是實際的消息。
同樣,當我們發送消息時,我們也需要這樣做。我們發送消息所針對的客戶端的身份,後跟信封分隔符(可以是任何字符串),然後是實際消息。
我們看一下客戶端:
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.REQ);
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
worker.send("Hello " +
String workload = worker.recvStr();
// Do something with the response.
}
這與我們之前的客戶幾乎相同。我們現在已經為客戶端提供了一個身份,以便服務器知道哪個客戶端是哪個。如果沒有這個,服務器將無法將響應定向到正確的客戶端。除此之外,這與我們之前看到的相同。
因為我們的服務器現在可以指示消息是針對哪個客戶端的,所以我們可以突然一次處理多個請求 - 例如,使用執行程序服務。唯一的要求是我們永遠不能有多個線程同時訪問套接字。
5. 發布/訂閱消息傳遞
到目前為止,我們已經看到了客戶端發送初始請求,然後服務器發迴響應的情況。相反,如果我們想讓服務器只廣播客戶端可以使用的事件怎麼辦?
我們可以使用 Pub/Sub 模式來做到這一點。服務器將發布消息,然後訂閱者將消費該消息。那麼,這看起來怎麼樣?
首先我們需要有我們的發布者:
try (ZContext context = new ZContext()) {
ZMQ.Socket pub = context.createSocket(SocketType.PUB);
pub.bind("tcp://*:5555");
// Wait until something happens.
pub.send("Hello");
}
這看起來非常簡單,但這是因為 JeroMQ 為我們管理了大部分複雜性。我們所做的就是創建一個PUB
類型的套接字(Publish 的縮寫),偵聽連接,然後向其發送消息。
接下來,我們需要一個訂閱者:
try (ZContext context = new ZContext()) {
ZMQ.Socket sub = context.createSocket(SocketType.SUB);
sub.connect("tcp://localhost:5555");
sub.subscribe("".getBytes());
String message = sub.recvStr();
}
這稍微複雜一些,但仍然不是很多。在這裡,我們創建一個SUB
類型的套接字(Subscribe 的縮寫)並將其連接到我們的發布者。然後我們需要訂閱消息。這需要一組字節作為所有傳入消息的前綴,或者使用一組空字節來訂閱所有消息。
完成此操作後,我們就可以接收消息了。我們收到訂閱者發送的任何適當的消息。請注意,我們只能接收訂閱後發送的消息 - 在此之前發送的任何消息都將丟失。
5.1.多個客戶端
和以前一樣,如果我們想要有多個客戶端,那麼我們可以這樣做。每個連接的訂閱者都將收到發布者發送的所有適當的消息,這意味著這充當多播 - 例如,類似於 JMS 主題而不是 JMS 隊列。
我們還可以讓不同的客戶擁有不同的訂閱。這意味著它們各自僅獲得廣播消息的適當子集。所有這一切都完全按照我們的預期進行,我們無需付出任何額外的努力。
5.2.異步處理
我們遇到的一個問題是, recv()
方法會阻塞,直到有消息可用為止。如果我們的訂閱者只是等待來自該套接字的消息然後對它們做出反應,那麼就可以了。然而,如果我們希望我們的訂閱者做其他事情——例如等待多個套接字——那麼這不起作用。
我們使用的recv()
或recvStr()
方法有一個替代簽名,允許提供一些標誌。如果提供標誌ZMQ.DONTWAIT
,這將導致該方法立即返回而不是阻塞。如果沒有消息可供讀取,則返回null
。
這將允許我們輪詢套接字以查看是否有消息在等待,如果有則處理它,如果沒有,則在此期間執行其他操作。
六,結論
在這裡,我們看到了關於使用 JeroMQ 可以實現的目標的非常簡短的介紹。然而,我們可以用它做的事情比我們在這裡介紹的要多得多。下次您需要在應用程序中執行任何形式的消息傳遞時,為什麼不嘗試一下呢?
與往常一樣,我們可以在 GitHub 上找到本文中的所有代碼。