Netty實踐-時間服務器

本教程中實現的協議是TIME協議。 它與先前的示例不同,時間服務器只發送包含32位整數的消息,而不接收任何請求,並在消息發送後關閉連接。 在本示例中,您將學習如何構造和發送消息,以及在完成時關閉連接。

因爲時間服務器將忽略任何接收到的數據,但是一旦建立連接就發送消息,所以我們不能使用channelRead()方法。而是覆蓋channelActive()方法。 以下是代碼的實現:

package com.yiibai.netty.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

下面我們來看看上面代碼的一些解釋分析:

  1. 如上所述,當建立連接並準備好生成流量時,將調用channelActive()方法。現在在這個方法中編寫一個32位的整數來表示當前的時間。

  2. 要發送新消息,需要分配一個包含消息的新緩衝區。我們要寫入一個32位整數,因此需要一個ByteBuf,其容量至少爲4個字節。 通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator並分配一個新的緩衝區。

  3. 像之前一樣,編寫構造的消息。
    但是,在NIO中發送消息之前,我們是否曾調用java.nio.ByteBuffer.flip()? ByteBuf沒有這樣的方法,它只有兩個指針; 一個用於讀取操作,另一個用於寫入操作。 當您向ByteBuf寫入內容時,寫入索引會增加,而讀取器索引不會更改。讀取器索引和寫入器索引分別表示消息的開始和結束位置。
    相比之下,NIO緩衝區不提供一個乾淨的方式來確定消息內容開始和結束,而不用調用flip方法。當您忘記翻轉緩衝區時,就將會遇到麻煩,因爲不會發送任何或發送不正確的數據。但是這樣的錯誤不會發生在Netty中,因爲不同的操作類型我們有不同的指針。
    另一點要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一個ChannelFutureChannelFuture表示尚未發生的I/O操作。這意味着,任何請求的操作可能尚未執行,因爲所有操作在Netty中是異步的。 例如,以下代碼可能會在發送消息之前關閉連接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    因此,需要在ChannelFuture完成後調用close()方法,該方法由write()方法返回,並在寫入操作完成時通知其監聽器。 請注意,close()也可能不會立即關閉連接,並返回一個ChannelFuture。

當寫請求完成時,我們如何得到通知? 這就像向返回的ChannelFuture添加ChannelFutureListener一樣簡單。 在這裏,我們創建了一個新的匿名ChannelFutureListener,當操作完成時關閉Channel

或者,可以使用預定義的偵聽器來簡化代碼:

f.addListener(ChannelFutureListener.CLOSE);

要測試我們的時間服務器是否按預期工作,可以使用UNIX rdate命令:

$ rdate -o <port> -p <host>

其中<port>是在main()方法中指定的端口號,<host>通常是localhost或服務器的IP地址。

編寫時間客戶端

DISCARDECHO服務器不同,我們需要一個用於TIME協議的客戶端,因爲我們無法將32位二進制數據轉換爲日曆上的日期。 在本節中,我們討論如何確保服務器正常工作並學習如何使用Netty編寫客戶端。

Netty中服務器和客戶端之間最大的和唯一的區別是使用了不同的BootstrapChannel實現。 請看看下面的代碼:

package com.yiibai.netty.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrap類似,只是它用於非服務器通道,例如客戶端或無連接通道。

  2. 如果只指定一個EventLoopGroup,它將同時用作boss組和worker組。boss組和worker組不是用於客戶端。

  3. 不使用NioServerSocketChannel,而是使用NioSocketChannel來創建客戶端通道。

  4. 注意,這裏不像我們使用的ServerBootstrap,所以不使用childOption(),因爲客戶端SocketChannel沒有父類。

  5. 應該調用connect()方法,而不是bind()方法。

如上面所見,它與服務器端代碼沒有什麼不同。 ChannelHandler實現又是怎麼樣的呢? 它應該從服務器接收一個32位整數,將其轉換爲人類可讀的格式,打印轉換爲我們熟知的時間格式 ,並關閉連接:

package com.yiibai.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            Date currentTime = new Date(currentTimeMillis);
            System.out.println("Default Date Format:" + currentTime.toString());

            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateString = formatter.format(currentTime);
            // 轉換一下成中國人的時間格式
            System.out.println("Date Format:" + dateString);

            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(1).TCP/IP中,Netty讀取從對端發送的ByteBuf數據。

客戶端看起來很簡單,與服務器端示例沒什麼區別。 但是,這個處理程序有時會拒絕拋出IndexOutOfBoundsException。 我們將在下一節討論爲什麼會發生這種情況。

先運行 TimeServer.java 程序,然後再運行 TimeClient.java , 當運行 TimeClient.java時就可以到有一個時間日期輸出,然後程序自動退出。輸出結果如下 -

Default Date Format:Thu Mar 02 20:50:23 CST 2017
Date Format:2017-03-02 20:50:23