Netty原理與基礎(chǔ)(五)

1.Decoder原理

1.1什么叫作Netty的解碼器呢?

首先,它是一個(gè)InBound入站處理器,解碼器負(fù)責(zé)處理“入站數(shù)據(jù)”。其次,它能將上一站Inbound入站處理器傳過來的輸入(Input)數(shù)據(jù),進(jìn)行數(shù)據(jù)的解碼或者格式轉(zhuǎn)換,然后輸出(Output)到下一站Inbound入站處理器。一個(gè)標(biāo)準(zhǔn)的解碼器將輸入類型為ByteBuf緩沖區(qū)的數(shù)據(jù)進(jìn)行解碼,輸出一個(gè)一個(gè)的Java POJO對(duì)象。Netty內(nèi)置了這個(gè)解碼器,叫作ByteToMessageDecoder,位在Netty的io.netty.handler.codec包中。


image.png
  • ByteToMessageDecoder僅僅提供了一個(gè)流程性質(zhì)的框架:它僅僅將子類的decode方法解碼之后的Object結(jié)果,放入自己內(nèi)部的結(jié)果列表List<Object>中,最終,父類會(huì)負(fù)責(zé)將List<Object>中的元素,一個(gè)一個(gè)地傳遞給下一個(gè)站

1.2代碼示例

//解碼
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
                       List<Object> out) {
        while (in.readableBytes() >= 4) {
            int i = in.readInt();
            Logger.info("解碼出一個(gè)整數(shù): " + i);
            out.add(i);
        }
    }
}
//處理程序
public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Integer integer = (Integer) msg;
        Logger.info("打印出一個(gè)整數(shù): " + integer);
    }
}
//測試類
public class Byte2IntegerDecoderTester {
    /**
     * 整數(shù)解碼器的使用實(shí)例
     */
    @Test
    public void testByteToIntegerDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new Byte2IntegerDecoder());
                ch.pipeline().addLast(new IntegerProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 0; j < 100; j++) {
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(j);
            channel.writeInbound(buf);
        }

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  • ByteBuf緩沖區(qū)由誰負(fù)責(zé)進(jìn)行引用計(jì)數(shù)和釋放管理

基類ByteToMessageDecoder負(fù)責(zé)解碼器的ByteBuf緩沖區(qū)的釋放工作,它會(huì)調(diào)用ReferenceCountUtil.release(in)方法,將之前的ByteBuf緩沖區(qū)的引用數(shù)減1。

1.3 ReplayingDecoder解碼器

image.png

ReplayingDecoder類是ByteToMessageDecoder的子類。其作用是:
· 在讀取ByteBuf緩沖區(qū)的數(shù)據(jù)之前,需要檢查緩沖區(qū)是否有足夠的字節(jié)。
· 若ByteBuf中有足夠的字節(jié),則會(huì)正常讀取;反之,如果沒有足夠的字節(jié),則會(huì)停止解碼。

  • ReplayingDecoder進(jìn)行長度判斷的原理,其實(shí)很簡單:它的內(nèi)部定義了一個(gè)新的二進(jìn)制緩沖區(qū)類,對(duì)ByteBuf緩沖區(qū)進(jìn)行了裝飾,這個(gè)類名為ReplayingDecoderBuffer。該裝飾器的特點(diǎn)是:在緩沖區(qū)真正讀數(shù)據(jù)之前,首先進(jìn)行長度的判斷:如果長度合格,則讀取數(shù)據(jù);否則,拋出ReplayError。ReplayingDecoder捕獲到ReplayError后,會(huì)留著數(shù)據(jù),等待下一次IO事件到來時(shí)再讀取。
  • ReplayingDecoder的作用,遠(yuǎn)遠(yuǎn)不止于進(jìn)行長度判斷,它更重要的作用是用于分包傳輸?shù)膽?yīng)用場景

1.4整數(shù)分包解碼器

  • 底層通信協(xié)議是分包傳輸?shù)?,一份?shù)據(jù)可能分幾次達(dá)到對(duì)端。發(fā)送端出去的包在傳輸過程中會(huì)進(jìn)行多次的拆分和組裝。接收端所收到的包和發(fā)送端所發(fā)送的包不是一模一樣的


    image.png

    在Java OIO流式傳輸中,不會(huì)出現(xiàn)這樣的問題,因?yàn)樗牟呗允牵翰蛔x到完整的信息,就一直阻塞程序,不向后執(zhí)行。但是,在Java的NIO中,由于NIO的非阻塞性,就會(huì)出現(xiàn)上述情況

可以使用ReplayingDecoder來解決
要完成以上的例子,需要用到ReplayingDecoder一個(gè)很重要的屬性——state成員屬性。該成員屬性的作用就是保存當(dāng)前解碼器在解碼過程中的當(dāng)前階段

  • ReplayingDecoder源碼
    protected ReplayingDecoder() {
        this((Object)null);
    }

    protected ReplayingDecoder(S initialState) {
        this.replayable = new ReplayingDecoderByteBuf();
        this.checkpoint = -1;
        this.state = initialState;
    }

    protected void checkpoint() {
        this.checkpoint = this.internalBuffer().readerIndex();
    }

    protected void checkpoint(S state) {
        this.checkpoint();
        this.state(state);
    }
  • checkpoint(Status)方法有兩個(gè)作用
    (1)設(shè)置state屬性的值,更新一下當(dāng)前的狀態(tài)。
    (2)還有一個(gè)非常大的作用,就是設(shè)置“讀斷點(diǎn)指針”。
    (3)“讀斷點(diǎn)指針”是ReplayingDecoder類的另一個(gè)重要的成員,它保存著裝飾器內(nèi)部ReplayingDecoderBuffer成員的起始讀指針,有點(diǎn)兒類似于mark標(biāo)記。當(dāng)讀數(shù)據(jù)時(shí),一旦可讀數(shù)據(jù)不夠,ReplayingDecoderBuffer在拋出ReplayError異常之前,ReplayingDecoder會(huì)把讀指針的值還原到之前的checkpoint(IntegerAddDecoder.Status)方法設(shè)置的“讀斷點(diǎn)指針”(checkpoint)。于是乎,在ReplayingDecoder下一次讀取時(shí),還會(huì)從之前設(shè)置的斷點(diǎn)位置開始。

1.5分包解碼器

在原理上,字符串分包解碼和整數(shù)分包解碼是一樣的。有所不同的是:整數(shù)的長度是固定的,目前在Java中是4個(gè)字節(jié);而字符串的長度不是固定的,是可變長度的,這就是一個(gè)小小的難題

  • 如何獲取字符串的長度信息呢?
    (1)在協(xié)議的Head部分放置字符串的字節(jié)長度。Head部分可以用一個(gè)整型int來描述即可。
    (2)在協(xié)議的Content部分,放置的則是字符串的字節(jié)數(shù)組。
public class StringReplayDecoder
        extends ReplayingDecoder<StringReplayDecoder.Status> {

    enum Status {
        PARSE_1, PARSE_2
    }

    private int length;
    private byte[] inBytes;

    public StringReplayDecoder() {
        //構(gòu)造函數(shù)中,需要初始化父類的state 屬性,表示當(dāng)前階段
        super(Status.PARSE_1);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {


        switch (state()) {
            case PARSE_1:
                //第一步,從裝飾器ByteBuf 中讀取長度
                length = in.readInt();
                inBytes = new byte[length];
                // 進(jìn)入第二步,讀取內(nèi)容
                // 并且設(shè)置“讀指針斷點(diǎn)”為當(dāng)前的讀取位置
                checkpoint(Status.PARSE_2);
                break;
            case PARSE_2:
                //第二步,從裝飾器ByteBuf 中讀取內(nèi)容數(shù)組
                in.readBytes(inBytes, 0, length);
                out.add(new String(inBytes, "UTF-8"));
                // 第二步解析成功,
                // 進(jìn)入第一步,讀取下一個(gè)字符串的長度
                // 并且設(shè)置“讀指針斷點(diǎn)”為當(dāng)前的讀取位置
                checkpoint(Status.PARSE_1);
                break;
            default:
                break;
        }

    }

public class StringProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String s = (String) msg;
        System.out.println("打印: " + s);
    }
}

public class StringReplayDecoderTester {
    static String content = "smallmartial:Netty知識(shí)學(xué)習(xí)";

    /**
     * 字符串解碼器的使用實(shí)例
     */
    @Test
    public void testStringReplayDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new StringReplayDecoder());
                ch.pipeline().addLast(new StringProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);
        byte[] bytes = content.getBytes(Charset.forName("utf-8"));
        for (int j = 0; j < 100; j++) {
            //1-3之間的隨機(jī)數(shù)
            int random = RandomUtil.randInMod(3);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(bytes.length * random);
            for (int k = 0; k < random; k++) {
                buf.writeBytes(bytes);
            }
            channel.writeInbound(buf);
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
image.png
  • ReplayingDecoder解碼器不足
    (1)不是所有的ByteBuf操作都被ReplayingDecoderBuffer裝飾類所支持,可能有些ByteBuf操作在ReplayingDecoder子類的decode實(shí)現(xiàn)方法中被使用時(shí)就會(huì)拋出ReplayError異常。
    (2)在數(shù)據(jù)解析邏輯復(fù)雜的應(yīng)用場景,ReplayingDecoder在解析速度上相對(duì)較差。

1.6MessageToMessageDecoder解碼器

MessageToMessageDecoder<I>。在繼承它的時(shí)候,需要明確的泛型實(shí)參<I>。這個(gè)實(shí)參的作用就是指定入站消息JavaPOJO類型。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容