应用 + IP + 整点小时
对应: 一个索引文件 和 一个数据文件MessageBlock
中的MessageTree
个数,进行遍历MessageTree
的index(索引递增号) 和 每个MessageTree
的size(数据大小)索引递增号*6
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
// block中消息条数
int len = block.getBlockSize();
// block大小
byte[] data = block.getData();
// 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
int blockSize = 0;
ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
for (int i = 0; i < len; i++) {
// 消息的递增号
int seq = block.getIndex(i);
// 消息的大小
int size = block.getSize(i);
// m_indexFile.seek(seq * 6L);
// 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
m_indexChannel.position(seq * 6L);
// m_indexFile.writeInt(m_blockAddress);
// m_indexFile.writeShort(blockSize);
// 用于记录该消息所对应block在数据文件中的起始地址
buffer.putInt(m_blockAddress);
// 用于记录该消息在block中的偏移量
buffer.putShort((short) blockSize);
buffer.flip();
// 写入索引文件
m_indexChannel.write(buffer);
// 计算下一条消息在该block中的偏移量
blockSize += size;
buffer.clear();
}
// m_dataFile.writeInt(data.length);
// m_dataFile.write(data);
buffer = ByteBuffer.allocate(4 + data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
// 先在数据文件中用4个字节记录 block 的大小
buffer.putInt(data.length);
// 再将block的内容写入数据文件
buffer.put(data);
buffer.flip();
m_dataChannel.write(buffer);
// 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
m_blockAddress += data.length + 4;
}
即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】
索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】
对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
DataInputStream in = null;
try {
in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
} catch (IOException e) {
try {
in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
} catch (IOException ioe) {
Cat.logError(ioe);
}
}
return in;
}
public byte[] readMessage(int index) throws IOException {
int blockAddress = 0;
int blockOffset = 0;
// 索引 在索引文件的起始位置
m_indexFile.seek(index * 6L);
// 读出4字节,该值代表block在数据文件的起始位置
blockAddress = m_indexFile.readInt();
// 读出2字节 该值代表Message在block中的偏移量
blockOffset = m_indexFile.readShort() & 0xFFFF;
// 从数据文件的 blockAddress 地址开始访问数据
m_dataFile.seek(blockAddress);
// 4字节里面存的是block块的长度
byte[] buf = new byte[m_dataFile.readInt()];
// 从数据文件中读取整个block到buf数组
m_dataFile.readFully(buf);
DataInputStream in = createDataInputStream(buf);
if (in != null) {
try {
// 跳到block中的偏移量
in.skip(blockOffset);
// 该值代表消息长度
int len = in.readInt();
byte[] data = new byte[len];
// 从block中读取Message
in.readFully(data);
return data;
} finally {
try {
in.close();
} catch (Exception e) {
// ignore it
}
}
} else {
return null;
}
}
听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀