前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Cat消息存储

Cat消息存储

作者头像
spilledyear
发布2019-12-12 21:39:19
7380
发布2019-12-12 21:39:19
举报
文章被收录于专栏:小白鼠小白鼠
  1. 消息格式为 应用名-IP-小时正点数-消息递增号 MessageId
  2. 每个 应用 + IP + 整点小时 对应: 一个索引文件 和 一个数据文件
  3. 消息经过编码后,首4字节为该消息的大小,从文件中读消息的时候会用到这个特性

写消息过程

  1. 获取MessageBlock中的MessageTree个数,进行遍历
  2. 获得每个MessageTree的index(索引递增号) 和 每个MessageTree的size(数据大小)
  3. 设置索引文件的起始位置 索引递增号*6
  4. 将该该消息所对应block在数据文件中的起始地址写到索引文件(4字节)
  5. 将该该消息在block中的偏移量写入索引文件(2字节)
  6. 将block的内容长度写入数据文件
  7. 将block的内容写入dataFile
代码语言:javascript
复制
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
    // block中消息条数
    int len = block.getBlockSize();
    // block大小
    byte[] data = block.getData();

    // 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
    int blockSize = 0;

    ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);

    for (int i = 0; i < len; i++) {
        // 消息的递增号
        int seq = block.getIndex(i);
        // 消息的大小
        int size = block.getSize(i);

        // m_indexFile.seek(seq * 6L);
        // 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
        m_indexChannel.position(seq * 6L);

        // m_indexFile.writeInt(m_blockAddress);
        // m_indexFile.writeShort(blockSize);
        // 用于记录该消息所对应block在数据文件中的起始地址
        buffer.putInt(m_blockAddress);
        // 用于记录该消息在block中的偏移量
        buffer.putShort((short) blockSize);
        buffer.flip();
        // 写入索引文件
        m_indexChannel.write(buffer);

        // 计算下一条消息在该block中的偏移量
        blockSize += size;

        buffer.clear();
    }

    // m_dataFile.writeInt(data.length);
    // m_dataFile.write(data);
    buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);
    // 先在数据文件中用4个字节记录 block 的大小
    buffer.putInt(data.length);
    // 再将block的内容写入数据文件
    buffer.put(data);
    buffer.flip();
    m_dataChannel.write(buffer);

    // 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
    m_blockAddress += data.length + 4;
}

即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】

读消息过程

对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量

  1. 根据 索引递增号从索引文件读前4个字节 找到block的地址
  2. 该地址为起始地址,从数据文件中读取一个int类型数据(4个字节)作为该block的长度
  3. 根据该长度读取整个block的内容到byte数组
  4. 根据 索引递增号从索引文件读后2个字节 找到该消息在该block中的偏移地址
  5. 以偏移地址为起始地址,读取一个int类型数据(4个字节)作为该消息的大小(为什么读4字节?这是在对消息编码时决定的,首4字节表示该消息的大小)
  6. 根据偏移地址 和 上一步获取的int类型数据大小 读取Message
代码语言:javascript
复制
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
    DataInputStream in = null;

    try {
        in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
    } catch (IOException e) {
        try {
            in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
        } catch (IOException ioe) {
            Cat.logError(ioe);
        }
    }
    return in;
}

public byte[] readMessage(int index) throws IOException {
    int blockAddress = 0;
    int blockOffset = 0;

    // 索引 在索引文件的起始位置
    m_indexFile.seek(index * 6L);

    // 读出4字节,该值代表block在数据文件的起始位置
    blockAddress = m_indexFile.readInt();
    // 读出2字节 该值代表Message在block中的偏移量
    blockOffset = m_indexFile.readShort() & 0xFFFF;

    // 从数据文件的 blockAddress 地址开始访问数据
    m_dataFile.seek(blockAddress);
    // 4字节里面存的是block块的长度
    byte[] buf = new byte[m_dataFile.readInt()];
    // 从数据文件中读取整个block到buf数组
    m_dataFile.readFully(buf);

    DataInputStream in = createDataInputStream(buf);

    if (in != null) {
        try {
            // 跳到block中的偏移量
            in.skip(blockOffset);
            
            // 该值代表消息长度
            int len = in.readInt();

            byte[] data = new byte[len];
            
            // 从block中读取Message
            in.readFully(data);
            return data;
        } finally {
            try {
                in.close();
            } catch (Exception e) {
                // ignore it
            }
        }
    } else {
        return null;
    }
}

听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写消息过程
  • 读消息过程
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档