高性能/并发的保证-Netty在Redisson的应用

摘要:
Redisson底层采用的是Netty框架。然后,redisson默认指定IO模型为NioSocketChannel二.接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析RedisChannelInitializerorg.redisson.client.handler.RedisChannelInitializer@OverrideprotectedvoidinitChannelthrowsException{//开启SSL终端识别能力initSsl;if{//Redis正常连接处理类ch.pipeline().addLast;}else{//Redis订阅发布处理类ch.pipeline().addLast;}ch.pipeline().addLast;if(pingConnectionHandler!

背景图

前言

​ Redisson Github: https://github.com/redisson/redisson

​ Redisson 官网:https://redisson.pro/

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

客户端初始化

createBootstrap

org.redisson.client.RedisClient#createBootstrap

private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .resolver(config.getResolverGroup())
          							//1.指定配置中的IO类型
                        .channel(config.getSocketChannelClass())
          							//2.指定配置中的线程模型
                        .group(config.getGroup());
  			//3.IO处理逻辑
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
  			//4. 指定bootstrap配置选项
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }

从上面的代码可以看到,客户端启动的引导类是 Bootstrap,负责启动客户端以及连接服务端,引导类创建完成之后,下面我们描述一下客户端启动的流程。

一. 首先,我们需要给它指定线程模型,驱动着连接的数据读写。然后,redisson默认指定 IO 模型为 NioSocketChannel

二. 接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析

RedisChannelInitializer

org.redisson.client.handler.RedisChannelInitializer

RedisChannelInitializer

 @Override
    protected void initChannel(Channel ch) throws Exception {
      	// 开启SSL终端识别能力
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
          	//Redis正常连接处理类
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
          	//Redis订阅发布处理类
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }
        
        ch.pipeline().addLast(
          	//链路检测狗
            connectionWatchdog,
          	//Redis协议命令编码器
            CommandEncoder.INSTANCE,
          	//Redis协议命令批量编码器
            CommandBatchEncoder.INSTANCE,
          	//Redis命令队列
            new CommandsQueue());
        
        if (pingConnectionHandler != null) {
           //心跳包连接处理类
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        if (type == Type.PLAIN) {
          	//Redis协议命令解码器
            ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
        } else {
          	//Redis订阅发布解码器
            ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
        }

        config.getNettyHook().afterChannelInitialization(ch);
    }

图1 Redisson 链路处理图

Redisson处理链路

Redisson的处理链

Redisson的Pipeline里面的ChannelHandler比较多,我挑选其中CommandEncoderCommandDecoder进行源码剖析。

CommandDecoder_Encoder

失败重连

org.redisson.client.handler.ConnectionWatchdog#reconnect 重连机制

private void reconnect(final RedisConnection connection, final int attempts){
		//重试时间越来越久
    int timeout = 2 << attempts;
    if (bootstrap.config().group().isShuttingDown()) {
        return;
    }
    
    try {
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
            }
        }, timeout, TimeUnit.MILLISECONDS);
    } catch (IllegalStateException e) {
        // skip
    }
}

netty中的Timer管理,使用了的Hashed time Wheel的模式,Time Wheel翻译为时间轮,是用于实现定时器timer的经典算法。

这个方法的声明是这样的:

 /**
     * Schedules the specified {@link TimerTask} for one-time execution after
     * the specified delay.
     *
     * @return a handle which is associated with the specified task
     *
     * @throws IllegalStateException       if this timer has been {@linkplain #stop() stopped} already
     * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
     *                                    can cause instability in the system.
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位。

Redis协议命令编码器

​ Redis 的作者认为数据库系统的瓶颈一般不在于网络流量,而是数据库自身内部逻辑处理上。所以即使 Redis 使用了浪费流量的文本协议,依然可以取得极高的访问性能。Redis 将所有数据都放在内存,用一个单线程对外提供服务,单个节点在跑满一个 CPU 核心的情况下可以达到了 10w/s 的超高 QPS。

RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现异常简单,解析性能极好。

Redis 协议将传输的结构数据分为 5 种最小单元类型,单元结束时统一加上回车换行符号

  1. 单行字符串 以 + 符号开头。
  2. 多行字符串 以 $ 符号开头,后跟字符串长度。
  3. 整数值 以 : 符号开头,后跟整数的字符串形式。
  4. 错误消息 以 - 符号开头。
  5. 数组 以 * 号开头,后跟数组的长度。

单行字符串 hello world

+hello world

多行字符串 hello world

$11
hello world

多行字符串当然也可以表示单行字符串。

整数 1024

:1024

错误 参数类型错误

-WRONGTYPE Operation against a key holding the wrong kind of value

数组 [1,2,3]

*3
:1
:2
:3


NULL 用多行字符串表示,不过长度要写成-1。

$-1


空串 用多行字符串表示,长度填 0。

$0


注意这里有两个。为什么是两个?因为两个之间,隔的是空串。

org.redisson.client.handler.CommandEncoder#encode()

private static final char ARGS_PREFIX = '*';
private static final char BYTES_PREFIX = '$';
private static final byte[] CRLF = "
".getBytes();


@Override
    protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
        try {
          	//redis命令前缀
            out.writeByte(ARGS_PREFIX);
            int len = 1 + msg.getParams().length;
            if (msg.getCommand().getSubName() != null) {
                len++;
            }
            out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
            out.writeBytes(CRLF);
            
            writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
            if (msg.getCommand().getSubName() != null) {
                writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
            }
          	......
        } catch (Exception e) {
            msg.tryFailure(e);
            throw e;
        }
    }

private void writeArgument(ByteBuf out, ByteBuf arg) {
    out.writeByte(BYTES_PREFIX);
    out.writeCharSequence(Long.toString(arg.readableBytes()), CharsetUtil.US_ASCII);
    out.writeBytes(CRLF);
    out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
    out.writeBytes(CRLF);
}

Redis协议命令解码器

org.redisson.client.handler.CommandDecoder#readBytes

 private static final char CR = '
';
 private static final char LF = '
';
 private static final char ZERO = '0';

private ByteBuf readBytes(ByteBuf is) throws IOException {
    long l = readLong(is);
    if (l > Integer.MAX_VALUE) {
        throw new IllegalArgumentException(
                "Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
    }
    int size = (int) l;
    if (size == -1) {
        return null;
    }
    ByteBuf buffer = is.readSlice(size);
    int cr = is.readByte();
    int lf = is.readByte();
  	//判断是否以
开头
    if (cr != CR || lf != LF) {
        throw new IOException("Improper line ending: " + cr + ", " + lf);
    }
    return buffer;
}

数据序列化

Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:

编码类名称说明
org.redisson.codec.JsonJacksonCodecJackson JSON 编码 默认编码
org.redisson.codec.AvroJacksonCodecAvro 一个二进制的JSON编码
org.redisson.codec.SmileJacksonCodecSmile 另一个二进制的JSON编码
org.redisson.codec.CborJacksonCodecCBOR 又一个二进制的JSON编码
org.redisson.codec.MsgPackJacksonCodecMsgPack 再来一个二进制的JSON编码
org.redisson.codec.IonJacksonCodecAmazon Ion 亚马逊的Ion编码,格式与JSON类似
org.redisson.codec.KryoCodecKryo 二进制对象序列化编码
org.redisson.codec.SerializationCodecJDK序列化编码
org.redisson.codec.FstCodecFST 10倍于JDK序列化性能而且100%兼容的编码
org.redisson.codec.LZ4CodecLZ4 压缩型序列化对象编码
org.redisson.codec.SnappyCodecSnappy 另一个压缩型序列化对象编码
org.redisson.client.codec.JsonJacksonMapCodec基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。
org.redisson.client.codec.StringCodec纯字符串编码(无转换)
org.redisson.client.codec.LongCodec纯整长型数字编码(无转换)
org.redisson.client.codec.ByteArrayCodec字节数组编码
org.redisson.codec.CompositeCodec用来组合多种不同编码在一起

codec

Codec

public interface Codec {

  	//返回用于HMAP Redis结构中哈希映射值的对象解码器
    Decoder<Object> getMapValueDecoder();

  	//返回用于HMAP Redis结构中哈希映射值的对象编码器
    Encoder getMapValueEncoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象解码器
    Decoder<Object> getMapKeyDecoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象编码器
    Encoder getMapKeyEncoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象解码器
    Decoder<Object> getValueDecoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象编码器
    Encoder getValueEncoder();

    //返回用于加载解码过程中使用的类的类加载器对象
    ClassLoader getClassLoader();

}

BaseCodec

org.redisson.client.codec.BaseCodec

BaseCodec

  1. HashMap的键值对的编解码的处理类使用普通的对象编解码处理类进行分解。

    //返回用于除HMAP之外的任何存储Redis结构的对象解码器
        Decoder<Object> getValueDecoder();
    
    //返回用于除HMAP之外的任何存储Redis结构的对象编码器
        Encoder getValueEncoder();
    

SerializationCodec

org.redisson.codec.SerializationCodec

Decoder

SerializationCodec-decoder

Encoder

SerializationCodec-encoder

免责声明:文章转载自《高性能/并发的保证-Netty在Redisson的应用》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇ubuntu开机自动挂载硬盘MongoDB在linux下的启动下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

随便看看

三伏天,华为路由器 AX3 PRO 发热严重,网络断流,改装散热清凉一夏

我特意买了一个WIFI6路由器。我走到路由器前触摸了一下,然后我开始了散热改造之路。我翻箱倒柜,找到了密封多年的最小散热静音风扇。淘宝找到了最小的一个(12V1A/0.6A可以)。然后我拆下了风扇架。我将电线穿过路由器底部框架的散热孔,并通过路由器的12V电源孔直接焊接电线。注意,正负极不应连接在散热孔中,用热熔粘合剂将风扇定子固定在底板上。请注意,风扇叶的...

Hibernate 数据的批量插入、更新和删除

对于这个批处理场景,Hibernate提供了一个批处理解决方案。接下来,我们将从批插入、批更新和批删除三个方面介绍如何处理此批处理场景。为了避免这种情况,Hibernate为批量更新和批量删除提供了类似于SQL的HQL语法。...

Latex 双栏模式下表格太长怎么办?

有时一张桌子放不下任何一页。如果使用原始表包,它可能会溢出。因此,自动更改表格是很自然的。对于许多在线材料,建议使用Longtable。但是因为我的文章是双栏文章,所以这个包会有问题。例如,表格将只浮动在文本上,标题的显示也有问题。经过长时间的尝试,我终于找到了解决方案,而且非常简单。只需缩放表格。方法如下:egin{table*}[!...

CentOS 7 优化TCP链接

在优化服务器配置时,Summary发现服务器端的WAIT连接上有大量的TIME,需要进行优化。Tomcat案例查询与Tomcat对应的端口的tcp链接,发现存在大量TIME_WAIT链接,以及一些其他状态连接,总计400+。...

安装samba服务器实现Linux mint和Windows共享文件

安装samba服务器以实现Linuxmint和Windows共享文件。在Linuxmint普通用户下执行命令:sudoapt-geinstallsamba、installsamba和打开smb。conf配置文件,并执行命令gedit/etc/samba/smb-Coff,如果您想安装gedit(sudoapt-geinstallgedit),还可以使用Lin...

部署springboot+vue项目文档(若依ruoyi项目部署步骤)

1: 部署Linux+nginx部署背景代码1.1因为我使用了idea工具进行开发,所以终端中的mvnclean包生成了相应的jar包。这个jar包可以在相应文件所在目录的目标中找到。linux服务器需要加载redis和nginx。redis存储缓存数据,nginx用于代理前端和后端服务。打包vue项目并将dist文件复制到tomcat的webapps目录中...