Netty實踐-處理基於流的傳輸

TCP/IP的基於流的傳輸中,接收的數據被存儲到套接字接收緩衝器中。不幸的是,基於流的傳輸的緩衝器不是分組的隊列,而是字節的隊列。 這意味着,即使將兩個消息作爲兩個獨立的數據包發送,操作系統也不會將它們視爲兩個消息,而只是一組字節(有點悲劇)。 因此,不能保證讀的是您在遠程定入的行數據。 例如,假設操作系統的TCP/IP堆棧已收到三個數據包:

Netty實踐-處理基於流的傳輸

由於基於流的協議的這種通用屬性,在應用程序中以下面的碎片形式(只是其中的一種)讀取它們的機會很高:

Netty實踐-處理基於流的傳輸

因此,接收部分,無論是服務器側還是客戶端側,都應該將接收到的數據碎片整理成邏輯可由應用容易地理解的一個或多個有意義的幀。 在上述示例的情況下,接收的數據應該如下成幀:

Netty實踐-處理基於流的傳輸

針對上面的問題,下面列出了兩個解決方案。

第一個解決方案

現在我們回到TIME客戶端示例。在這裏有同樣的問題。 32位整數可以算是非常少量的數據量了,並且不可能經常被分段。 然而,問題是它可以分割,並且碎片的可能性將隨着流量增加而增加。

簡單的解決方案是創建一個內部累積緩衝區,並等待所有4個字節被接收到內部緩衝區。 以下是修正的TimeClientHandler實現,它修復了問題:

package com.yiibai.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler有兩個生命週期偵聽器方法:handlerAdded()handlerRemoved()。 只要不會阻塞很長時間,您就可以執行任意初始化任務。

  2. 首先,所有接收到的數據應累加到buf中。

  3. 然後,處理程序必須檢查buf是否有足夠的數據(在此示例中爲4個字節),當足夠時就繼續進行實際的業務邏輯。否則,在有更多數據到達時Netty將再次調用channelRead()方法,最終累積到達4個字節再執行實際的業務。

第二個解決方案

雖然第一個解決方案已經解決了TIME客戶端的問題,但修改的處理程序看起來不那麼幹淨。想象如果一個更復雜的協議,它由多個字段組成,例如:可變長度字段等。上面的ChannelInboundHandler實現很快就無法維護了。

可能已經注意到,可以向ChannelPipeline添加多個ChannelHandler,因此,可將一個單片的ChannelHandler拆分爲多個模塊,以降低應用程序的複雜性。 例如,可將TimeClientHandler拆分爲兩個處理程序:

  • TimeDecoder處理碎片問題
  • TimeClientHandler的初始簡單版本

幸運的是,Netty提供了一個可擴展類,可以幫助我們方便地編寫:

package com.yiibai.netty.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一個實現,它使得處理碎片問題變得容易。

  2. ByteToMessageDecoder在接收到新數據時,使用內部維護的累積緩衝區調用decode()方法。

  3. decode()可以決定在累積緩衝區中沒有足夠數據的情況下不添加任何東西。 當接收到更多數據時,ByteToMessageDecoder將再次調用decode()

  4. 如果decode()將對象添加到out,則意味着解碼器成功地解碼了消息。 ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。要記住,不需要解碼多個消息。 ByteToMessageDecoder將繼續調用decode()方法,直到它沒有再有任何東西添加。

現在我們有另一個處理程序插入ChannelPipeline,應該在TimeClient中修改ChannelInitializer實現:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您喜歡折騰,也可以想嘗試使用ReplayDecoder,這簡化了解碼器更多的工作。但需要參考API參考以獲得更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty提供了現成的解碼器,使我們能夠非常容易地實現大多數的協議,並幫助您避免使用一個單一的不可維護的處理程序實現。有關更多詳細示例,請參閱以下示例:

二進制協議實現: Netty實踐-factorial服務器
基於文本行的協議實現: Netty實踐-telnet服務器