Netty 中的粘包和拆包

Netty 底层是基于 TCP 协议来处置网络数据传输。我们知道 TCP 协议是面向字节约的协议,数据像流水一样在网络中传输那何来 “包” 的观点呢?

TCP是四层协议不卖力数据逻辑的处置,然则数据在TCP层 “流” 的时刻为了保证平安和节约效率会把 “流” 做一些分包处置,好比:

  1. 发送方约定了每次数据传输的最大包巨细,跨越该值的内容将会被拆分成两个包发送;
  2. 发送端 和 吸收端 约定每次发送数据包长度并随着网络状态动态调整吸收窗口巨细,这里也会泛起拆包的情形;

Netty 自己是基于 TCP 协议做的处置,若是它不去对 “流” 举行处置,到底这个 “流” 从哪到哪才是完整的数据就是个迷。我们先来看在 TCP 协议中有哪些步骤可能会让 “流” 不完整或者是泛起粘滞的可能。

1. TCP 中可能泛起粘包/拆包的缘故原由

数据流在TCP协议下流传,由于协议自己对于流有一些规则的限制,这些规则会导致当前对端吸收到的数据包不完整,归结缘故原由有下面三种情形:

  • Socket 缓冲区与滑动窗口
  • MSS/MTU限制
  • Nagle算法
1. Socket缓冲区与滑动窗口

对于 TCP 协议而言,它传输数据是基于字节约传输的。应用层在传输数据时,现实上会先将数据写入到 TCP 套接字的缓冲区,当缓冲区被写满后,数据才会被写出去。每个TCP Socket 在内核中都有一个发送缓冲区(SO_SNDBUF )和一个吸收缓冲区(SO_RCVBUF),TCP 的全双工的事情模式以及 TCP 的滑动窗口即是依赖于这两个自力的 buffer 以及此 buffer 的填充状态。

SO_SNDBUF:

历程发送的数据的时刻假设挪用了一个 send 方式,将数据拷贝进入 Socket 的内核发送缓冲区之中,然后 send 便会在上层返回。换句话说,send 返回之时,数据纷歧定会发送到对端去(和write写文件有点类似),send 仅仅是把应用层 buffer 的数据拷贝进 Socket 的内核发送 buffer 中。

SO_RCVBUF:

把吸收到的数据缓存入内核,应用历程一直没有挪用 read 举行读取的话,此数据会一直缓存在响应 Socket 的吸收缓冲区内。不管历程是否读取 Socket,对端发来的数据都市经由内核吸收而且缓存到 Socket 的内核吸收缓冲区之中。read 所做的事情,就是把内核缓冲区中的数据拷贝到应用层用户的 buffer 内里,仅此而已。

吸收缓冲区保留收到的数据一直到应用历程读走为止。对于 TCP,若是应用历程一直没有读取,buffer 满了之后发生的动作是:通知对端 TCP 协议中的窗口关闭。这个即是滑动窗口的实现。保证 TCP 套接口吸收缓冲区不会溢出,从而保证了 TCP 是可靠传输。由于对方不允许发出跨越所通告窗口巨细的数据。 这就是 TCP 的流量控制,若是对方无视窗口巨细而发出了跨越窗口巨细的数据,则吸收方 TCP 将抛弃它。

滑动窗口:

TCP毗邻在三次握手的时刻,会将自己的窗口巨细(window size)发送给对方,实在就是 SO_RCVBUF 指定的值。之后在发送数据的时,发送方必须要先确认吸收方的窗口没有被填充满,若是没有填满,则可以发送。

每次发送数据后,发送方将自己维护的对方的 window size 减小,示意对方的 SO_RCVBUF 可用空间变小。

当吸收方处置最先处置 SO_RCVBUF 中的数据时,会将数据从 Socket 在内核中的接受缓冲区读出,此时吸收方的 SO_RCVBUF 可用空间变大,即 window size 变大,接受方会以 ack 新闻的方式将自己最新的 window size 返回给发送方,此时发送方将自己的维护的接受的方的 window size 设置为ack新闻返回的 window size。

此外,发送方可以延续的给接受方发送新闻,只要保证对方的 SO_RCVBUF 空间可以缓存数据即可,即 window size>0。当吸收方的 SO_RCVBUF 被填充满时,此时 window size=0,发送方不能再继续发送数据,要守候吸收方 ack 新闻,以获得最新可用的 window size。

2. MSS/MTU分片

MTU (Maxitum Transmission Unit,最大传输单元)是链路层对一次可以发送的最大数据的限制。MSS(Maxitum Segment Size,最大分段巨细)是 TCP 报文中 data 部门的最大长度,是传输层对一次可以发送的最大数据的限制。

Netty 中的粘包和拆包

数据在传输历程中,每经由一层,都市加上一些分外的信息:

  • 应用层:只体贴发送的数据 data,将数据写入 Socket 在内核中的缓冲区 SO_SNDBUF 即返回,操作系统会将 SO_SNDBUF 中的数据取出来举行发送;
  • 传输层:会在 data 前面加上 TCP Header(20字节);
  • 网络层:会在 TCP 报文的基础上再添加一个 IP Header,也就是将自己的网络地址加入到报文中。IPv4 中 IP Header 长度是 20 字节,IPV6 中 IP Header 长度是 40 字节;
  • 链路层:加上 Datalink Header 和 CRC。会将 SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和 Type 域加入。SMAC+DMAC+Type+CRC 总长度为 18 字节;
  • 物理层:举行传输。

在回首这个基本内容之后,再来看 MTU 和 MSS。MTU 是以太网传输数据方面的限制,每个以太网帧最大不能跨越 1518bytes。刨去以太网帧的帧头(DMAC+SMAC+Type域) 14Bytes 和帧尾 (CRC校验 ) 4 Bytes,那么剩下承载上层协议的地方也就是 data 域最大就只能有 1500 Bytes 这个值 我们就把它称之为 MTU。

MSS 是在 MTU 的基础上减去网络层的 IP Header 和传输层的 TCP Header 的部门,这就是 TCP 协议一次可以发送的现实应用数据的最大巨细。

MSS = MTU(1500) -IP Header(20 or 40)-TCP Header(20) 

由于 IPV4 和 IPV6 的长度差别,在 IPV4 中,以太网 MSS 可以到达 1460byte。在 IPV6 中,以太网 MSS 可以到达 1440byte。

发送方发送数据时,当 SO_SNDBUF 中的数据量大于 MSS 时,操作系统会将数据举行拆分,使得每一部门都小于 MSS,也形成了拆包。然后每一部门都加上 TCP Header,组成多个完整的 TCP 报文举行发送,固然经由网络层和数据链路层的时刻,还会划分加上响应的内容。

另外需要注重的是:对于内陆回环地址(lookback)不需要走以太网,以是不受到以太网 MTU=1500 的限制。linux 服务器上输入 ifconfig 下令,可以查看差别网卡的 MTU 巨细,如下:

Netty 中的粘包和拆包

上图显示了 2 个网卡信息:

  • eth0 需要走以太网,以是 MTU 是 1500;
  • lo 是内陆回环,不需要走以太网,以是不受 1500 的限制。

Nagle 算法

TCP/IP 协议中,无论发送若干数据,总是要在数据(data)前面加上协议头(TCP Header+IP Header),同时,对方吸收到数据,也需要发送 ACK 示意确认。

纵然从键盘输入的一个字符,占用一个字节,可能在传输上造成 41 字节的包,其中包罗 1 字节的有用信息和 40 字节的首部数据。这种情形转变成了 4000% 的消耗,这样的情形对于重负载的网络来是无法接受的。称之为”糊涂窗口综合征”。

为了尽可能的行使网络带宽,TCP 总是希望尽可能的发送足够大的数据。(一个毗邻会设置 MSS 参数,因此,TCP/IP 希望每次都能够以 MSS 尺寸的数据块来发送数据)。Nagle 算法就是为了尽可能发送大块数据,制止网络中充斥着许多小数据块。

Nagle 算法的基本界说是随便时刻,最多只能有一个未被确认的小段。 所谓 “小段”,指的是小于 MSS 尺寸的数据块;所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的 ACK 确认该数据已收到。

Nagle 算法的规则:

  1. 若是 SO_SNDBUF 中的数据长度到达 MSS,则允许发送;
  2. 若是该 SO_SNDBUF 中含有 FIN,示意请求关闭毗邻,则先将 SO_SNDBUF 中的剩余数据发送,再关闭;
  3. 设置了 TCP_NODELAY=true 选项,则允许发送。TCP_NODELAY 是作废 TCP 的确认延迟机制,相当于禁用了 Negale 算法。正常情形下,当 Server 端收到数据之后,它并不会马上向 client 端发送 ACK,而是会将 ACK 的发送延迟一段时间(一样平常是 40ms),它希望在 t 时间内 server 端会向 client 端发送应答数据,这样 ACK 就能够和应答数据一起发送,就像是应答数据捎带着 ACK 已往。固然,TCP 确认延迟 40ms 并不是一直稳定的, TCP 毗邻的延迟确认时间一样平常初始化为最小值 40ms,随后凭据毗邻的重传超时时间(RTO)、上次收到数据包与本次吸收数据包的时间距离等参数举行不停调整。另外可以通过设置 TCP_QUICKACK 选项来作废确认延迟;
  4. 未设置 TCP_CORK 选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
  5. 上述条件都未知足,但发生了超时(一样平常为200ms),则立刻发送。

基于以上问题,TCP层一定是会泛起当次吸收到的数据是不完整数据的情形。泛起粘包可能的缘故原由有:

  1. 发送方每次写入数据 < 套接字缓冲区巨细;
  2. 吸收方读取套接字缓冲区数据不够实时。

泛起半包的可能缘故原由有:

  1. 发送方每次写入数据 > 套接字缓冲区巨细;
  2. 发送的数据大于协议 MTU,以是必须要拆包。

解决问题一定不是在4层来做而是在应用层,通过界说通信协议来解决粘包和拆包的问题。发送方 和 吸收方约定某个规则:

  1. 当发生粘包的时刻通过某种约定来拆包;
  2. 若是在拆包,通过某种约定来将数据组成一个完整的包处置。

2. 业界常用解决方案

1. 定长协议

指定一个报文具有牢固长度。好比约定一个报文的长度是 5 字节,那么:

报文:1234,只有4字节,然则还差一个怎么办呢,不足部门用空格补齐。就变为:1234 。

若是不补齐空格,那么就会读到下一个报文的字节来填充上一个报文直到补齐为止,这样粘包了。

定长协议的优点是使用简朴,瑕玷很明显:虚耗带宽。

Netty 中提供了 FixedLengthFrameDecoder ,支持把牢固的长度的字节数当做一个完整的新闻举行解码。

2. 特殊字符支解协议

很好明白,在每一个你认为是一个完整的包的尾部添加指定的特殊字符,好比:\n,\r等等。

需要注重的是:约定的特殊字符要保证唯一性,不能泛起在报文的正文中,否则就将正文一分为二了。

Netty 中提供了 DelimiterBasedFrameDecoder 凭据特殊字符举行解码,LineBasedFrameDecoder默认以换行符作为分隔符。

3. 变长协议

变长协议的焦点就是:将新闻分为新闻头和新闻体,新闻头中标识当前完整的新闻体长度。

  1. 发送方在发送数据之前先获取数据的二进制字节巨细,然后在新闻体前面添加新闻巨细;
  2. 吸收方在剖析新闻时先获取新闻巨细,之后必须读到该巨细的字节数才认为是完整的新闻。

Netty 中提供了 LengthFieldBasedFrameDecoder ,通过 LengthFieldPrepender 来给现实的新闻体添加 length 字段。

3. Netty 粘包演示

代码示例请看:github点我

1. 实验主要逻辑

演示客户端发送多条新闻,使用 Netty 自界说的 ByteBuf 作为传输数据花样,看看服务端吸收数据是否是按每次发送的条数来吸收照样根据当前缓冲区巨细来吸收。

主要代码:

Server:

package com.rickiyang.learn.packageEvent1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description: server 端
 */
@Slf4j
public class PeServer {

    private int port;

    public PeServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server start fail",e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        PeServer server = new PeServer(7788);
        server.start();
    }
}

ServerInitialzr:

package com.rickiyang.learn.packageEvent1;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
public class ServerChannelInitializer  extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 自己的逻辑Handler
        pipeline.addLast("handler", new PeServerHandler());
        }
}

ServerHandler:

package com.rickiyang.learn.packageEvent1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class PeServerHandler extends SimpleChannelInboundHandler {

    private int counter;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("server channelActive");
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println("-----start------\n"+ body + "\n------end------");

        String content = "receive" + ++counter;
        ByteBuf resp = Unpooled.copiedBuffer(content.getBytes());
        ctx.writeAndFlush(resp);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

服务端的 handler 主要逻辑是吸收客户端发送过来的数据,看看是否是一条一条吸收。然后每次吸收到数据之后给客户端回复一个确认新闻。

Client:

package com.rickiyang.learn.packageEvent1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class PeClient {

    private  int port;
    private  String address;

    public PeClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientChannelInitializer());
        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello world, i'm online");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("client start fail",e);
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        PeClient client = new PeClient(7788,"127.0.0.1");
        client.start();
    }
}

ClientInitializer:

package com.rickiyang.learn.packageEvent1;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 客户端的逻辑
        pipeline.addLast("handler", new PeClientHandler());
    }
}

ClientHandler:

package com.rickiyang.learn.packageEvent1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author: rickiyang
 * @date: 2020/3/15
 * @description:
 */
@Slf4j
public class PeClientHandler extends SimpleChannelInboundHandler {

    private int counter;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println(body + " count:" + ++counter + "----end----\n");
    }



    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client channelActive");
        byte[] req = ("我是一条测试新闻,快来读我吧,啦啦啦").getBytes();

        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client is close");
    }


}

客户端 handler 主要逻辑是:循环100次给服务端发送测试新闻。吸收服务端的确认新闻。

启动项目之后我们来看看客户端 和 服务端划分收到的新闻效果:

服务端吸收到的新闻:

-----start------
我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦�
------end------
-----start------
��我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦�
------end------
-----start------
�啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦
------end------
-----start------
啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,�
------end------
-----start------
��啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧�
------end------
-----start------
�啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦我是一条测试新闻,快来读我吧,啦啦啦
------end------

这里能看到多条新闻被粘到一起发送了。

客户端吸收到服务端回传的新闻:

receive1receive2receive3receive4receive5 count:1----end----

receive6 count:2----end----

服务端收到 6 次新闻,以是回复了 6 次,同样客户端吸收新闻也泛起粘包的征象。

由于我们并没有对数据包做任何声明,站在 TCP 协议端看, Netty 属于应用层,我们上面的示例代码中未对原始的数据包做任何处置。

4. Netty 粘包处置

处置 TCP 粘包的唯一方式就是制订应用层的数据通讯协议,通过协议来规范现有吸收的数据是否知足新闻数据的需要。

1. Netty 提供的能力

为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:

  • ByteToMessageDecoder:若是想实现自己的半包解码器,实现该类;
  • MessageToMessageDecoder:一样平常作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 工具的时刻,我们可能还需要将这个工具举行二次解码成其他工具,我们就可以继续这个类;
  • LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包新闻;
  • StringDecoder:字符串解码器;
  • DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包新闻;
  • FixedLengthFrameDecoder:报文巨细牢固长度,不够空格补全;
  • ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包新闻;
  • ProtobufDecoder: Protobuf 解码器;
  • LengthFieldBasedFrameDecoder:指定长度来标识整包新闻,通过在包头指定整包长度来约定包长。

Netty 还内置了如下的编码器:

  • ProtobufEncoder:Protobuf 编码器;
  • MessageToByteEncoder:将 Java 工具编码成 ByteBuf;
  • MessageToMessageEncoder:若是不想将 Java 工具编码成 ByteBuf,而是自界说类就继续这个;
  • LengthFieldPrepender:LengthFieldPrepender 是一个异常适用的工具类,若是我们在发送新闻的时刻接纳的是:新闻长度字段+原始新闻的形式,那么我们就可以使用 LengthFieldPrepender。这是由于 LengthFieldPrepender 可以将待发送新闻的长度(二进制字节长度)写到 ByteBuf 的前两个字节。

编解码相关类结构图如下:

搭建你的第一个区块链网络(二)

Netty 中的粘包和拆包

上面的类关系能看到所有的自界说解码器都是继续自 ByteToMessageDecoder。在Netty 中 Decoder 主要分为两大类:

  1. 一种是将字节约转换为某种协议的数据花样:ByteToMessageDecoderReplayingDecoder
  2. 一种是将一直协议的数据转为另一种协议的数据花样:MessageToMessageDecoder

将字节约转为工具是一种很常见的操作,也是一个新闻框架应该提供的基础功效。由于 Decoder 的作用是将输入的数据剖析成特定协议,上图中可以看到所有的 Decoder 都实现了 ChannelInboundHandler接口。在应用层将 byte 转为 message 的难度在于若何确定当前的包是一个完整的数据包,有两种方案可以实现:

  1. 监听当前 socket 的线程一直守候,直到收到的 byte 可以完成的组成一个包为止。这种方式的坏处就在于要虚耗一个线程去等。
  2. 第二种方案是为每个监听的 socket 都构建一个内陆缓存,当前监听线程若是遇到字节数不够的情形就先将获取到的数据存入缓存,继而处置其余请求,等到这里有数据的时刻再来将新数据继续写入缓存直到数据组成一个完整的包取出。

ByteToMessageDecoder 接纳的是第二种方案。在 ByteToMessageDecoder 中有一个工具 ByteBuf,该工具用于存储当前 Decoder吸收到的 byte 数据。

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
	
  // 用来保留累计读取到的字节. 我们读到的新字节会保留(缓冲)在这里
  ByteBuf cumulation;
  // 用来做累计的,卖力将读到的新字节写入 cumulation,有两个实现 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR
  private Cumulator cumulator = MERGE_CUMULATOR;
  //设置为true后, 单个解码器只会解码出一个效果
  private boolean singleDecode;
  private boolean decodeWasNull;
  //是否是第一次读取数据
  private boolean first;
  //若干次读取后, 抛弃数据 默认16次
  private int discardAfterReads = 16;
  //已经累加了若干次数据
  private int numReads;
  
  //每次吸收到数据,就会挪用channelRead 举行处置
  //该处置器用于处置二进制数据,以是 msg 字段的类型应该是 ByteBuf。
  //若是不是,则交给pipeLine的下一个处置器举行处置。
  //下面的代码中可以看出
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //若是不是ByteBuf则不处置
    if (msg instanceof ByteBuf) {
      //out用于存储剖析二进制流获得的效果,一个二进制流可能会剖析出多个新闻,以是out是一个list
      CodecOutputList out = CodecOutputList.newInstance();
      try {
        ByteBuf data = (ByteBuf) msg;
        //判断cumulation == null;并将效果赋值给first。因此若是first为true,则示意第一次接受到数据     
        first = cumulation == null;
        //若是是第一次接受到数据,直接将接受到的数据赋值给缓存工具cumulation
        if (first) {
          cumulation = data;
        } else {
          // 第二次解码,就将 data 向 cumulation 追加,并释放 data
          //若是cumulation中的剩余空间,不足以存储吸收到的data,将cumulation扩容
          cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
        }
        // 获得追加后的 cumulation 后,挪用 decode 方式举行解码
				// 解码历程中,挪用 fireChannelRead 方式,主要目的是将累积区的内容 decode 到 数组中
        callDecode(ctx, cumulation, out);
      } catch (DecoderException e) {
        throw e;
      } catch (Throwable t) {
        throw new DecoderException(t);
      } finally {
         //若是cumulation没有数据可读了,说明所有的二进制数据都被剖析过了
         //此时对cumulation举行释放,以节约内存空间。
         //反之cumulation另有数据可读,那么if中的语句不会运行,由于纰谬cumulation举行释放
         //因此也就缓存了用户尚未剖析的二进制数据。
        if (cumulation != null && !cumulation.isReadable()) {
          // 将次数归零
          numReads = 0;
          // 释放累计区
          cumulation.release();
          // 守候 gc
          cumulation = null;
          
          // 若是跨越了 16 次,就压缩累计区,主要是将已经读过的数据抛弃,将 readIndex 归零。
        } else if (++ numReads >= discardAfterReads) {
          // We did enough reads already try to discard some bytes so we not risk to see a OOME.
          // See https://github.com/netty/netty/issues/4275
          numReads = 0;
          discardSomeReadBytes();
        }

        int size = out.size();
        // 若是没有向数组插入过任何数据
        decodeWasNull = !out.insertSinceRecycled();
        // 循环数组,向后面的 handler 发送数据,若是数组是空,那不会挪用
        fireChannelRead(ctx, out, size);
         // 将数组中的内容清空,将数组的数组的下标恢复至原来
        out.recycle();
      }
    } else {
      //若是msg类型是不是ByteBuf,直接挪用下一个handler举行处置
      ctx.fireChannelRead(msg);
    }
  }
  
  //callDecode方式主要用于剖析cumulation 中的数据,并将剖析的效果放入List<Object> out中。
  //由于cumulation中缓存的二进制数据,可能包罗了出多条有用信息,因此在callDecode方式中,默认会挪用多次decode方式
  //我们在覆写decode方式时,每次只剖析一个新闻,添加到out中,callDecode通过多次回调decode
  //每次通报进来都是相同的List<Object> out实例,因此每一次剖析出来的新闻,都存储在同一个out实例中。
  //当cumulation没有数据可以继续读,或者某次挪用decode方式后,List<Object> out中元素个数没有转变,则住手回调decode方式。
  protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
      //若是cumulation中有数据可读的话,一直循环挪用decode
      while (in.isReadable()) {
        //获取上一次decode方式挪用后,out中元素数目,若是是第一次挪用,则为0。
        int outSize = out.size();
				//上次循环乐成解码
        if (outSize > 0) {
          //用后面的营业 handler 的 ChannelRead 方式读取剖析的数据
          fireChannelRead(ctx, out, outSize);
          out.clear();

         
          if (ctx.isRemoved()) {
            break;
          }
          outSize = 0;
        }

        int oldInputLength = in.readableBytes();
        //回调decode方式,由开发者覆写,用于剖析in中包罗的二进制数据,并将剖析效果放到out中。
        decode(ctx, in, out);

      
        if (ctx.isRemoved()) {
          break;
        }
				//outSize是上一次decode方式挪用时out的巨细,out.size()是当前out巨细
        //若是二者相等,则说明当前decode方式挪用没有剖析出有用信息。
        if (outSize == out.size()) {
          //此时,若是发现上次decode方式和本次decode方式挪用候,in中的剩余可读字节数相同
          //则说明本次decode方式没有读取任何数据剖析
          //(可能是遇到半包等问题,即剩余的二进制数据不足以组成一条新闻),跳出while循环。
          if (oldInputLength == in.readableBytes()) {
            break;
          } else {
            continue;
          }
        }
				//处置人为失误 。若是走到这段代码,则说明outSize != out.size()。
        //也就是本次decode方式现实上是剖析出来了有用信息放到out中。
        //然则oldInputLength == in.readableBytes(),说明本次decode方式挪用并没有读取任何数据
        //然则out中元素却添加了。
        //这可能是由于开发者错误的编写了代码,例如mock了一个新闻放到List中。
        if (oldInputLength == in.readableBytes()) {
          throw new DecoderException(
            StringUtil.simpleClassName(getClass()) +
            ".decode() did not read anything but decoded a message.");
        }

        if (isSingleDecode()) {
          break;
        }
      }
    } catch (DecoderException e) {
      throw e;
    } catch (Throwable cause) {
      throw new DecoderException(cause);
    }
  }
  
}

这里 channelRead()的主要逻辑是:

  1. 从工具池中取出一个空的数组;
  2. 判断成员变量是否是第一次使用,要注重的是,这里既然使用了成员变量,以是这个 handler 不能是 @Shareble 状态的 handler,否则你就分不清成员变量是哪个 channel 的。将 unsafe 中通报来的数据写入到这个 cumulation 累积区中;
  3. 写到累积区后,挪用子类的 decode 方式,实验将累积区的内容解码,每乐成解码一个,就挪用后面节点的 channelRead 方式。若没有解码乐成,什么都不做;
  4. 若是累积区没有未读数据了,就释放累积区;
  5. 若是另有未读数据,且解码跨越了 16 次(默认),就对累积区举行压缩。将读取过的数据清空,也就是将 readIndex 设置为0;
  6. 设置 decodeWasNull 的值,若是上一次没有插入任何数据,这个值就是 ture。该值在 挪用 channelReadComplete 方式的时刻,会触发 read 方式(不是自动读取的话),实验从 JDK 的通道中读取数据,并将之前的逻辑重来。主要应该是怕若是什么数据都没有插入,就执行 channelReadComplete 会遗漏数据;
  7. 挪用 fireChannelRead 方式,将数组中的元素发送到后面的 handler 中;
  8. 将数组清空。并还给工具池。

当数据添加到累积区之后,需要挪用 decode 方式举行解码,代码见上面的 callDecode()方式。在 callDecode()中最要害的代码就是将剖析完的数据拿取挪用decode(ctx, in, out)方式。以是若是继续 ByteToMessageDecoder 类实现自己的字节约转工具的逻辑我们就要覆写该方式。

2. LineBasedFrameDecoder 使用

LineBasedFrameDecoder 通过在包尾添加回车换行符 \r\n 来区分整包新闻。逻辑比较简朴,示例代码见:

示例代码见:LineBasedFrameDecoder gitHub示例

3. FixedLengthFrameDecoder 使用

LineBasedFrameDecoder即牢固新闻长度解码器,个人认为这个貌似不能适用通用场景。

示例代码见:FixedLengthFrameDecoder gitHub 示例

4. DelimiterBasedFrameDecoder 使用

DelimiterBasedFrameDecoder即自界说分隔符解码器。相当于是 LineBasedFrameDecoder的高阶版。

示例代码见:DelimiterBasedFrameDecoder gitHub示例

5. LengthFieldBasedFrameDecoder 使用

LengthFieldBasedFrameDecoder相对就高端一点。前面我们使用到的拆包都是基于一些约定来做的,好比牢固长度,特殊分隔符,这些方案总是有一定的坏处。最好的方案就是:发送方告诉我当前新闻总长度,吸收方若是没有收到该长度巨细的数据就认为是没有收完继续守候。

先看一下该类的组织函数:

		/**
     * Creates a new instance.
     *
     * @param maxFrameLength 帧的最大长度
     *        
     * @param lengthFieldOffset 长度字段偏移的地址
     *        
     * @param lengthFieldLength 长度字段所占的字节长
     *        修改帧数据长度字段中界说的值,可以为负数 由于有时刻我们习惯把头部记入长度,
     *        若为负数,则说明要推后若干个字段
     * @param lengthAdjustment 剖析时刻跳过若干个长度
     *
     * @param initialBytesToStrip 解码出一个数据包之后,去掉开头的字节数
     *        
     * @param initialBytesToStrip  为true,当frame长度跨越maxFrameLength时立刻报
     *                   TooLongFrameException异常,为false,读取完整个帧再报异
     *        
     */
public LengthFieldBasedFrameDecoder(
  int maxFrameLength,
  int lengthFieldOffset, int lengthFieldLength,
  int lengthAdjustment, int initialBytesToStrip) {
  this(
    maxFrameLength,
    lengthFieldOffset, lengthFieldLength, lengthAdjustment,
    initialBytesToStrip, true);
}

LengthFieldBasedFrameDecoder类的注解上给出了一些关于该类使用的示例:

示例1:

lengthFieldOffset = 0,长度字段偏移位置为0示意从包的第一个字节最先读取;

lengthFieldLength = 2,长度字段长为2,从包的最先位置往后2个字节的长度为长度字段;

lengthAdjustment = 0 ,剖析的时刻无需跳过任何长度;

initialBytesToStrip = 0,无需去掉当前数据包的开头字节数, header + body。

0x000C 转为 int = 12。

 * <pre>
 * <b>lengthFieldOffset</b>   = <b>0</b>
 * <b>lengthFieldLength</b>   = <b>2</b>
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0 (= do not strip header)
 *
 * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
 * </pre>

上面这个设置示意:body长度为12,从当前包的第0个字节最先读取,前两个字节示意包长度,读取数据 body的时刻不偏移从0字节最先,以是整包巨细14个字节,包罗包头长度字节在内。

示例2:

lengthFieldOffset = 0,长度字段偏移位置为0示意从包的第一个字节最先读取;

lengthFieldLength = 2,长度字段长为2,从包的最先位置往后2个字节的长度为长度字段;

lengthAdjustment = 0 ,剖析的时刻无需跳过任何长度;

initialBytesToStrip = 2,去掉当前数据包的开头2字节,去掉 header。

0x000C 转为 int = 12。

* <pre>
* lengthFieldOffset   = 0
* lengthFieldLength   = 2
* lengthAdjustment    = 0
* <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
*
* BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
* +--------+----------------+      +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
* +--------+----------------+      +----------------+
* </pre>

这个设置跟上面的而区别就在于,initialBytesToStrip = 2,示意当前包中的有用数据是从整包偏移2个字节最先盘算的,即包头中的长度字段 2 byte 不属于包内容的一部门。

示例3:

lengthFieldOffset = 0,长度字段偏移位置为0示意从包的第一个字节最先读取;

lengthFieldLength = 2,长度字段长为2,从包的最先位置往后2个字节的长度为长度字段;

lengthAdjustment = -2 ,剖析的时刻无需跳过任何长度;

initialBytesToStrip = 0,无需去掉当前数据包的开头字节数。

0x000C 转为 int = 12。

* <pre>
* lengthFieldOffset   =  0
* lengthFieldLength   =  2
* <b>lengthAdjustment</b>    = <b>-2</b> (= the length of the Length field)
* initialBytesToStrip =  0
*
* BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
* +--------+----------------+      +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
* +--------+----------------+      +--------+----------------+
* </pre>

length = 14,长度字段为 2 字节,真实的数据长度为 12 个字节,然则 length = 14,那么说明 length的长度也算上了数据包长度了。lengthAdjustment = -2 ,示意当前length长度往回调2个字节,这样总包长度就是14个字节。

示例4:

lengthFieldOffset = 2,长度字段偏移位置为2示意从包的第3个字节最先读取;

lengthFieldLength = 3,长度字段长为3,从包的最先位置往后3个字节的长度为长度字段;

lengthAdjustment = 0 ,剖析的时刻无需跳过任何长度;

initialBytesToStrip = 0,无需去掉当前数据包的开头字节数。

0x000E 转为 int = 14。

  * <pre>
  * <b>lengthFieldOffset</b>   = <b>2</b> (= the length of Header 1)
  * <b>lengthFieldLength</b>   = <b>3</b>
  * lengthAdjustment    = 0
  * initialBytesToStrip = 0
  *
  * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
  * +----------+----------+----------------+      +----------+----------+----------------+
  * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
  * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
  * +----------+----------+----------------+      +----------+----------+----------------+
  * </pre>
  *

header头占2个字节,长度字段占3个字节,content字段占12个字节,总共17个字节。body读取无偏移要求,以是body整体也是17个字节。

示例5:

lengthFieldOffset = 0,长度字段偏移位置为0示意从包的第0个字节最先读取;

lengthFieldLength = 3,长度字段长为3,从包的最先位置往后3个字节的长度为长度字段;

lengthAdjustment = 2 ,剖析的时刻跳过2个字节;

initialBytesToStrip = 0,无需去掉当前数据包的开头字节数。

0x000C 转为 int = 12。

* <pre>
* lengthFieldOffset   = 0
* lengthFieldLength   = 3
* <b>lengthAdjustment</b>    = <b>2</b> (= the length of Header 1)
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
* +----------+----------+----------------+      +----------+----------+----------------+
* |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
* | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
* +----------+----------+----------------+      +----------+----------+----------------+
* </pre>
*

这个包 length在最前面传输占3个字节,header在中心占两个字节,content在最后占12个字节。body字段只有content,以是读取content的时刻需要在length字段的基础上往前偏移2个字节跳过heade字段。

关于 LengthFieldBasedFrameDecoder 组织函数的示例用法我们先将这么多,下来举一个示例我们看看现实中的使用:

示例代码见:LengthFieldBasedFrameDecoder基本使用 gitHub示例

代码注释:

@Slf4j
public class PeClientHandler extends SimpleChannelInboundHandler {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    log.info("client channelActive");
    for (int i = 0; i < 100; i++) {
      byte[] req = ("我是一条测试新闻,快来读我吧,啦啦啦" + i).getBytes();
      ByteBuf message = Unpooled.buffer(req.length);
      message.writeInt(req.length);
      message.writeBytes(req);
      ctx.writeAndFlush(message);
    }
  }
}

客户端发送新闻是:int型的length字段占4个字节,剩余字节为content内容。那么对应到客户端吸收的解码器设置:

pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, // 帧的最大长度,即每个数据包最大限度
                                                  0, // 长度字段偏移量
                                                  4, // 长度字段所占的字节数
                                                  0, // 新闻头的长度,可以为负数
                                                  4) // 需要忽略的字节数,从新闻头最先,这里是指整个包

                );

长度字段4个字节,新闻体忽略4字节,即清扫长度字段之后的内容算是body。

以上的这段演示代码的重点,人人可以下载示例功效,自己演示一下。

然则有个问题是:我们上面写的示例代码在生产环境中只能是玩具。新闻体的读取设置不应该在这里通过参数设置来设置,应该有一个约定的新闻结构体,每一个字段是什么数据结构会占用多大空间都应该在结构体中约定清晰。每个字段读取对应空间巨细的数据剩下的就是别人的部门互不侵犯。

以是下面的一个示例给出了通过继续 LengthFieldBasedFrameDecoder 重写 decode 方式来实现剖析出约定工具的实现。

6. 自界说编解码器的 LengthFieldBasedFrameDecoder 使用

首先我们自界说了一个新闻体:

public class MsgReq {

    private byte type;

    private int length;

    private String content;


}

包罗3个字段。

发送新闻出去的时刻一定是要将工具转为 byte 发送,以是需要一个新闻编码器,我们继续 MessageToByteEncoder 来实现编码器:

package com.rickiyang.learn.packageEvent5;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

/**
 * @author rickiyang
 * @date 2020-05-14
 * @Desc 自界说编码器
 */
public class MyProtocolEncoder extends MessageToByteEncoder {



    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        MsgReq req = (MsgReq) msg;
        out.writeByte(req.getType());
        out.writeInt(req.getLength());
        out.writeBytes(req.getContent().getBytes(StandardCharsets.UTF_8));
    }
}

即将 MsgReq 工具转为对应的 byte 发送。

发送出去的是 byte 字节,对应的解码器应该是将 byte 转为工具。自然解码器应该是继续 ByteToMessageDecoder。我们的目的不是自己实现一个完完全全的自界说解码器,而是在新闻长度解码器的基础上完成工具剖析的事情,以是解码器如下:

package com.rickiyang.learn.packageEvent5;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.charset.StandardCharsets;

/**
 * @author rickiyang
 * @date 2020-05-14
 * @Desc 自界说解码器
 */
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {


    /**
     * @param maxFrameLength      帧的最大长度
     * @param lengthFieldOffset   length字段偏移的地址
     * @param lengthFieldLength   length字段所占的字节长
     * @param lengthAdjustment    修改帧数据长度字段中界说的值,可以为负数 由于有时刻我们习惯把头部记入长度,若为负数,则说明要推后若干个字段
     * @param initialBytesToStrip 剖析时刻跳过若干个长度
     * @param failFast            为true,当frame长度跨越maxFrameLength时立刻报TooLongFrameException异常,为false,读取完整个帧再报异
     */
    public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
                             int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }


    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在这里挪用父类的方式
        in = (ByteBuf) super.decode(ctx, in);
        if (in == null) {
            return null;
        }
        //读取type字段
        byte type = in.readByte();
        //读取length字段
        int length = in.readInt();
        if (in.readableBytes() != length) {
            throw new RuntimeException("长度与符号不符");
        }
        //读取body
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        return MsgReq.builder().length(length).type(type).content(new String(bytes, StandardCharsets.UTF_8)).build();
    }
}

通过这种方式,我们只用约定好新闻的最大长度,好比一条新闻跨越若干字节就拒收,约定好新闻长度字段所占的字节,一样平常来说int类型4个字节足够。剩下的几个参数都无需设置,根据约定的新闻花样举行剖析即可。

示例代码见:LengthFieldBasedFrameDecoder自界说编解码器 gitHub示例

5. 小结

本篇将了关于 Netty 中处置拆包粘包的一些适用工具以及若是实现自界说的编解码器的方式。每种处置方式都给出了对应的案例操作,人人有兴趣的可以下载代码自行运行看看处置效果。后面也给出了关于自界说编解码器的示例,人人若是有兴趣可以自己写一下编解码操作,下一篇再一起看看编解码器在新闻读写历程被使用在哪个阶段。

原创文章,作者:admin,如若转载,请注明出处:https://www.2lxm.com/archives/9902.html