Netty(4)Stream by codec(粘包与拆包)

摘要:
可能会发生粘包和拆包。比如本例,channelAdd()中初始分配一个4字节的ByteBuf,在channelRemove()中释放ByteBuf。TimeDecoder.javaimportjava.util.List;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty.handler.codec.ByteToMessageDecoder;publicclassTimeDecoderextendsByteToMessageDecoder{//@OverrideprotectedvoiddecodethrowsException{//if{return;//}out.add;//}}1、ByteToMessageDecoder实现了ChannelInboundHandler2、ByteToMessageDecoder,当调用decode()时,其内部实现了“累积”功能的buffer,即不用自己在写全局buffer了,当接收到新数据时,会向该buffer中写入。此时,ByteToMessageDecoder将会丢弃掉“累积”buffer中已读的消息。

TCP/IP,传输的是byte[],将byte[]放入队列中。可能会发生粘包和拆包。

比如,客户端向服务端发送了2条消息,分别为D1,D2,可能产生的情况,如下图所示:

Netty(4)Stream by codec(粘包与拆包)第1张

情况一:正常的。

情况二:粘包。

情况三:拆包。即:部分数据不是一次完整发送的,而是分了至少2次发送。

如本例,D2拆成了D2_1和D2_2,这是拆包。

服务端分2次收到包,第一次收到了D1和D2_1包,这是粘包;服务端第二次收到了D2_2包,这是拆包。

回到Time client例子,存在相同的问题。4字节的int很小,很少发生粘包或拆包。但是,如果并发量大时,可能会发生。
最简单的方法是创建一个内部全局的(只为了多次接收放入相同buffer)buffer,等待,直到4个字节全部接受。以下修改了TimeClientHandler解决此问题。

第一种解决方法:全局buffer,累积。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.core.date.DateUtil;
@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(4);//(1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();//(1)
        buf =null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m =(ByteBuf) msg;
        buf.writeBytes(m);//(2)
        m.release();
        if (buf.readableBytes() >= 4) {//(3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L)*1000L;
            log.info("{}",DateUtil.date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

1、ChannelHandler

  • ChannelHandler有2个监听器方法(生命周期):handlerAdded()和handlerRemoved()。当该handler被添加到pipeline中时,被触发;当该handler从pipeline删除时,被触发。该handler,执行顺序:channelAdd()--->channelRead()--->channelRemove()。
  • 只要任务不重,即不会阻塞很久,可以在这2个方法中做一些初始化的工作。比如本例,channelAdd()中初始分配一个4字节的ByteBuf,在channelRemove()中释放ByteBuf。

2、首先,接收到的所有字节均被写入buf。
3、然后,handler必须要检查是否够4个字节(本例),如果不够(拆包),则当有剩下的数据来时,Netty会再次调用该channelRead()方法,直到4个字节都接收到为止

第二种解决方法:使用解码器

尽管第一种方法解决了粘包和拆包问题,但是,代码臃肿。因为,可以向pipeline中添加多个handler,因此,我们可以将TimeClientHandler分割成2个handler:
1)、TimeDecoder
2)、上节里的TimeClientHandler版本。

TimeDecoder.java

importjava.util.List;

importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.handler.codec.ByteToMessageDecoder;

public class TimeDecoder extends ByteToMessageDecoder {//(1)
@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
        if (in.readableBytes() < 4) {
            return ;//(3)
}
        out.add(in.readBytes(4));//(4)
}
}

1、ByteToMessageDecoder 实现了ChannelInboundHandler

Netty(4)Stream by codec(粘包与拆包)第2张

2、ByteToMessageDecoder,当调用decode()时,其内部实现了“累积”功能的buffer,即不用自己在写全局buffer了,当接收到新数据时,会向该buffer中写入。
3、decode():比如,一个包分2次传的,则会调用2次decode()。第一次即使不够4个字节,也会存入其内部“累积”buffer。我们的decode()方法中,return即可。
4、decode()中,一旦添加了一个obj到“out”,意味着该decoder已经成功将消息解码了,即解决了粘包拆包问题。此时,ByteToMessageDecoder将会丢弃掉“累积”buffer中已读的消息。ByteToMessageDecoder将会不断调用decode(),直到添加“空”到out为止。

TimeClientHandler.java(上节time中的TimeClientHandler)

importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importlombok.extern.slf4j.Slf4j;
importcn.hutool.core.date.DateUtil;
@Slf4j
public class TimeClientHandler extendsChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
        ByteBuf m =(ByteBuf) msg;
        try{
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L)*1000L;
            log.info("{}",DateUtil.date(currentTimeMillis));
            ctx.close();
        } finally{
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
        cause.printStackTrace();
        ctx.close();
    }

}

然后,

@Override
protected void initChannel(SocketChannel ch) throwsException {
     ch.pipeline().addLast(new TimeDecoder(),newTimeClientHandler());
}

执行顺序:TimeDecoder.decode(ctx, ByteBuf in, List<Object> out)--->TimeClientHandler.channelRead(ctx, Object msg)。

上例中,从TimeDecoder传给TimeClientHandler的依然是ByteBuf,既然是int,那我们可以直接传递int吗?可以,如下:

TimeDecoder.java,修改成

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
        if (in.readableBytes() < 4) {
            return ;//(3)
}
        //out.add(in.readBytes(4));//(4)
        out.add(in.readUnsignedInt());//(4)
    }

TimeClientHandler.java,修改成

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
        long m = (long) msg;
        long currentTimeMillis = (m - 2208988800L)*1000L;
        log.info("{}",DateUtil.date(currentTimeMillis));
        ctx.close();
}

ReplayingDecoder是一个更加简单的decoder。可以代替ByteToMessageDecoder,只需要修改TimeDecoder,如下:

importio.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.handler.codec.ReplayingDecoder;

importjava.util.List;

public class TimeDecoder extends ReplayingDecoder<Void> {//(1)
@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//(2)
out.add(in.readUnsignedInt());
    }
}

其他代码都不用动。

最终输出:

服务端:18:43:45.824 [nioEventLoopGroup-3-4] -549056671

客户端:

18:43:45.854 [nioEventLoopGroup-2-1] 2018-09-14 18:43:45
18:43:45.929 [main] client channel is closed.

免责声明:文章转载自《Netty(4)Stream by codec(粘包与拆包)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Unity3D中Mathf数学运算函数总结项目中应该怎么选择MySQL的事务隔离级别下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

下一代GNU/Linux显示服务Wayland 1.12正式发布

导读 最近,Bryce Harrington很高兴地宣布了“面向GNU/Linux操作系统的Wayland 1.12.0显示服务已正式发布”的消息。与它一同到来的,还有Weston 1.12.0合成器。 Wayland 1.12和Weston 1.12的开发工作早已开始,一个月前就就想公测者们放出了首个Alpha编译版本。最终编译版本中,加入了许多可以...

hadoop26----netty,多个handler

k客户端: package cn.itcast_03_netty.sendorder.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; im...

netty实现长连接心跳检

主要逻辑: 使用netty实现长连接,主要靠心跳来维持服务器端及客户端连接。 实现的逻辑主要是: 服务器端方面: 1, 服务器在网络空闲操作一定时间后,服务端失败心跳计数器加1。 2, 如果收到客户端的ping心跳包,则清零失败心跳计数器,如果连续n次未收到客户端的ping心跳包,则关闭链路,释放资源,等待客户端重连。 客户端方面: 1, 客户端网络...

5G信令(就是用户身份信息)风暴——就是客户端通过公钥加密的消息(携带手机IMSI号)发给服务端,服务器需用私钥解密,这个解密比较消耗资源,如果短时间大量请求到来就会触发信令风暴

信令:手机开机后,先从USIM中读取之前运营商分配的临时身份信息GUTI/TMSI,发送携带该身份信息的信令给基站,请求接入运营商网络。 如果每个设备的每条消息都需要单独认证,则网络侧安全信令的验证需要消耗大量资源。在传统4G网络认证机制中没有考虑到这种海量认证信令的问题,一旦网络收到终端信令请求超过了网络各项信令资源的处理能力,则会触发信令风暴,导致网络...

rabbitmq trace 日志的使用以及其疑惑之处

RabbitMQ 默认日志里只有类似客户端“accpet/close”等信息,对于有异常或者跟踪消息内部结构就比较麻烦了。 不过MQ有个rabbitmq_tracing插件,安装该插件后在控制台的管理tab页,就可以看到多了一个trace的菜单。 其中,最重要的是理解pattern的格式。一般来跟踪消息时会涉及到两个部分:有没有收到,有没有发出去。其抓包...

js原生封装自定义滚动条

1 /* 2 * @Author: dothin前端 3 * @Date: 2015-11-21 00:12:15 4 * @Last Modified by: dothin前端 5 * @Last Modified time: 2015-11-21 00:29:12 6 */ 7 ! function()...