JavaNetty拆包粘包(二)

摘要:
importio.nety.bootstrap.bootstrap;publicClient4FixedLength(){init();}privatevoidinit(){group=newNioEventLoopGroup();

netty 使用 tcp/ip 协议传输数据。而 tcp/ip 协议是类似水流一样的数据传输方式。多次 访问的时候有可能出现数据粘包的问题,解决这种问题的方式如下:

定长数据流 

客户端和服务器,提前协调好,每个消息长度固定。(如:长度 10)。如果客户端或服 务器写出的数据不足 10,则使用空白字符补足(如:使用空格)。 

/**
 * 1. 单线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. connect连接服务,并发起请求
 */


import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class Client4FixedLength {
	
	// 处理请求和处理服务端响应的线程组
	private EventLoopGroup group = null;
	// 服务启动相关配置信息
	private Bootstrap bootstrap = null;
	
	public Client4FixedLength(){
		init();
	}
	
	private void init(){
		group = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		// 绑定线程组
		bootstrap.group(group);
		// 设定通讯模式为NIO
		bootstrap.channel(NioSocketChannel.class);
	}
	
	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ChannelHandler[] handlers = new ChannelHandler[3];
				handlers[0] = new FixedLengthFrameDecoder(3);
				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
				handlers[2] = new Client4FixedLengthHandler();
				
				ch.pipeline().addLast(handlers);
			}
		});
		ChannelFuture future = this.bootstrap.connect(host, port).sync();
		return future;
	}
	
	public void release(){
		this.group.shutdownGracefully();
	}
	
	public static void main(String[] args) {
		Client4FixedLength client = null;
		ChannelFuture future = null;
		try{
			client = new Client4FixedLength();
			
			future = client.doRequest("localhost", 9999);
			
			Scanner s = null;
			while(true){
				s = new Scanner(System.in);
				System.out.print("enter message send to server > ");
				String line = s.nextLine();
				byte[] bs = new byte[5];
				byte[] temp = line.getBytes("UTF-8");
				if(temp.length <= 5){
					for(int i = 0; i < temp.length; i++){
						bs[i] = temp[i];
					}
				}
				future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
				TimeUnit.SECONDS.sleep(1);
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(null != client){
				client.release();
			}
		}
	}
	
}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class Client4FixedLengthHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try{
			String message = msg.toString();
			System.out.println("from server : " + message);
		}finally{
			// 用于释放缓存。避免内存溢出
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}

}

  

/**
 * 1. 双线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. 绑定服务监听端口并启动服务
 */


import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class Server4FixedLength {
	// 监听线程组,监听客户端请求
	private EventLoopGroup acceptorGroup = null;
	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
	private EventLoopGroup clientGroup = null;
	// 服务启动相关配置信息
	private ServerBootstrap bootstrap = null;
	public Server4FixedLength(){
		init();
	}
	private void init(){
		acceptorGroup = new NioEventLoopGroup();
		clientGroup = new NioEventLoopGroup();
		bootstrap = new ServerBootstrap();
		// 绑定线程组
		bootstrap.group(acceptorGroup, clientGroup);
		// 设定通讯模式为NIO
		bootstrap.channel(NioServerSocketChannel.class);
		// 设定缓冲区大小
		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
			.option(ChannelOption.SO_RCVBUF, 16*1024)
			.option(ChannelOption.SO_KEEPALIVE, true);
	}
	public ChannelFuture doAccept(int port) throws InterruptedException{
		
		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
				// 定长Handler。通过构造参数设置消息长度(单位是字节)。发送的消息长度不足可以使用空格补全。
				acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
				acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
				acceptorHandlers[2] = new Server4FixedLengthHandler();
				ch.pipeline().addLast(acceptorHandlers);
			}
		});
		ChannelFuture future = bootstrap.bind(port).sync();
		return future;
	}
	public void release(){
		this.acceptorGroup.shutdownGracefully();
		this.clientGroup.shutdownGracefully();
	}
	
	public static void main(String[] args){
		ChannelFuture future = null;
		Server4FixedLength server = null;
		try{
			server = new Server4FixedLength();
			
			future = server.doAccept(9999);
			System.out.println("server started.");
			future.channel().closeFuture().sync();
		}catch(InterruptedException e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			if(null != server){
				server.release();
			}
		}
	}
	
}

  

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Server4FixedLengthHandler extends ChannelHandlerAdapter {
	
	// 业务处理逻辑
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String message = msg.toString();
		System.out.println("from client : " + message.trim());
		String line = "ok ";
		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
	}
	

	// 异常处理逻辑
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("server exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}

}

特殊结束符 

客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、 ‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class Server4Delimiter {
	    // 监听线程组,监听客户端请求
		private EventLoopGroup acceptorGroup = null;
		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
		private EventLoopGroup clientGroup = null;
		// 服务启动相关配置信息
		private ServerBootstrap bootstrap = null;
		public Server4Delimiter(){
			init();
		}
		public void init(){
			acceptorGroup = new NioEventLoopGroup();
			clientGroup = new NioEventLoopGroup();
			bootstrap = new ServerBootstrap();
			// 绑定线程组
			bootstrap.group(acceptorGroup, clientGroup);
			// 设定通讯模式为NIO
			bootstrap.channel(NioServerSocketChannel.class);
			// 设定缓冲区大小
			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
				.option(ChannelOption.SO_RCVBUF, 16*1024)
				.option(ChannelOption.SO_KEEPALIVE, true);
		}

		public ChannelFuture  doAccept(int port) throws InterruptedException{
			bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
				@Override
				public void initChannel(SocketChannel ch) throws Exception{
					// 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
					ByteBuf delimiter =Unpooled.copiedBuffer("$E$".getBytes());
					ChannelHandler[] acceptorHandlers =new ChannelHandler[3];
					// 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
					// 必须每次初始化通道时创建一个新对象
					// 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
					acceptorHandlers[0]=new DelimiterBasedFrameDecoder(1024, delimiter);
					// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
					acceptorHandlers[1]=new StringDecoder(Charset.forName("UTF-8"));
					acceptorHandlers[2]=new  Server4DelimiterHandler();
					ch.pipeline().addLast(acceptorHandlers);
				}
			});
			ChannelFuture  future =bootstrap.bind(port).sync();
			return future;	
		}
		
		public void release(){
			this.acceptorGroup.shutdownGracefully();
			this.clientGroup.shutdownGracefully();
		}
		
		public static void main(String[] args) {
			ChannelFuture future = null;
			Server4Delimiter server = null;
			try{
				server = new Server4Delimiter();
				
				future = server.doAccept(9999);
				System.out.println("server started.");
				future.channel().closeFuture().sync();
			}catch(InterruptedException e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				
				if(null != server){
					server.release();
				}
			}
		}
}

  

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Server4DelimiterHandler extends ChannelHandlerAdapter {
	    // 业务处理逻辑
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			String message = msg.toString();
			System.out.println("from client : " + message);
			String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
			ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
		}
		
		// 异常处理逻辑
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			System.out.println("server exceptionCaught method run...");
			// cause.printStackTrace();
			ctx.close();
		}
}

  

import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class Client4Delimiter {
	   // 处理请求和处理服务端响应的线程组
		private EventLoopGroup group = null;
		// 服务启动相关配置信息
		private Bootstrap bootstrap = null;
		
		public Client4Delimiter(){
			init();
		}
		
		private void init(){
			group = new NioEventLoopGroup();
			bootstrap = new Bootstrap();
			// 绑定线程组
			bootstrap.group(group);
			// 设定通讯模式为NIO
			bootstrap.channel(NioSocketChannel.class);
		}
		
		public ChannelFuture doRequest(String host, int port) throws InterruptedException{
			this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					// 数据分隔符
					ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
					ChannelHandler[] handlers = new ChannelHandler[3];
					handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
					// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
					handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
					handlers[2] = new Client4DelimiterHandler();
					
					ch.pipeline().addLast(handlers);
				}
			});
			ChannelFuture future = this.bootstrap.connect(host, port).sync();
			return future;
		}
		
		public void release(){
			this.group.shutdownGracefully();
		}
		
		public static void main(String[] args) {
			Client4Delimiter client = null;
			ChannelFuture future = null;
			try{
				client = new Client4Delimiter();
				
				future = client.doRequest("localhost", 9999);
				
				Scanner s = null;
				while(true){
					s = new Scanner(System.in);
					System.out.print("enter message send to server > ");
					String line = s.nextLine();
					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
					TimeUnit.SECONDS.sleep(1);
				}
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				if(null != client){
					client.release();
				}
			}
		}
}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class Client4DelimiterHandler extends ChannelHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try{
			String message = msg.toString();
			System.out.println("from server : " + message);
		}finally{
			// 用于释放缓存。避免内存溢出
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}

}

  

协议 

相对最成熟的数据传递方式。有服务器的开发者提供一个固定格式的协议标准。客户端 和服务器发送数据和接受数据的时候,都依据协议制定和解析消息。

 自定义协议格式:

协议格式:
HEADcontent-length:xxxxHEADBODYxxxxxxBODY

  

/**
 * 1. 双线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. 绑定服务监听端口并启动服务
 */
import java.nio.charset.Charset;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class Server4Protocol {
	// 监听线程组,监听客户端请求
	private EventLoopGroup acceptorGroup = null;
	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
	private EventLoopGroup clientGroup = null;
	// 服务启动相关配置信息
	private ServerBootstrap bootstrap = null;

	public Server4Protocol() {
		init();
	}

	private void init() {
		acceptorGroup = new NioEventLoopGroup();
		clientGroup = new NioEventLoopGroup();
		bootstrap = new ServerBootstrap();
		// 绑定线程组
		bootstrap.group(acceptorGroup, clientGroup);
		// 设定通讯模式为NIO
		bootstrap.channel(NioServerSocketChannel.class);
		// 设定缓冲区大小
		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
		bootstrap.option(ChannelOption.SO_SNDBUF, 16 * 1024).option(ChannelOption.SO_RCVBUF, 16 * 1024)
				.option(ChannelOption.SO_KEEPALIVE, true);
	}

	public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {

		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
				ch.pipeline().addLast(acceptorHandlers);
			}
		});
		ChannelFuture future = bootstrap.bind(port).sync();
		return future;
	}

	public void release() {
		this.acceptorGroup.shutdownGracefully();
		this.clientGroup.shutdownGracefully();
	}

	public static void main(String[] args) {
		ChannelFuture future = null;
		Server4Protocol server = null;
		try {
			server = new Server4Protocol();
			future = server.doAccept(9999, new Server4ProtocolHandler());
			System.out.println("server started.");

			future.channel().closeFuture().sync();

		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			if(null != server){
				server.release();
			}

		}
	}

}

  

/**
 * @Sharable注解 - 
 *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
 *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
 *  
 */
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;


@Sharable
public class Server4ProtocolHandler extends ChannelHandlerAdapter {
	// 业务处理逻辑
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			String message = msg.toString();
			System.out.println("server receive protocol content : " + message);
			message = ProtocolParser.parse(message);
			if(null == message){
				System.out.println("error request from client");
				return ;
			}
			System.out.println("from client : " + message);
			String line = "server message";
			line = ProtocolParser.transferTo(line);
			System.out.println("server send protocol content : " + line);
			ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
		}

		// 异常处理逻辑
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			System.out.println("server exceptionCaught method run...");
			cause.printStackTrace();
			ctx.close();
		}
		
		static class ProtocolParser{
			public static String parse(String message) {
				String[] temp=message.split("HEADBODY");
				temp[0]=temp[0].substring(4);
				temp[1]=temp[1].substring(0,temp[1].length()-4);
				int length=Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
				if(length != temp[1].length()){
					return null;
				}
				return temp[1];
			}
			public static String transferTo(String message){
				message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
				return message;
			}
		}

}

  

import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class Client4Protocol {

	    // 处理请求和处理服务端响应的线程组
		private EventLoopGroup group = null;
		// 服务启动相关配置信息
		private Bootstrap bootstrap = null;
		
		public Client4Protocol(){
			init();
		}
		
		private void init(){
			group = new NioEventLoopGroup();
			bootstrap = new Bootstrap();
			// 绑定线程组
			bootstrap.group(group);
			// 设定通讯模式为NIO
			bootstrap.channel(NioSocketChannel.class);
		}
		
		public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
			this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
					ch.pipeline().addLast(handlers);
				}
			});
			ChannelFuture future = this.bootstrap.connect(host, port).sync();
			return future;
		}
		
		public void release(){
			this.group.shutdownGracefully();
		}
		
		public static void main(String[] args) {
			Client4Protocol client = null;
			ChannelFuture future = null;
			try{
				client = new Client4Protocol();
				future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
				
				Scanner s = null;
				while(true){
					s = new Scanner(System.in);
					System.out.print("enter message send to server > ");
					String line = s.nextLine();
					line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
					System.out.println("client send protocol content : " + line);
					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
					TimeUnit.SECONDS.sleep(1);
				}
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				if(null != client){
					client.release();
				}
			}
		}
}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class Client4ProtocolHandler extends ChannelHandlerAdapter {
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try{
			String message = msg.toString();
			System.out.println("client receive protocol content : " + message);
			message = ProtocolParser.parse(message);
			if(null == message){
				System.out.println("error response from server");
				return ;
			}
			System.out.println("from server : " + message);
		}finally{
			// 用于释放缓存。避免内存溢出
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}

	static class ProtocolParser{
		public static String parse(String message){
			String[] temp = message.split("HEADBODY");
			temp[0] = temp[0].substring(4);
			temp[1] = temp[1].substring(0, (temp[1].length()-4));
			int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
			if(length != temp[1].length()){
				return null;
			}
			return temp[1];
		}
		public static String transferTo(String message){
			message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
			return message;
		}
	}

}

  

 序列化对象 

JBoss Marshalling 序列化

Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。 

/**
 * 1. 双线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. 绑定服务监听端口并启动服务
 */

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import utils.SerializableFactory4Marshalling;

public class Server4Serializable {
	   // 监听线程组,监听客户端请求
		private EventLoopGroup acceptorGroup = null;
		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
		private EventLoopGroup clientGroup = null;
		// 服务启动相关配置信息
		private ServerBootstrap bootstrap = null;
		public Server4Serializable(){
			init();
		}
		
		private void init(){
			acceptorGroup = new NioEventLoopGroup();
			clientGroup = new NioEventLoopGroup();
			bootstrap = new ServerBootstrap();
			// 绑定线程组
			bootstrap.group(acceptorGroup, clientGroup);
			// 设定通讯模式为NIO
			bootstrap.channel(NioServerSocketChannel.class);
			// 设定缓冲区大小
			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
				.option(ChannelOption.SO_RCVBUF, 16*1024)
				.option(ChannelOption.SO_KEEPALIVE, true);
		}
		
		public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
			bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ 
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
					ch.pipeline().addLast(acceptorHandlers);
				}
			});
			ChannelFuture future=bootstrap.bind(port).sync();
			return future;
		}
		
		public void release(){
			this.acceptorGroup.shutdownGracefully();
			this.clientGroup.shutdownGracefully();
		}
		
		
		public static void main(String[] args){
			ChannelFuture future = null;
			Server4Serializable server = null;
			try{
				server = new Server4Serializable();
				future = server.doAccept(9999,new Server4SerializableHandler());
				System.out.println("server started.");
				
				future.channel().closeFuture().sync();
			}catch(InterruptedException e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				
				if(null != server){
					server.release();
				}
			}
		}	
		
}

  

/**
 * @Sharable注解 - 
 *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
 *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
 *  
 */
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import utils.GzipUtils;
import utils.RequestMessage;
import utils.ResponseMessage;

@Sharable
public class Server4SerializableHandler extends ChannelHandlerAdapter{
	// 业务处理逻辑
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			System.out.println("from client : ClassName - " + msg.getClass().getName()
					+ " ; message : " + msg.toString());
			if(msg instanceof RequestMessage){
				RequestMessage request = (RequestMessage)msg;
				//byte[] attachment = GzipUtils.unzip(request.getAttachment());
				//System.out.println(new String(attachment));
			}
			ResponseMessage response = new ResponseMessage(0L, "test response");
			ctx.writeAndFlush(response);
		}

		// 异常处理逻辑
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			System.out.println("server exceptionCaught method run...");
			cause.printStackTrace();
			ctx.close();
		}
}

  

/**
 * 1. 单线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. connect连接服务,并发起请求
 */
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import utils.GzipUtils;
import utils.RequestMessage;
import utils.SerializableFactory4Marshalling;

public class Client4Serializable {
	// 处理请求和处理服务端响应的线程组
	private EventLoopGroup group = null;
	// 服务启动相关配置信息
	private Bootstrap bootstrap = null;

	public Client4Serializable() {
		init();
	}

	private void init() {
		group = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		// 绑定线程组
		bootstrap.group(group);
		// 设定通讯模式为NIO
		bootstrap.channel(NioSocketChannel.class);

	}

	public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers)
			throws InterruptedException {
		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
				ch.pipeline().addLast(handlers);
			}
		});
		ChannelFuture future = this.bootstrap.connect(host, port).sync();
		return future;
	}
	
	public void release(){
		this.group.shutdownGracefully();
	}
	
	public static void main(String[] args) {
		Client4Serializable client = null;
		ChannelFuture future = null;
		try{
			client = new Client4Serializable();
			future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
			String attachment = "test attachment";
			byte[] attBuf = attachment.getBytes();
			//attBuf = GzipUtils.zip(attBuf);
			RequestMessage msg = new RequestMessage(new Random().nextLong(), 
					"test",new byte[0]);
					//"test", attBuf);
			future.channel().writeAndFlush(msg);
			TimeUnit.SECONDS.sleep(1);
			future.addListener(ChannelFutureListener.CLOSE);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(null != client){
				client.release();
			}
		}
	}
}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Client4SerializableHandler extends ChannelHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("from server : ClassName - " + msg.getClass().getName()
				+ " ; message : " + msg.toString());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		cause.printStackTrace();
		ctx.close();
	}
}

  

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipUtils {
	public static void main(String[] args) throws Exception {
		FileInputStream fis = new FileInputStream("D:\3\1.jpg");
		byte[] temp = new byte[fis.available()];
        int length=fis.read(temp);
        System.out.println("长度 : " + length);
        
        byte[] zipArray = GzipUtils.zip(temp);
		System.out.println("压缩后的长度 : " + zipArray.length);
		
		byte[] unzipArray = GzipUtils.unzip(zipArray);
		System.out.println("解压缩后的长度 : " + unzipArray.length);
		
		FileOutputStream fos = new FileOutputStream("D:\3\101.jpg");
		fos.write(unzipArray);
		fos.flush();
		
		fos.close();
		fis.close();
	}
	
	/**
	 * 解压缩
	 * @param source 源数据。需要解压的数据。
	 * @return 解压后的数据。 恢复的数据。
	 * @throws Exception
	 */
	public static  byte[] unzip(byte[] source) throws Exception{
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		ByteArrayInputStream in = new ByteArrayInputStream(source);
		// JDK提供的。 专门用于压缩使用的流对象。可以处理字节数组数据。
		GZIPInputStream zipIn = new GZIPInputStream(in);
		byte[] temp=new byte[256];
		int length = 0;
		while((length = zipIn.read(temp, 0, temp.length)) != -1){
			out.write(temp, 0, length);
		}
		// 将字节数组输出流中的数据,转换为一个字节数组。
		byte[] target = out.toByteArray();
		
		zipIn.close();
		out.close();
		
		return target;
	}
	
	/**
	 * 压缩
	 * @param source 源数据,需要压缩的数据
	 * @return 压缩后的数据。
	 * @throws Exception
	 */
	public static byte[] zip(byte[] source) throws Exception{
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		// 输出流,JDK提供的,提供解压缩功能。
		GZIPOutputStream zipOut = new GZIPOutputStream(out);
		// 将压缩信息写入到内存。 写入的过程会实现解压。
		zipOut.write(source);
		// 结束。
		zipOut.finish();
		byte[] target = out.toByteArray();
		
		zipOut.close();
		
		return target;
	}
}

  

import java.io.Serializable;

public class RequestMessage implements Serializable {
	private static final long serialVersionUID = 7084843947860990140L;
	private Long id;
	private String message;
	private byte[] attachment;
	@Override
	public String toString() {
		return "RequestMessage [id=" + id + ", message=" + message + "]";
	}
	public RequestMessage() {
		super();
	}
	public RequestMessage(Long id, String message, byte[] attachment) {
		super();
		this.id = id;
		this.message = message;
		this.attachment = attachment;
	}
	public Long getId() {
		return id;
	}
	public void setId(Long id) {
		this.id = id;
	}
	public String getMessage() {
		return message;
	}
	public void setMessage(String message) {
		this.message = message;
	}
	public byte[] getAttachment() {
		return attachment;
	}
	public void setAttachment(byte[] attachment) {
		this.attachment = attachment;
	}
}

  

import java.io.Serializable;

public class ResponseMessage implements  Serializable {
	private static final long serialVersionUID = -8134313953478922076L;
	private Long id;
	private String message;
	@Override
	public String toString() {
		return "ResponseMessage [id=" + id + ", message=" + message + "]";
	}
	public ResponseMessage() {
		super();
	}
	public ResponseMessage(Long id, String message) {
		super();
		this.id = id;
		this.message = message;
	}
	public Long getId() {
		return id;
	}
	public void setId(Long id) {
		this.id = id;
	}
	public String getMessage() {
		return message;
	}
	public void setMessage(String message) {
		this.message = message;
	}

}

  

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

public class SerializableFactory4Marshalling {

	/**
	 * 创建Jboss Marshalling解码器MarshallingDecoder
	 * 
	 * @return MarshallingDecoder
	 */
	public static MarshallingDecoder buildMarshallingDecoder() {
		// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
		// jboss-marshalling-serial 包提供
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		// 创建了MarshallingConfiguration对象,配置了版本号为5
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		// 序列化版本。只要使用JDK5以上版本,version只能定义为5。
		configuration.setVersion(5);
		// 根据marshallerFactory和configuration创建provider
		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
		// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
		return decoder;
	}

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

  定时断线重连 

客户端断线重连机制。

客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。

要 求对数据的即时性不高的时候,才可使用。

优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对 资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120 秒自动断线。数据变 化 1000 次请求服务器一次。300 秒中自动发送不足 1000 次的变化数据

JavaNetty拆包粘包(二)第1张

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import utils.SerializableFactory4Marshalling;

public class Server4Timer {
	    // 监听线程组,监听客户端请求
		private EventLoopGroup acceptorGroup = null;
		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
		private EventLoopGroup clientGroup = null;
		// 服务启动相关配置信息
		private ServerBootstrap bootstrap = null;
		public Server4Timer(){
			init();
		}
		
		private void init(){
			acceptorGroup = new NioEventLoopGroup();
			clientGroup = new NioEventLoopGroup();
			bootstrap = new ServerBootstrap();
			// 绑定线程组
			bootstrap.group(acceptorGroup, clientGroup);
			// 设定通讯模式为NIO
			bootstrap.channel(NioServerSocketChannel.class);
			// 设定缓冲区大小
			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
				.option(ChannelOption.SO_RCVBUF, 16*1024)
				.option(ChannelOption.SO_KEEPALIVE, true);
			// 增加日志Handler,日志级别为info
			// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
		}
		
		public ChannelFuture doAccept(int port) throws InterruptedException{
			
			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
					// 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
					// 构造参数,就是间隔时长。 默认的单位是秒。
					// 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
					ch.pipeline().addLast(new ReadTimeoutHandler(3));
					ch.pipeline().addLast(new Server4TimerHandler());
				}
			});
			ChannelFuture future = bootstrap.bind(port).sync();
			return future;
		}
		public void release(){
			this.acceptorGroup.shutdownGracefully();
			this.clientGroup.shutdownGracefully();
		}
		
		public static void main(String[] args){
			ChannelFuture future = null;
			Server4Timer server = null;
			try{
				server = new Server4Timer();
				future = server.doAccept(9999);
				System.out.println("server started.");
				
				future.channel().closeFuture().sync();
			}catch(InterruptedException e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				
				if(null != server){
					server.release();
				}
			}
		}
}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import utils.ResponseMessage;

public class Server4TimerHandler extends ChannelHandlerAdapter {
	   // 业务处理逻辑
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			System.out.println("from client : ClassName - " + msg.getClass().getName()
					+ " ; message : " + msg.toString());
			ResponseMessage response = new ResponseMessage(0L, "test response");
			ctx.writeAndFlush(response);
		}

		// 异常处理逻辑
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			System.out.println("server exceptionCaught method run...");
			// cause.printStackTrace();
			ctx.close();
		}
}

  

import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;
import utils.RequestMessage;
import utils.SerializableFactory4Marshalling;

public class Client4Timer {
	    // 处理请求和处理服务端响应的线程组
		private EventLoopGroup group = null;
		// 服务启动相关配置信息
		private Bootstrap bootstrap = null;
		private ChannelFuture future = null;
		
		public Client4Timer(){
			init();
		}
		
		private void init(){
			group = new NioEventLoopGroup();
			bootstrap = new Bootstrap();
			// 绑定线程组
			bootstrap.group(group);
			// 设定通讯模式为NIO
			bootstrap.channel(NioSocketChannel.class);
			// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
		}
		
		public void setHandlers() throws InterruptedException{
			this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
					// 写操作自定断线。 在指定时间内,没有写操作,自动断线。
					ch.pipeline().addLast(new WriteTimeoutHandler(3));
					ch.pipeline().addLast(new Client4TimerHandler());
				}
			});
		}
		
		public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
			if(future == null){
				future = this.bootstrap.connect(host, port).sync();
			}
			if(!future.channel().isActive()){
				future = this.bootstrap.connect(host, port).sync();
			}
			return future;
		}
		
		public void release(){
			this.group.shutdownGracefully();
		}
		
		public static void main(String[] args) {
			Client4Timer client = null;
			ChannelFuture future = null;
			try{
				client = new Client4Timer();
				client.setHandlers();
				
				future = client.getChannelFuture("localhost", 9999);
				for(int i = 0; i < 3; i++){
					RequestMessage msg = new RequestMessage(new Random().nextLong(), 
							"test"+i, new byte[0]);
					future.channel().writeAndFlush(msg);
					TimeUnit.SECONDS.sleep(2);
				}
				TimeUnit.SECONDS.sleep(5);
				
				future = client.getChannelFuture("localhost", 9999);
				RequestMessage msg = new RequestMessage(new Random().nextLong(), 
						"test", new byte[0]);
				future.channel().writeAndFlush(msg);
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				if(null != future){
					try {
						future.channel().closeFuture().sync();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				if(null != client){
					client.release();
				}
			}
		}

}

  

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class Client4TimerHandler extends ChannelHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("from server : ClassName - " + msg.getClass().getName()
				+ " ; message : " + msg.toString());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * 当连接建立成功后,出发的代码逻辑。
	 * 在一次连接中只运行唯一一次。
	 * 通常用于实现连接确认和资源初始化的。
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("client channel active");
	}
}

  

免责声明:文章转载自《JavaNetty拆包粘包(二)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇更新与发展 | Alibaba Cloud Linux 2 特性与开发细节揭秘互联网产品设计常用文档类型-BRD、MRD、PRD、FSD (下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

c#实现多线程代码例子

相信大家都有用过网际快车等下载资源的经历,它里面是可以设置线程数的(近年版本默认是10,曾经默认是5)。它会将文件分成与线程数相同的部分,然后每个线程下载自己的那一部分,这样下载效率就有可能提高。相信大家都有加多线程数,提升下载效率的经历。但细心的用户会发现,在带宽一定的情况下,并不是线程越多,速度越快,而是在某一点达到峰值。在C#中用多线程并不难实现。它...

深入理解Binder(二),Binder是什么?

上篇文章深入理解Binder(一),从AIDL谈起我们介绍了AIDL的基本使用,用AIDL两个App的通信是实现了,可是又有小伙伴疑惑了,为什么使用AIDL就能够实现两个App之间的通信?本文我们就来详细说说这个问题。 Binder单从字面上理解,它有活页夹,粘合剂的意思,活页夹可以用来把两个东西夹在一起。在我们的Android系统中,Binder主要用来...

MORMOT通讯类说明(转 自己记录 原文版权归原作者)

MORMOT通讯类说明 MORMOT在SynCrtSock.pas单元实现通讯类。 MORMOT实现TCP/UDP/HTTP/WEBSOCKET客户端和服务端的协议的单元文件。可以看出MORMOT实现的通讯协议是很全面的。MORMOT支持跨操作系统平台(WINDOWS 和 LINUX)。MORMOT支持多种开发工具(DELPHI和LAZARUS)。但我们...

wrk性能测试(详解)

一、简介   wrk 是一款针对 Http 协议的基准测试工具,它能够在单机多核 CPU 的条件下,使用系统自带的高性能 I/O 机制,如 epoll,kqueue 等,通过多线程和事件模式,对目标机器产生大量的负载。 wrk是开源的, 代码在 github 上:https://github.com/wg/wrk 安装:https://www.cnblog...

SQL Server里的闩锁介绍

在今天的文章里我想谈下SQL Server使用的更高级的,轻量级的同步对象:闩锁(Latch)。闩锁是SQL Server存储引擎使用轻量级同步对象,用来保护多线程访问内存内结构。文章的第1部分我会介绍SQL Server里为什么需要闩锁,在第2部分我会给你介绍各个闩锁类型,还有你如何能对它们进行故障排除。 为什么我们需要闩锁?闩锁首次在SQL Serve...

Oracle条件分支查询

  Oracle的条件分支查询其实跟java的条件分支语法没啥太大的区别,只不过java多了一个switch关键字而已。看例子: SQL> SELECT CASE WHEN SUM(t1.TOTALTICKET) is null THEN 0 ELSE SUM(t1.TOTALTICKET) END totalTicket 2 FROM T...