前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊claudb的exportRDB

聊聊claudb的exportRDB

作者头像
code4it
发布2020-08-21 17:04:20
3410
发布2020-08-21 17:04:20
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下claudb的exportRDB

exportRDB

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

代码语言:javascript
复制
public class DBServerState {

  //......

  public void exportRDB(OutputStream output) throws IOException {
    RDBOutputStream rdb = new RDBOutputStream(output);
    rdb.preamble(RDB_VERSION);
    for (int i = 0; i < databases.size(); i++) {
      Database db = databases.get(i);
      if (!db.isEmpty()) {
        rdb.select(i);
        rdb.dabatase(db);
      }
    }
    rdb.end();
  }

  //......

}
  • exportRDB方法先通过rdb.preamble(RDB_VERSION)写入redis魔数及版本;然后遍历databases,挨个执行rdb.select(i)写入SELECT及db的长度,在执行rdb.dabatase(db),遍历entry,挨个按expiredAt、type、key、value写入数据

RDBOutputStream

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBOutputStream.java

代码语言:javascript
复制
public class RDBOutputStream {

  private static final byte[] REDIS = safeString("REDIS").getBytes();

  private static final int TTL_MILLISECONDS = 0xFC;
  private static final int END_OF_STREAM = 0xFF;
  private static final int SELECT = 0xFE;

  private final CheckedOutputStream out;

  public RDBOutputStream(OutputStream out) {
    super();
    this.out = new CheckedOutputStream(out, new CRC64());
  }

  public void preamble(int version) throws IOException {
    out.write(REDIS);
    out.write(version(version));
  }

  private byte[] version(int version) {
    StringBuilder sb = new StringBuilder(String.valueOf(version));
    for (int i = sb.length(); i < Integer.BYTES; i++) {
      sb.insert(0, '0');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  public void select(int db) throws IOException {
    out.write(SELECT);
    length(db);
  }

  public void dabatase(Database db) throws IOException {
    for (Tuple2<DatabaseKey, DatabaseValue> entry : db.entrySet()) {
      value(entry.get1(), entry.get2());
    }
  }

  private void value(DatabaseKey key, DatabaseValue value) throws IOException {
    expiredAt(value.getExpiredAt());
    type(value.getType());
    key(key);
    value(value);
  }

  private void expiredAt(Instant expiredAt) throws IOException {
    if (expiredAt != null) {
      out.write(TTL_MILLISECONDS);
      out.write(ByteUtils.toByteArray(expiredAt.toEpochMilli()));
    }
  }

  private void type(DataType type) throws IOException {
    out.write(type.ordinal());
  }

  private void key(DatabaseKey key) throws IOException {
    string(key.getValue());
  }

  private void value(DatabaseValue value) throws IOException {
    switch (value.getType()) {
    case STRING:
      string(value.getString());
      break;
    case LIST:
      list(value.getList());
      break;
    case HASH:
      hash(value.getHash());
      break;
    case SET:
      set(value.getSet());
      break;
    case ZSET:
      zset(value.getSortedSet());
      break;
    default:
      break;
    }
  }

  private void length(int length) throws IOException {
    if (length < 0x40) {
      // 1 byte: 00XXXXXX
      out.write(length);
    } else if (length < 0x4000) {
      // 2 bytes: 01XXXXXX XXXXXXXX
      int b1 = length >> 8;
      int b2 = length & 0xFF;
      out.write(0x40 | b1);
      out.write(b2);
    } else {
      // 5 bytes: 10...... XXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      out.write(0x80);
      out.write(toByteArray(length));
    }
  }

  private void string(String value) throws IOException {
    string(safeString(value));
  }

  private void string(SafeString value) throws IOException {
    byte[] bytes = value.getBytes();
    length(bytes.length);
    out.write(bytes);
  }

  private void string(double value) throws IOException {
    string(String.valueOf(value));
  }

  private void list(ImmutableList<SafeString> value) throws IOException {
    length(value.size());
    for (SafeString item : value) {
      string(item);
    }
  }

  private void hash(ImmutableMap<SafeString, SafeString> value) throws IOException {
    length(value.size());
    for (Tuple2<SafeString, SafeString> entry : value.entries()) {
      string(entry.get1());
      string(entry.get2());
    }
  }

  private void set(ImmutableSet<SafeString> value) throws IOException {
    length(value.size());
    for (SafeString item : value) {
      string(item);
    }
  }

  private void zset(NavigableSet<Entry<Double, SafeString>> value) throws IOException {
    length(value.size());
    for (Entry<Double, SafeString> item : value) {
      string(item.getValue());
      string(item.getKey());
    }
  }

  public void end() throws IOException {
    out.write(END_OF_STREAM);
    out.write(toByteArray(out.getChecksum().getValue()));
    out.flush();
  }
}
  • RDBOutputStream的构造器用CheckedOutputStream包装了OutputStream;其dabatase方法遍历db.entrySet(),挨个执行value方法;value方法先分别执行expiredAt、type、key、value方法;value方法针对STRING、LIST、HASH、SET、ZSET这几种value类型做了不同的处理;string方法直接写入string;list方法先写入list大小,再挨个写入list元素;hash方法先写入hash大小,再挨个写入key和value;set先写入set大小,在挨个写入set元素;zset先写入zset大小,再挨个写入value和score;end方法写入END_OF_STREAM,然后再写入checksum

CheckedOutputStream

java.base/java/util/zip/CheckedOutputStream.java

代码语言:javascript
复制
public
class CheckedOutputStream extends FilterOutputStream {
    private Checksum cksum;

    /**
     * Creates an output stream with the specified Checksum.
     * @param out the output stream
     * @param cksum the checksum
     */
    public CheckedOutputStream(OutputStream out, Checksum cksum) {
        super(out);
        this.cksum = cksum;
    }

    /**
     * Writes a byte. Will block until the byte is actually written.
     * @param b the byte to be written
     * @exception IOException if an I/O error has occurred
     */
    public void write(int b) throws IOException {
        out.write(b);
        cksum.update(b);
    }

    /**
     * Writes an array of bytes. Will block until the bytes are
     * actually written.
     * @param b the data to be written
     * @param off the start offset of the data
     * @param len the number of bytes to be written
     * @exception IOException if an I/O error has occurred
     */
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        cksum.update(b, off, len);
    }

    /**
     * Returns the Checksum for this output stream.
     * @return the Checksum
     */
    public Checksum getChecksum() {
        return cksum;
    }
}
  • CheckedOutputStream继承了FilterOutputStream,其构造器要求输入OutputStream及Checksum,每次write的时候都会执行cksum.update;其getChecksum方法直接返回cksum

CRC64

代码语言:javascript
复制
public class CRC64 implements Checksum {

  private static final int LOOKUPTABLE_SIZE = 256;
  private static final long POLY64REV = 0xC96C5795D7870F42L;
  private static final long LOOKUPTABLE[] = new long[LOOKUPTABLE_SIZE];

  private long crc = -1;

  static {
    for (int b = 0; b < LOOKUPTABLE.length; ++b) {
      long r = b;
      for (int i = 0; i < Long.BYTES; ++i) {
        if ((r & 1) == 1) {
          r = (r >>> 1) ^ POLY64REV;
        } else {
          r >>>= 1;
        }
      }
      LOOKUPTABLE[b] = r;
    }
  }

  @Override
  public void update(int b) {
    crc = LOOKUPTABLE[((b & 0xFF) ^ (int) crc) & 0xFF] ^ (crc >>> 8);
  }

  @Override
  public void update(byte[] buf, int off, int len) {
    int end = off + len;

    while (off < end) {
      crc = LOOKUPTABLE[(buf[off++] ^ (int) crc) & 0xFF] ^ (crc >>> 8);
    }
  }

  @Override
  public long getValue() {
    return ~crc;
  }

  @Override
  public void reset() {
    crc = -1;
  }
}
  • CRC64实现了Checksum接口,其update方法会借用LOOKUPTABLE更新crc

小结

exportRDB方法先通过rdb.preamble(RDB_VERSION)写入redis魔数及版本;然后遍历databases,挨个执行rdb.select(i)写入SELECT及db的长度,在执行rdb.dabatase(db),遍历entry,挨个按expiredAt、type、key、value写入数据

doc

  • DBServerState
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-08-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • exportRDB
  • RDBOutputStream
  • CheckedOutputStream
  • CRC64
  • 小结
  • doc
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档