A look at claudb's importRDB

code4it
Last modified 2020-08-20 10:01:09
Published in the column: 码匠的流水账

This article walks through how claudb's importRDB loads an RDB dump.

importRDB

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

public class DBServerState {

  //......

  public void importRDB(InputStream input) throws IOException {
    RDBInputStream rdb = new RDBInputStream(input);

    Map<Integer, Map<DatabaseKey, DatabaseValue>> load = rdb.parse();
    for (Map.Entry<Integer, Map<DatabaseKey, DatabaseValue>> entry : load.entrySet()) {
      databases.get(entry.getKey()).overrideAll(ImmutableMap.from(entry.getValue()));
    }
  }

  //......
}
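  • importRDB creates an RDBInputStream and calls its parse method, which returns one map of keys to values per database index. It then iterates over that result and, for each entry, calls databases.get(entry.getKey()).overrideAll(ImmutableMap.from(entry.getValue())), so only the database indexes present in the dump are touched.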
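
The per-index override loop can be sketched with plain JDK collections. This is a minimal illustration, not claudb code: `ImportSketch`, `newDatabases` and `importParsed` are hypothetical names, strings stand in for DatabaseKey and DatabaseValue, and it assumes that overrideAll means "drop the current content, then copy the parsed content in", which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ImportSketch {

  // Hypothetical stand-in for the server's databases: one mutable map per index.
  static List<Map<String, String>> newDatabases(int count) {
    List<Map<String, String>> databases = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      databases.add(new HashMap<>());
    }
    return databases;
  }

  // Assumed overrideAll semantics: clear the target database, then copy the parsed entries in.
  static void importParsed(List<Map<String, String>> databases,
                           Map<Integer, Map<String, String>> parsed) {
    for (Map.Entry<Integer, Map<String, String>> entry : parsed.entrySet()) {
      Map<String, String> target = databases.get(entry.getKey());
      target.clear();
      target.putAll(entry.getValue());
    }
  }

  public static void main(String[] args) {
    List<Map<String, String>> databases = newDatabases(2);
    databases.get(0).put("stale", "x");
    databases.get(1).put("untouched", "y");

    // The "dump" only contains database 0.
    Map<Integer, Map<String, String>> parsed = new HashMap<>();
    parsed.put(0, Collections.singletonMap("a", "1"));
    importParsed(databases, parsed);

    System.out.println(databases);
  }
}
```

Under that assumption, database 0 loses its stale key while database 1, which is absent from the dump, keeps its content.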

RDBInputStream

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

public class RDBInputStream {

  private static final SafeString REDIS_PREAMBLE = safeString("REDIS");

  private static final long TO_MILLIS = 1000L;

  private static final int HASH = 0x04;
  private static final int SORTED_SET = 0x03;
  private static final int SET = 0x02;
  private static final int LIST = 0x01;
  private static final int STRING = 0x00;

  private static final int TTL_MILLISECONDS = 0xFC;
  private static final int TTL_SECONDS = 0xFD;
  private static final int SELECT = 0xFE;
  private static final int END_OF_STREAM = 0xFF;

  private static final int REDIS_VERSION = 6;
  private static final int VERSION_LENGTH = 4;
  private static final int REDIS_LENGTH = 5;

  private final CheckedInputStream in;

  public RDBInputStream(InputStream in) {
    this.in = new CheckedInputStream(in, new CRC64());
  }

  public Map<Integer, Map<DatabaseKey, DatabaseValue>> parse() throws IOException {
    Map<Integer, Map<DatabaseKey, DatabaseValue>> databases = new HashMap<>();

    int version = version();

    if (version > REDIS_VERSION) {
      throw new IOException("invalid version: " + version);
    }

    Long expireTime = null;
    HashMap<DatabaseKey, DatabaseValue> db = null;
    for (boolean end = false; !end;) {
      int read = in.read();
      switch (read) {
      case SELECT:
        db = new HashMap<>();
        databases.put(readLength(), db);
        break;
      case TTL_SECONDS:
        expireTime = parseTimeSeconds();
        break;
      case TTL_MILLISECONDS:
        expireTime = parseTimeMillis();
        break;
      case STRING:
        ensure(db, readKey(), readString(expireTime));
        expireTime = null;
        break;
      case LIST:
        ensure(db, readKey(), readList(expireTime));
        expireTime = null;
        break;
      case SET:
        ensure(db, readKey(), readSet(expireTime));
        expireTime = null;
        break;
      case SORTED_SET:
        ensure(db, readKey(), readSortedSet(expireTime));
        expireTime = null;
        break;
      case HASH:
        ensure(db, readKey(), readHash(expireTime));
        expireTime = null;
        break;
      case END_OF_STREAM:
        // end of stream
        end = true;
        db = null;
        expireTime = null;
        break;
      default:
        throw new IOException("not supported: " + read);
      }
    }

    verifyChecksum();

    return databases;
  }

  private int version() throws IOException {
    SafeString redis = new SafeString(read(REDIS_LENGTH));
    if (!redis.equals(REDIS_PREAMBLE)) {
      throw new IOException("not valid stream");
    }
    return parseVersion(read(VERSION_LENGTH));
  }

  private int parseVersion(byte[] version) {
    StringBuilder sb = new StringBuilder();
    for (byte b : version) {
      sb.append((char) b);
    }
    return Integer.parseInt(sb.toString());
  }

  private void verifyChecksum() throws IOException {
    long calculated = in.getChecksum().getValue();

    long readed = parseChecksum();

    if (calculated != readed) {
      throw new IOException("invalid checksum: " + readed);
    }
  }

  private long parseChecksum() throws IOException {
    return ByteUtils.byteArrayToLong(read(Long.BYTES));
  }

  //......

}
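  • The RDBInputStream constructor wraps the InputStream in a CheckedInputStream backed by CRC64, so a checksum accumulates as bytes are read. The parse method first reads the "REDIS" preamble and the 4-digit version and rejects anything newer than version 6. It then loops on in.read() and dispatches on the opcode: SELECT starts a new database map, TTL_SECONDS and TTL_MILLISECONDS record an expiry for the next value, STRING, LIST, SET, SORTED_SET and HASH read a key and a value (and reset the pending expiry), and END_OF_STREAM ends the loop. Finally verifyChecksum compares the accumulated checksum with the 8-byte checksum stored at the end of the stream.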
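
The overall shape of parse (header check, opcode loop, trailing checksum) can be reproduced in a small self-contained sketch. Everything here is a simplification for illustration: `MiniRdbParser` is a hypothetical class, only SELECT, STRING and END_OF_STREAM are handled, lengths and database indexes are single bytes, and `java.util.zip.CRC32` is swapped in for claudb's CRC64 because the JDK has no CRC64. The dump it reads is therefore not a real Redis RDB file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

final class MiniRdbParser {

  static final int STRING = 0x00;
  static final int SELECT = 0xFE;
  static final int END_OF_STREAM = 0xFF;
  static final int MAX_VERSION = 6;

  // Layout: "REDIS" + 4 ASCII digits, opcodes, then an 8-byte big-endian checksum.
  static Map<Integer, Map<String, String>> parse(InputStream input) throws IOException {
    CheckedInputStream checked = new CheckedInputStream(input, new CRC32());
    DataInputStream in = new DataInputStream(checked);

    byte[] header = new byte[9];
    in.readFully(header);
    String text = new String(header, StandardCharsets.US_ASCII);
    if (!text.startsWith("REDIS")) {
      throw new IOException("not valid stream");
    }
    int version = Integer.parseInt(text.substring(5));
    if (version > MAX_VERSION) {
      throw new IOException("invalid version: " + version);
    }

    Map<Integer, Map<String, String>> databases = new HashMap<>();
    Map<String, String> db = null;
    for (boolean end = false; !end;) {
      int opcode = in.read();
      switch (opcode) {
        case SELECT:
          db = new HashMap<>();
          databases.put(in.readUnsignedByte(), db); // simplification: 1-byte index
          break;
        case STRING:
          if (db == null) {
            throw new IOException("value before SELECT");
          }
          db.put(readShortString(in), readShortString(in));
          break;
        case END_OF_STREAM:
          end = true;
          break;
        default:
          throw new IOException("not supported: " + opcode);
      }
    }

    // Capture the running checksum BEFORE reading the trailer: reading updates it.
    long calculated = checked.getChecksum().getValue();
    long stored = in.readLong();
    if (calculated != stored) {
      throw new IOException("invalid checksum: " + stored);
    }
    return databases;
  }

  private static String readShortString(DataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readUnsignedByte()]; // simplification: 1-byte length
    in.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Builds a tiny dump: database 0 containing k -> v, followed by the CRC32 trailer.
  static byte[] sampleDump() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] header = "REDIS0006".getBytes(StandardCharsets.US_ASCII);
    out.write(header, 0, header.length);
    out.write(SELECT);
    out.write(0);
    out.write(STRING);
    out.write(1);
    out.write('k');
    out.write(1);
    out.write('v');
    out.write(END_OF_STREAM);
    byte[] body = out.toByteArray();
    CRC32 crc = new CRC32();
    crc.update(body, 0, body.length);
    byte[] trailer = ByteBuffer.allocate(Long.BYTES).putLong(crc.getValue()).array();
    out.write(trailer, 0, trailer.length);
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(parse(new ByteArrayInputStream(sampleDump())));
  }
}
```

The ordering in the last step mirrors verifyChecksum: the calculated value is taken before the stored one is read, because the trailer bytes pass through the same CheckedInputStream.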

readKey

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

  private DatabaseKey readKey() throws IOException {
    return new DatabaseKey(readSafeString());
  }

  private SafeString readSafeString() throws IOException {
    int length = readLength();
    return new SafeString(read(length));
  }

  private int readLength() throws IOException {
    int length = in.read();
    if (length < 0x40) {
      // 1 byte: 00XXXXXX
      return length;
    } else if (length < 0x80) {
      // 2 bytes: 01XXXXXX XXXXXXXX
      int next = in.read();
      return readLength(length, next);
    } else {
      // 5 bytes: 10...... XXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      return byteArrayToInt(read(Integer.BYTES));
    }
  }

  private byte[] read(int size) throws IOException {
    byte[] array = new byte[size];
    int read = in.read(array);
    if (read != size) {
      throw new IOException("error reading stream");
    }
    return array;
  }
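  • readKey calls readSafeString, which first calls readLength and then reads exactly that many bytes with read(length). readLength looks at the first byte to decide whether the length is encoded in 1, 2 or 5 bytes.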
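
The three length encodings can be tried out in isolation. The sketch below is an assumption-labeled reconstruction, not claudb code: `LengthDecoder` is a hypothetical class, and the 2-byte case follows the Redis RDB format (low 6 bits of the first byte, then the second byte), since the two-argument readLength helper is not shown in the excerpt. Unlike the original, it rejects the 11XXXXXX prefix, which the RDB format reserves for specially encoded values.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class LengthDecoder {

  static int readLength(InputStream stream) throws IOException {
    DataInputStream in = new DataInputStream(stream);
    int first = in.readUnsignedByte();
    if (first < 0x40) {
      // 00XXXXXX: the low 6 bits are the length
      return first;
    } else if (first < 0x80) {
      // 01XXXXXX XXXXXXXX: 14-bit length, high 6 bits come first
      int next = in.readUnsignedByte();
      return ((first & 0x3F) << 8) | next;
    } else if (first < 0xC0) {
      // 10......: the length is the next 4 bytes, big-endian
      return in.readInt();
    } else {
      // 11XXXXXX: a specially encoded value, not a plain length (not handled here)
      throw new IOException("special encoding not handled: " + first);
    }
  }

  // Convenience for experiments: decode a length from literal byte values.
  static int decode(int... bytes) {
    byte[] raw = new byte[bytes.length];
    for (int i = 0; i < bytes.length; i++) {
      raw[i] = (byte) bytes[i];
    }
    try {
      return readLength(new ByteArrayInputStream(raw));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(0x0A));                         // 1-byte form
    System.out.println(decode(0x41, 0x02));                   // 2-byte form
    System.out.println(decode(0x80, 0x00, 0x00, 0x01, 0x00)); // 5-byte form
  }
}
```

The sketch uses DataInputStream.readFully-style reads (readUnsignedByte, readInt), which block until the requested bytes arrive or fail with EOFException. The read(int size) method above relies on a single in.read(array) call, and InputStream.read(byte[]) is allowed to return fewer bytes than requested, so on a stream that delivers data in small chunks it may report "error reading stream" even though the data is intact.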

readString

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

  private DatabaseValue readString(Long expireTime) throws IOException {
    return string(readSafeString()).expiredAt(expireTime != null ? ofEpochMilli(expireTime) : null);
  }
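  • readString reads a SafeString, wraps it in a string DatabaseValue, and sets expiredAt to the pending expiry time (converted with ofEpochMilli), or to null when no TTL opcode preceded the value.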
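
How an expiry reaches expiredAt can be illustrated separately. parseTimeSeconds and parseTimeMillis are not shown in the excerpt, so the sketch below follows the Redis RDB format instead (a 4-byte little-endian seconds value after 0xFD, an 8-byte little-endian milliseconds value after 0xFC); whether claudb decodes the bytes the same way is an assumption. `ExpirySketch` and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

final class ExpirySketch {

  static final long TO_MILLIS = 1000L;

  // Payload after TTL_SECONDS (0xFD): 4-byte little-endian unsigned seconds, scaled to millis.
  static long secondsPayloadToMillis(byte[] payload) {
    int raw = ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN).getInt();
    return Integer.toUnsignedLong(raw) * TO_MILLIS;
  }

  // Payload after TTL_MILLISECONDS (0xFC): 8-byte little-endian milliseconds.
  static long millisPayloadToMillis(byte[] payload) {
    return ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN).getLong();
  }

  // Same null-safe conversion the readXxx methods apply before calling expiredAt.
  static Instant toInstant(Long expireTime) {
    return expireTime != null ? Instant.ofEpochMilli(expireTime) : null;
  }

  public static void main(String[] args) {
    long fromSeconds = secondsPayloadToMillis(new byte[] {1, 0, 0, 0});
    long fromMillis = millisPayloadToMillis(new byte[] {(byte) 0xE8, 3, 0, 0, 0, 0, 0, 0});
    System.out.println(toInstant(fromSeconds));
    System.out.println(toInstant(fromMillis));
    System.out.println(toInstant(null));
  }
}
```

Both sample payloads describe the same instant, one second after the epoch. Normalizing to epoch milliseconds up front is what lets every value reader share the single `expireTime != null ? ofEpochMilli(expireTime) : null` expression.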

readList

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

  private DatabaseValue readList(Long expireTime) throws IOException {
    int size = readLength();
    List<SafeString> list = new LinkedList<>();
    for (int i = 0; i < size; i++) {
      list.add(readSafeString());
    }
    return list(list).expiredAt(expireTime != null ? ofEpochMilli(expireTime) : null);
  }
  • readList first reads the number of elements with readLength, then calls readSafeString once per element, and finally converts the list to a DatabaseValue and sets expiredAt.

readSet

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

  private DatabaseValue readSet(Long expireTime) throws IOException {
    int size = readLength();
    Set<SafeString> set = new LinkedHashSet<>();
    for (int i = 0; i < size; i++) {
      set.add(readSafeString());
    }
    return set(set).expiredAt(expireTime != null ? ofEpochMilli(expireTime) : null);
  }
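  • readSet first reads the number of elements with readLength, then calls readSafeString once per element, and finally converts the set to a DatabaseValue and sets expiredAt. The only difference from readList is the collection type, a LinkedHashSet instead of a LinkedList.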
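
readList and readSet share one shape: an element count followed by that many length-prefixed strings. A minimal sketch of that shape, with the target collection passed in, makes the difference visible. `ElementReader` is a hypothetical class, and both the count and the string lengths are simplified to a single byte rather than the variable-length encoding used by readLength.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;

final class ElementReader {

  // Reads <count> then <count> times (<length> <bytes>) into the given collection.
  static <C extends Collection<String>> C readElements(DataInputStream in, C target)
      throws IOException {
    int size = in.readUnsignedByte(); // simplification: 1-byte count
    for (int i = 0; i < size; i++) {
      byte[] bytes = new byte[in.readUnsignedByte()]; // simplification: 1-byte length
      in.readFully(bytes);
      target.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return target;
  }

  // Convenience wrapper that decodes from a byte array without checked exceptions.
  static <C extends Collection<String>> C decode(byte[] raw, C target) {
    try {
      return readElements(new DataInputStream(new ByteArrayInputStream(raw)), target);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] raw = {3, 1, 'a', 1, 'b', 1, 'a'};
    System.out.println(decode(raw, new LinkedList<String>()));    // prints [a, b, a]
    System.out.println(decode(raw, new LinkedHashSet<String>())); // prints [a, b]
  }
}
```

The same bytes yield three elements as a list and two as a set, and the LinkedHashSet keeps the order in which elements first appeared in the dump.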

readSortedSet

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBInputStream.java

  private DatabaseValue readSortedSet(Long expireTime) throws IOException {
    int size = readLength();
    Set<Entry<Double, SafeString>> entries = new LinkedHashSet<>();
    for (int i = 0; i < size; i++) {
      SafeString value = readSafeString();
      Double score = readDouble();
      entries.add(score(score, value));
    }
    return zset(entries).expiredAt(expireTime != null ? ofEpochMilli(expireTime) : null);
  }
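  • readSortedSet first reads the number of elements with readLength, then for each element calls readSafeString for the member and readDouble for its score, adding the pair to a LinkedHashSet; finally it converts the entries to a DatabaseValue and sets expiredAt. Compared with readSet, it additionally reads a score per member.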
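
readDouble is not shown in the excerpt. In the Redis RDB format for this sorted-set type, a score is stored as a 1-byte length followed by the ASCII text of the number, with the length values 253, 254 and 255 reserved for NaN, positive infinity and negative infinity. The sketch below implements that rule; `ScoreDecoder` is a hypothetical class, and that claudb's readDouble behaves identically is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ScoreDecoder {

  static double readScore(DataInputStream in) throws IOException {
    int length = in.readUnsignedByte();
    switch (length) {
      case 255:
        return Double.NEGATIVE_INFINITY;
      case 254:
        return Double.POSITIVE_INFINITY;
      case 253:
        return Double.NaN;
      default:
        // Any other value is the length of the score's ASCII representation.
        byte[] ascii = new byte[length];
        in.readFully(ascii);
        return Double.parseDouble(new String(ascii, StandardCharsets.US_ASCII));
    }
  }

  // Convenience for experiments: decode a score from literal byte values.
  static double decode(int... bytes) {
    byte[] raw = new byte[bytes.length];
    for (int i = 0; i < bytes.length; i++) {
      raw[i] = (byte) bytes[i];
    }
    try {
      return readScore(new DataInputStream(new ByteArrayInputStream(raw)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(decode(3, '1', '.', '5')); // prints 1.5
    System.out.println(decode(254));              // prints Infinity
  }
}
```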

readHash

  private DatabaseValue readHash(Long expireTime) throws IOException {
    int size = readLength();
    Set<Tuple2<SafeString, SafeString>> entries = new LinkedHashSet<>();
    for (int i = 0; i < size; i++) {
      entries.add(entry(readSafeString(), readSafeString()));
    }
    return hash(entries).expiredAt(expireTime != null ? ofEpochMilli(expireTime) : null);
  }
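  • readHash first reads the number of hash entries with readLength, then for each entry calls readSafeString twice (field, then value), wraps the pair with entry(...) and adds it to a LinkedHashSet; finally it converts the entries to a DatabaseValue and sets expiredAt.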

Summary

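importRDB creates an RDBInputStream, calls parse, and overrides each database found in the result via databases.get(entry.getKey()).overrideAll(ImmutableMap.from(entry.getValue())). RDBInputStream.parse validates the preamble and version, dispatches on each opcode to read databases, expiry times and the five value types (string, list, set, sorted set, hash), and verifies the trailing checksum before returning.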


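Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.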
