Netty 中的自訂事件處理程序和偵聽器
一、簡介
在本教程中,我們將使用 Netty 建立一個聊天室應用程式。在網路程式設計中,Netty 作為一個強大的框架而脫穎而出,它簡化了非同步 I/O 操作的複雜性。我們將探討如何建立一個基本的聊天伺服器,多個客戶端可以在其中連接並進行即時對話。
2. 場景
發送到伺服器的訊息將轉發到所有連接的客戶端。它還會保留最近發送的幾條訊息的列表,以便新客戶端在連接時可以從當前對話中獲得上下文。為此,我們只需要幾個事件處理程序來維持通道之間的通訊:
在 Netty 中,通訊是透過通道完成的,通道抽象化了任何協定上的非同步 I/O 操作。這使我們能夠專注於應用程式邏輯而不是網頁程式碼。我們的應用程式將透過命令列運行。我們將編寫一個伺服器和一個客戶端應用程式。
3. 建立自訂事件處理程序
對於通道之間的通信,我們將實作SimpleChannelInboundHandler<String>
,它是ChannelInboundHandlerAdapter
的通用實作。此適配器使我們能夠專注於僅實現我們關心的事件。在本例中,它是channelRead0()
,當從伺服器接收到訊息時呼叫它。我們將使用它來簡化我們的用例,因為我們只交換String
訊息。
3.1.客戶端事件處理程序
讓我們從客戶端訊息的處理程序開始,它將把伺服器接收到的任何內容列印到控制台,無需修改:
public class ClientEventHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
}
稍後,我們將透過直接寫入通道來處理訊息發送。
3.2.訊息對象
在我們繼續討論伺服器事件之前,讓我們先寫一個 POJO 來表示發送到伺服器的每個訊息。我們將註冊發送的日期以及用戶名和訊息:
public class Message {
private final Instant time;
private final String user;
private final String message;
public Message(String user, String message) {
this.time = Instant.now();
this.user = user;
this.message = message;
}
// standard getters...
}
然後,我們將包括一些幫助程序,首先是伺服器發送訊息時訊息如何顯示在控制台上:
@Override
public String toString() {
return time + " - " + user + ": " + message;
}
然後,為了解析客戶端收到的訊息,我們將使用 CSV 格式。當我們建立客戶端應用程式時,我們將看到客戶端如何以這種格式發送訊息:
public static Message parse(String string) {
String[] arr = string.split(";", 2);
return new Message(arr[0], arr[1]);
}
將分割限制為 2 很重要,因為訊息部分可能包含分號。
3.3.伺服器事件處理程序
在我們的伺服器事件處理程序中,我們將首先為我們將要覆蓋的其他事件建立一個輔助方法。此外,我們還需要一個已連接客戶端的映射和一個Queue
來最多保留MAX_HISTORY
元素:
public class ServerEventHandler extends SimpleChannelInboundHandler<String> {
static final Map<String, Channel> clients = new HashMap<>();
static final Queue<String> history = new LinkedList<>();
static final int MAX_HISTORY = 5;
private void handleBroadcast(Message message, ChannelHandlerContext context) {
String channelId = context.channel()
.id()
.asShortText();
clients.forEach((id, channel) -> {
if (!id.equals(channelId))
channel.writeAndFlush(message.toString());
});
// history-control code...
}
// ...
}
首先,我們取得通道 ID 作為地圖的鍵。然後,對於廣播,對於每個連接的客戶端(不包括發送者),我們中繼他們的訊息。
值得注意的是writeAndFlush()
接收一個Object
。而且,由於我們的處理程序只能處理字串,因此必須呼叫toString()
以便客戶端可以正確接收它。
最後,我們進行歷史控制。每次新增訊息時,如果清單超過MAX_HISTORY
項,我們就會刪除最舊的訊息:
history.add(message.toString());
if (history.size() > MAX_HISTORY)
history.poll();
現在,我們可以重寫channelRead0()
並解析從客戶端收到的訊息:
@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
handleBroadcast(Message.parse(msg), context);
}
然後,對於每個上線的客戶端,我們將其添加到我們的clients
清單中,中繼舊訊息以獲取上下文,並發送一條系統訊息宣布新客戶端:
@Override
public void channelActive(final ChannelHandlerContext context) {
Channel channel = context.channel();
clients.put(channel.id().asShortText(), channel);
history.forEach(channel::writeAndFlush);
handleBroadcast(new Message("system", "client online"), context);
}
最後,我們重寫channelInactive()
,在客戶端離線時呼叫。這次,我們只需要從清單中刪除客戶端並發送系統訊息:
@Override
public void channelInactive(ChannelHandlerContext context) {
Channel channel = context.channel();
clients.remove(channel.id().asShortText());
handleBroadcast(new Message("system", "client offline"), context);
}
4. 伺服器引導應用程式
我們的處理程序不會獨立執行任何操作,因此我們需要一個應用程式來引導並運行它,這是一個通用模板。
4.1.在ChannelPipeline
中註冊自訂元件
為了準備引導程序,我們選擇一個通道實作並實作一個子處理程序,該處理程序為通道的請求提供服務:
bootstrap.group(serverGroup, clientGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addFirst(
new StringDecoder(),
new ServerEventHandler()
new StringEncoder());
}
});
在子處理程序中,我們定義處理管道。由於我們只關心String
訊息,因此我們將使用內建的String
編碼器和解碼器,這樣就不必自己對交換的位元組緩衝區進行編碼/解碼,從而節省了一些時間。
最後,由於順序很重要,我們加入解碼器、 ServerEventHandler,
和編碼器。這是因為事件透過管道從入站流向出站。
我們將伺服器綁定到主機/連接埠來完成我們的應用程序,該應用程式傳回一個ChannelFuture
。我們將使用它來等待非同步套接字透過sync()
關閉:
ChannelFuture future = bootstrap.bind(HOST, PORT).sync();
System.out.println("server started. accepting clients.");
future.channel().closeFuture().sync();
5. 客戶端引導應用程式
最後,我們的客戶端應用程式遵循通用客戶端模板進行引導。最重要的是,當呼叫handler()
時,我們將使用ClientEventHandler
來代替:
channel.pipeline().addFirst(
new StringDecoder(),
new ClientEventHandler(),
new StringEncoder());
5.1.處理訊息輸入
最後,為了處理用戶輸入,連接到伺服器後,我們將使用Scanner
循環,直到收到用戶名,然後直到訊息等於「退出」。最重要的是,我們必須使用writeAndFlush()
來傳送訊息。我們以Message.parse()
期望的格式傳送訊息:
private static void messageLoop(Scanner scanner, Channel channel) {
while (user.isEmpty()) {
System.out.print("your name: ");
user = scanner.nextLine();
}
while (scanner.hasNext()) {
System.out.print("> ");
String message = scanner.nextLine();
if (message.equals("exit"))
break;
channel.writeAndFlush(user + ";" + message);
}
}
6. 建立自訂事件監聽器
在 Netty 中,事件監聽器在通道整個生命週期中處理非同步事件方面發揮著至關重要的作用。事件監聽器本質上是一種回呼機制,我們可以使用它對傳回ChannelFuture
的任何操作的完成做出反應。
我們在完成時實作ChannelFutureListener
介面以實作自訂行為。 ChannelFuture
表示非同步操作的結果,例如連線嘗試或 I/O 操作。
ChannelFutureListener
很有用,因為它定義了預設實現,例如CLOSE_ON_FAILURE
或FIRE_EXCEPTION_ON_FAILURE
。但是,由於我們不會使用這些,因此讓我們實作一個用於操作確認的GenericFutureListener
。
我們將保留上下文的自訂事件名稱,並且我們將檢查未來是否成功完成。否則,我們將在記錄之前將狀態標記為“FAILED”
:
public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {
private final String event;
public ChannelInfoListener(String event) {
this.event = event;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
String status = "OK";
if (!future.isSuccess()) {
status = "FAILED";
future.cause().printStackTrace();
}
System.out.printf(
"%s - channel#%s %s: %s%n", Instant.now(), channel.id().asShortText(), status, event);
}
}
6.1.活動收據
讓我們回到程式碼的某些部分來包含偵聽器。首先,對於客戶端,我們新增一個「連接到伺服器」確認:
future.addListener(new ChannelInfoListener("connected to server"));
然後,讓我們在訊息循環中包含「訊息已發送」確認:
ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));
這使我們能夠確保在發送訊息時仍然連接到伺服器。最後,對於伺服器處理程序,讓我們在廣播期間發送「訊息中繼」確認:
clients.forEach((id, channel) -> {
if (!id.equals(channelId)) {
ChannelFuture relay = channel.writeAndFlush(message.toString());
relay.addListener(new ChannelInfoListener("message relayed to " + id));
}
});
7. 看到它的實際效果
Netty 允許我們使用EmbeddedChannel
測試管道,但對於客戶端/伺服器交互,讓我們看看從終端運行時它是什麼樣子。讓我們啟動伺服器(為了方便閱讀,我們將省略套件名稱):
$ mvn exec:java -Dexec.mainClass=ChatServerMain
chat server started. ready to accept clients.
然後,讓我們啟動第一個客戶端,輸入名稱,然後發送兩個訊息:
$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:47:02 - channel#03c40ad4 OK: connected to server
your name: Bob
> Hello
2024-01-12 3:47:02 - channel#03c40ad4 OK: message sent
> Anyone there?!
2024-01-12 3:47:03 - channel#03c40ad4 OK: message sent
當我們與第二個客戶端連線時,我們將在輸入名稱之前取得訊息歷史記錄:
$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:49:33 - channel#daa64476 OK: connected to server
2024-01-12 3:46:55 - system: client online: 03c40ad4
2024-01-12 3:47:03 - Bob: Hello
2024-01-12 3:48:40 - Bob: Anyone there?!
當然,在選擇名稱並發送訊息後:
your name: Alice
> Hi, Bob!
2024-01-12 3:51:05 - channel#daa64476 OK: message sent
第一個客戶端將收到它:
2024-01-12 3:49:33 - system: client online: daa64476
2024-01-12 3:51:05 - Alice: Hi, Bob!
八、結論
在本文中,我們成功使用 Netty 建立了一個功能齊全的聊天伺服器,展示了該框架在處理非同步通訊方面的強大功能和簡單性。透過實現事件處理程序,我們設法在連接的客戶端之間中繼訊息並維護上下文歷史記錄。
與往常一樣,原始碼可以在 GitHub 上取得。