首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高性能/并发的保证-Netty在Redisson的应用

高性能/并发的保证-Netty在Redisson的应用

作者头像
sanshengshui
发布2020-04-14 16:45:05
2.5K0
发布2020-04-14 16:45:05
举报
文章被收录于专栏:穆书伟穆书伟
背景图
背景图

前言

​ Redisson Github: https://github.com/redisson/redisson

​ Redisson 官网:https://redisson.pro/

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

Redisson_structure
Redisson_structure

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

客户端初始化

createBootstrap

org.redisson.client.RedisClient#createBootstrap

private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .resolver(config.getResolverGroup())
          							//1.指定配置中的IO类型
                        .channel(config.getSocketChannelClass())
          							//2.指定配置中的线程模型
                        .group(config.getGroup());
  			//3.IO处理逻辑
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
  			//4. 指定bootstrap配置选项
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }

从上面的代码可以看到,客户端启动的引导类是 Bootstrap,负责启动客户端以及连接服务端,引导类创建完成之后,下面我们描述一下客户端启动的流程。

一. 首先,我们需要给它指定线程模型,驱动着连接的数据读写。然后,redisson默认指定 IO 模型为 NioSocketChannel

二. 接着,给引导类指定一系列处理链路,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析

RedisChannelInitializer

org.redisson.client.handler.RedisChannelInitializer

RedisChannelInitializer
RedisChannelInitializer
 @Override
    protected void initChannel(Channel ch) throws Exception {
      	// 开启SSL终端识别能力
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
          	//Redis正常连接处理类
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
          	//Redis订阅发布处理类
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }
        
        ch.pipeline().addLast(
          	//链路检测狗
            connectionWatchdog,
          	//Redis协议命令编码器
            CommandEncoder.INSTANCE,
          	//Redis协议命令批量编码器
            CommandBatchEncoder.INSTANCE,
          	//Redis命令队列
            new CommandsQueue());
        
        if (pingConnectionHandler != null) {
           //心跳包连接处理类
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        if (type == Type.PLAIN) {
          	//Redis协议命令解码器
            ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
        } else {
          	//Redis订阅发布解码器
            ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
        }

        config.getNettyHook().afterChannelInitialization(ch);
    }

图1 Redisson 链路处理图

Redisson处理链路
Redisson处理链路

Redisson的处理链

Redisson的Pipeline里面的ChannelHandler比较多,我挑选其中CommandEncoderCommandDecoder进行源码剖析。

CommandDecoder_Encoder
CommandDecoder_Encoder

失败重连

org.redisson.client.handler.ConnectionWatchdog#reconnect 重连机制

private void reconnect(final RedisConnection connection, final int attempts){
		//重试时间越来越久
    int timeout = 2 << attempts;
    if (bootstrap.config().group().isShuttingDown()) {
        return;
    }
    
    try {
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
            }
        }, timeout, TimeUnit.MILLISECONDS);
    } catch (IllegalStateException e) {
        // skip
    }
}

netty中的Timer管理,使用了的Hashed time Wheel的模式,Time Wheel翻译为时间轮,是用于实现定时器timer的经典算法。

这个方法的声明是这样的:

 /**
     * Schedules the specified {@link TimerTask} for one-time execution after
     * the specified delay.
     *
     * @return a handle which is associated with the specified task
     *
     * @throws IllegalStateException       if this timer has been {@linkplain #stop() stopped} already
     * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
     *                                    can cause instability in the system.
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

这个方法需要一个TimerTask对象以知道当时间到时要执行什么逻辑,然后需要delay时间数值和TimeUnit时间的单位。

Redis协议命令编码器

​ Redis 的作者认为数据库系统的瓶颈一般不在于网络流量,而是数据库自身内部逻辑处理上。所以即使 Redis 使用了浪费流量的文本协议,依然可以取得极高的访问性能。Redis 将所有数据都放在内存,用一个单线程对外提供服务,单个节点在跑满一个 CPU 核心的情况下可以达到了 10w/s 的超高 QPS。

RESP 是 Redis 序列化协议的简写。它是一种直观的文本协议,优势在于实现异常简单,解析性能极好。

Redis 协议将传输的结构数据分为 5 种最小单元类型,单元结束时统一加上回车换行符号\r\n

  1. 单行字符串 以 + 符号开头。
  2. 多行字符串 以 $ 符号开头,后跟字符串长度。
  3. 整数值 以 : 符号开头,后跟整数的字符串形式。
  4. 错误消息 以 - 符号开头。
  5. 数组 以 * 号开头,后跟数组的长度。

单行字符串 hello world

+hello world\r\n

多行字符串 hello world

$11\r\nhello world\r\n

多行字符串当然也可以表示单行字符串。

整数 1024

:1024\r\n

错误 参数类型错误

-WRONGTYPE Operation against a key holding the wrong kind of value\r\n

数组 [1,2,3]

*3\r\n:1\r\n:2\r\n:3\r\n

NULL 用多行字符串表示,不过长度要写成-1。

$-1\r\n

空串 用多行字符串表示,长度填 0。

$0\r\n\r\n

注意这里有两个\r\n。为什么是两个?因为两个\r\n之间,隔的是空串。

org.redisson.client.handler.CommandEncoder#encode()

private static final char ARGS_PREFIX = '*';
private static final char BYTES_PREFIX = '$';
private static final byte[] CRLF = "\r\n".getBytes();


@Override
    protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
        try {
          	//redis命令前缀
            out.writeByte(ARGS_PREFIX);
            int len = 1 + msg.getParams().length;
            if (msg.getCommand().getSubName() != null) {
                len++;
            }
            out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
            out.writeBytes(CRLF);
            
            writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
            if (msg.getCommand().getSubName() != null) {
                writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
            }
          	......
        } catch (Exception e) {
            msg.tryFailure(e);
            throw e;
        }
    }

private void writeArgument(ByteBuf out, ByteBuf arg) {
    out.writeByte(BYTES_PREFIX);
    out.writeCharSequence(Long.toString(arg.readableBytes()), CharsetUtil.US_ASCII);
    out.writeBytes(CRLF);
    out.writeBytes(arg, arg.readerIndex(), arg.readableBytes());
    out.writeBytes(CRLF);
}

Redis协议命令解码器

org.redisson.client.handler.CommandDecoder#readBytes

 private static final char CR = '\r';
 private static final char LF = '\n';
 private static final char ZERO = '0';

private ByteBuf readBytes(ByteBuf is) throws IOException {
    long l = readLong(is);
    if (l > Integer.MAX_VALUE) {
        throw new IllegalArgumentException(
                "Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
    }
    int size = (int) l;
    if (size == -1) {
        return null;
    }
    ByteBuf buffer = is.readSlice(size);
    int cr = is.readByte();
    int lf = is.readByte();
  	//判断是否以\r\n开头
    if (cr != CR || lf != LF) {
        throw new IOException("Improper line ending: " + cr + ", " + lf);
    }
    return buffer;
}

数据序列化

Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:

编码类名称

说明

org.redisson.codec.JsonJacksonCodec

Jackson JSON 编码 默认编码

org.redisson.codec.AvroJacksonCodec

Avro 一个二进制的JSON编码

org.redisson.codec.SmileJacksonCodec

Smile 另一个二进制的JSON编码

org.redisson.codec.CborJacksonCodec

CBOR 又一个二进制的JSON编码

org.redisson.codec.MsgPackJacksonCodec

MsgPack 再来一个二进制的JSON编码

org.redisson.codec.IonJacksonCodec

Amazon Ion 亚马逊的Ion编码,格式与JSON类似

org.redisson.codec.KryoCodec

Kryo 二进制对象序列化编码

org.redisson.codec.SerializationCodec

JDK序列化编码

org.redisson.codec.FstCodec

FST 10倍于JDK序列化性能而且100%兼容的编码

org.redisson.codec.LZ4Codec

LZ4 压缩型序列化对象编码

org.redisson.codec.SnappyCodec

Snappy 另一个压缩型序列化对象编码

org.redisson.client.codec.JsonJacksonMapCodec

基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。

org.redisson.client.codec.StringCodec

纯字符串编码(无转换)

org.redisson.client.codec.LongCodec

纯整长型数字编码(无转换)

org.redisson.client.codec.ByteArrayCodec

字节数组编码

org.redisson.codec.CompositeCodec

用来组合多种不同编码在一起

codec
codec

Codec

public interface Codec {

  	//返回用于HMAP Redis结构中哈希映射值的对象解码器
    Decoder<Object> getMapValueDecoder();

  	//返回用于HMAP Redis结构中哈希映射值的对象编码器
    Encoder getMapValueEncoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象解码器
    Decoder<Object> getMapKeyDecoder();

  	//返回用于HMAP Redis结构中哈希映射键的对象编码器
    Encoder getMapKeyEncoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象解码器
    Decoder<Object> getValueDecoder();

    //返回用于除HMAP之外的任何存储Redis结构的对象编码器
    Encoder getValueEncoder();

    //返回用于加载解码过程中使用的类的类加载器对象
    ClassLoader getClassLoader();

}

BaseCodec

org.redisson.client.codec.BaseCodec

BaseCodec
BaseCodec
  1. HashMap的键值对的编解码的处理类使用普通的对象编解码处理类进行分解。 //返回用于除HMAP之外的任何存储Redis结构的对象解码器 Decoder<Object> getValueDecoder(); //返回用于除HMAP之外的任何存储Redis结构的对象编码器 Encoder getValueEncoder();

SerializationCodec

org.redisson.codec.SerializationCodec

Decoder

SerializationCodec-decoder
SerializationCodec-decoder

Encoder

SerializationCodec-encoder
SerializationCodec-encoder
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 客户端初始化
    • createBootstrap
      • RedisChannelInitializer
      • Redisson的处理链
        • 失败重连
          • Redis协议命令编码器
            • Redis协议命令解码器
            • 数据序列化
              • Codec
                • BaseCodec
                  • SerializationCodec
                  相关产品与服务
                  云数据库 Redis
                  腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档