《精通并发与Netty》学习笔记(13

摘要:
一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。

一、粘包/拆包概念

TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

第一种情况:

接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。

《精通并发与Netty》学习笔记(13第1张

第二种情况:

接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

《精通并发与Netty》学习笔记(13第2张

第三种情况:

这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

《精通并发与Netty》学习笔记(13第3张

《精通并发与Netty》学习笔记(13第4张

二、粘包问题的解决策略

  • 消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
  • 包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
  • 将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段

三、Netty粘包和拆包解决方案

Netty提供了多个解码器,可以进行分包的操作,分别是:
LineBasedFrameDecoder
DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包)
FixedLengthFrameDecoder(使用定长的报文来分包)
LengthFieldBasedFrameDecoder

四、TCP粘包和拆包实例演示

首先编写服务端

packagecom.spring.netty.handler;

importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioServerSocketChannel;

public classMyServer {
    public static void main(String[] args) throwsException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = newNioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = newServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(newMyServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
packagecom.spring.netty.handler;

importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.socket.SocketChannel;

public class MyServerInitializer extends ChannelInitializer<SocketChannel>{
    @Override
    protected void initChannel(SocketChannel ch) throwsException {
        ChannelPipeline pipeline =ch.pipeline();
        pipeline.addLast(newMyServerHandler());
    }
}
packagecom.spring.netty.handler;

importio.netty.buffer.ByteBuf;
importio.netty.buffer.Unpooled;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.SimpleChannelInboundHandler;

importjava.nio.charset.Charset;
importjava.util.UUID;

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{

    private intcount;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throwsException {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务端接收到的消息内容:"+message);
        System.out.println("服务端接收的消息数量:"+(++this.count));

        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(),Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
        cause.printStackTrace();
        ctx.close();
    }
}

然后编写客户端

packagecom.spring.netty.handler;

importio.netty.bootstrap.Bootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioSocketChannel;

public classMyClient {
    public static void main(String[] args) throwsException {
        EventLoopGroup eventLoopGroup = newNioEventLoopGroup();
        try{
            Bootstrap bootstrap = newBootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(newMyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{
            eventLoopGroup.shutdownGracefully();
        }
    }
}
packagecom.spring.netty.handler;

importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.socket.SocketChannel;

public class MyClientInitializer extends ChannelInitializer<SocketChannel>{
    @Override
    protected void initChannel(SocketChannel ch) throwsException {
        ChannelPipeline pipeline =ch.pipeline();
        pipeline.addLast(newMyClientHandler());
    }
}
packagecom.spring.netty.handler;

importio.netty.buffer.ByteBuf;
importio.netty.buffer.Unpooled;
importio.netty.channel.ChannelHandler;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.SimpleChannelInboundHandler;
importio.netty.util.concurrent.EventExecutorGroup;

importjava.nio.charset.Charset;

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
    private intcount;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throwsException {
        for(int i=0;i<10;i++){
            ByteBuf buffer = Unpooled.copiedBuffer("send from client ", Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throwsException {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer,Charset.forName("utf-8"));
        System.out.println("客户端接收到的消息内容:"+message);
        System.out.println("客户端接收到的消息数量:"+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
        cause.printStackTrace();
        ctx.close();
    }
}

分别运行服务端和客户端查看运行效果

服务端效果:

《精通并发与Netty》学习笔记(13第5张

客户端效果:

《精通并发与Netty》学习笔记(13第6张

本节我们介绍了TCP粘包拆包的现象及做了个实例演示,下节我们来介绍在Netty中如何解决粘包拆包问题。

免责声明:文章转载自《《精通并发与Netty》学习笔记(13》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇内核如何启动根文件系统?Mysql 批处理多条sql语句下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Netty之Bootstrapping-netty学习笔记(10)-20210810

Bootstrap类 引导类的层次结构包括一个抽象的父类和两个具体的引导子类: 相对于将具体的引导类分别看作用于服务器和客户端的引导来说,记住它们的本意是用来支撑不同的应用程序的功能的将有所裨益。也就是说,服务器致力于使用一个父 Channel 来接受来自客户端的连接,并创建子 Channel 以用于它们之间的通信;而客户端将最可能只需要一个 单独的、没...

delphi中时间控制

用TTimer的思路有点问题。 请参考以下思路:   窗体建立时,记录GetTickCount值(关于GetTickCount,请Google),然后,捕捉鼠标键盘消息,如有发送到本窗体的鼠标键盘消息,则重 新记录GetTickCount值,如无,则计算当前GetTickCount值减去原值是否大于规定时间,如大于则Close。 例子如下: //思路是这样...

C语言之生产者与消费者模型

多线程并发应用程序有一个经典的模型,即生产者/消费者模型。系统中,产生消息的是生产者,处理消息的是消费者,消费者和生产者通过一个缓冲区进行消息传递。生产者产生消息后提交到缓冲区,然后通知消费者可以从中取出消息进行处理。消费者处理完信息后,通知生产者可以继续提供消息。 要实现这个模型,关键在于消费者和生产者这两个线程进行同步。也就是说:只有缓冲区中有消息时,...

达梦数据库:第一章:MySQL数据库与达梦数据库的区别

达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统,简称DM,它具有如下特点: 1、通用性达梦数据库管理系统兼容多种硬件体系,可运行于X86、X64、SPARC、POWER等硬件体系之上。DM各种平台上的数据存储结构和消息通信结构完全一致,使得DM各种组件在不同的硬件平台上具有一致的使用特性。达梦数据库管理系统产品实现了平台无关性...

TCP/IP协议族基本知识

常见的网络拓扑 两台主机通信的过程:应用进程产生消息,经由主机的 TCP/IP 协议栈发送到局域网(LAN),最后经过广域网(目前最大的广域网的因特网)中的网络设备(路由器)传给目的主机所在的局域网(LAN),最后经过局域网(LAN)将报文传送个目的主机,经由主机 TCP/IP 协议栈处理,将消息递交给目的应用程序。网络拓扑如下: 图中路由器是网络中的架...

ActiveMQ 认证(一)

新搭建的ActiveMQ服务,在发布和读取消息时,连接的权限为ActiveMQConnection.DEFAULT_USER和ActiveMQConnection.DEFAULT_PASSWORD。 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConn...