前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出Zookeeper源码(二):存储技术

深入浅出Zookeeper源码(二):存储技术

作者头像
泊浮目
发布2024-01-09 13:42:40
1000
发布2024-01-09 13:42:40
举报
文章被收录于专栏:狗哥的专栏狗哥的专栏

版本

日期

备注

1.0

2020.3.12

文章首发

1.0.1

2020.3.16

改进部分大小写问题及形容方式

1.0.2

2020.3.21

改进可能会引起错误理解的部分

1.0.3

2020.3.29

修改标题

1.0.4

2020.4.18

改进小结部分

1.0.5

2020.6.26

更新部分部分解释,改进注释风格

1.0.6

2020.7.6

增加部分详细解释

前言

在上篇文章中,我们简单提到了Zookeeper的几个核心点。在这篇文章中,我们就来探索其存储技术。在开始前,读者可以考虑思考下列问题:

  • Zookeeper的数据存储是如何实现的?
  • Zookeeper进行一次写操作的时候,会发生什么å?
  • 当一个Zookeeper新加入现有集群时,如何同步现集群中的数据?

抱着问题,我们进入下面的内容。

Zookeper本地存储模型

众所周知,Zookeeper不擅长大量数据的读写,因为:

  1. 本质上就是一个内存里的字典。
  2. 持久化节点的写入由于WAL会导致刷盘,过大的数据会引起额外的seek
  3. 同样的,在zk启动时,所有的数据会从WAL的日志中读出。如果过大,也会导致启动时间较长。

而内存中的数据,也被称为ZkDatabase(Zk的内存数据库),由它来负责管理Zk的会话DataTree存储和事务日志,它也会定时向磁盘dump快照数据,在Zk启动时,也会通过事务日志和快照数据来恢复内存中的数据。

既然Zk的数据是在内存里的,那么它是如何解决数据持久化问题的呢?上一段我们已经提到了:即通过事务日志——WAL,在每次写请求前,都会根据目前的zxid来写log,将请求先记录到日志中。

接下来,我们来谈谈WAL的优化措施。

WAL的优化

WAL优化方案1:Group Commit

一般的WAL中每次写完END都要调用一次耗时的sync API,这其实是会影响到系统的性能。为了解决这个问题,我们可以一次提交多个数据写入——只在最后一个数据写入的END日志之后,才调用sync API。like this:

  • without group commit: BEGIN Data1 END Sync BEGIN Data2 END Sync BEGIN Data3 END Sync
  • with group commit: BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync

凡事都有代价,这可能会引起数据一致性相关的问题。

WAL优化方案2:File Padding

在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置进行日志追加,这些都是发生在写事务日志时,这会明显拖慢系统的性能。

为了减少这些 seek,我们可以预先为 WAL 分配 block。例如 ZooKeeper 当检测到当前事务日志文件不足4KB时,就会填充0使该文件到64MB(这里0仅仅作为填充位)。并新建一个64MB的文件。

所以这也是Zookeeper不擅长读写大数据的原因之一,这会引起大量的block分配。

WAL优化方案3:Snapshot

如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。

除了解决启动时间过长的问题之外,快照还可以减少存储空间的使用。WAL 的多个日志条目有可能是对同一个数据的改动,通过快照,就可以只保留最新的数据改动(Merge)。

Zk的确采用了这个方案来做优化。还带来的一个好处是:在一个节点加入时,可以用最新的Snapshot传过去便于同步数据。

源码解析

本节内容都以3.5.7版本为例

核心接口和类

  • TxnLog:接口类型,提供读写事务日志的API。
  • FileTxnLog:基于文件的TxnLog实现。
  • Snapshot:快照接口类型,提供序列化、反序列化、访问快照API。
  • FileSnapshot:基于文件的Snapshot实现。
  • FileTxnSnapLog:TxnLog和Snapshot的封装
  • DataTree:Zookeeper的内存数据结构,ZNode构成的树。
  • DataNode:表示一个ZNode。

TxnLog

TxnLog是我们前面提到的事务日志。那么接下来我们就来看它的相关源码。

先看注释:

代码语言:javascript
复制
package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

在注释中,我们可以看到一个FileLog由三部分组成:

  • FileHeader
  • TxnList
  • ZerdPad

关于FileHeader,可以理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。

TxnLog.append

我们来看看最典型的append方法,可以将其理解WAL过程中的核心方法:

代码语言:javascript
复制
    /**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) { //为null意味着这是一个读请求,直接返回
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream==null) { //为空的话则new一个Stream
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");   //写file header
           // Make sure that the magic number is written before padding.
           logStream.flush();      // zxid必须比日志先落盘
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos); //加入需要Flush的队列
        }
        filePadding.padFile(fos.getChannel());   //确定是否要扩容。每次64m扩容
        byte[] buf = Util.marshallTxnEntry(hdr, txn);  //序列化写入
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();   //生成butyArray的checkSum
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里
        Util.writeTxnBytes(oa, buf);
        return true;
    }

这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,如果zxid1小于zxid2,则zxid1在zxid2之前发生。

简单分析一下写入过程:

  1. 确定要写的事务日志:当Zk启动完成或日志写满时,会与日志文件断开连接。这个时候会根据zxid创建一个日志。
  2. 是否需要预分配:如果检测到当前日志剩余空间不足4KB时
  3. 事务序列化
  4. 为每个事务生成一个Checksum,目的是为了校验数据的完整性和一致性。
  5. 写入文件,不过是写在Buffer里,并未落盘。
  6. 落盘。根据用户配置来决定是否强制落盘。
TxnLog.commit

这个方法被调用的时机大致有:

  • 服务端比较闲的时候去调用
  • 到请求数量超出1000时,调用。之前提到过GroupCommit,其实就是在这个时候调用的。
  • zk的shutdown钩子被调用时,调用
代码语言:javascript
复制
    /**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);//对应fdataSync

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }

代码非常的简单。如果logStream还有,那就先刷下去。然后遍历待flush的队列(是个链表,用来保持操作顺序),同时还会关注写入的时间,如果过长,则会打一个Warn的日志。

DataTree和DataNode

DataTree是Zk的内存数据结构——就是我们之前说到的MTable。它以树状结构来组织DataNode。

这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。

代码语言:javascript
复制
public class DataNode implements Record {
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;
.....
}

如果用过ZkClient的小伙伴,可能非常熟悉。这就是我们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。

简单了解DataNode后,我们来看一下DataTree。为了避免干扰,我们选出最关键的成员变量:

代码语言:javascript
复制
public class DataTree {
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();
    .......
}

我们可以看到,DataTree本质上是通过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。

那为什么要保存两个状态呢?这得看调用它们被调用的场景:

  • 一般CRUD ZNode的请求都是走ConcurrentHashMap的
  • 序列化DataTree的时候会从Root节点开始遍历所有节点

如果需要获取所有节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。

接下来看一下序列化的相关代码:

DataNode的序列化方法
代码语言:javascript
复制
    /**
     * this method uses a stringbuilder to create a new path for children. This
     * is faster than string appends ( str1 + str2).
     *
     * @param oa
     *            OutputArchive to write to.
     * @param path
     *            a string builder.
     * @throws IOException
     * @throws InterruptedException
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
        String pathString = path.toString();
        DataNode node = getNode(pathString);
        if (node == null) {
            return;
        }
        String children[] = null;
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            Set<String> childs = node.getChildren();
            children = childs.toArray(new String[childs.size()]);
        }
        serializeNodeData(oa, pathString, nodeCopy);
        path.append('/');
        int off = path.length();
        for (String child : children) {
            // since this is single buffer being resused
            // we need
            // to truncate the previous bytes of string.
            path.delete(off, Integer.MAX_VALUE);
            path.append(child);
            serializeNode(oa, path);
        }
    }

可以看到,的确是通过DataNode的Children来遍历所有节点。

DataNode的反序列化方法

接下来看一下反序列化的代码:

代码语言:javascript
复制
    public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException("Invalid Datatree, unable to find " +
                            "parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }

因为序列化的时候是前序遍历。所以反序列化时是先反序列化父亲节点,再反序列化孩子节点。

Snapshot

那么DataTree在什么情况下会序列化呢?在这里就要提到快照了。

前面提到过:如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。

除了减少WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个一般叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。

序列化

接下来看一下代码入口:

代码语言:javascript
复制
    /**
     * serialize the datatree and session into the file snapshot
     * @param dt the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param snapShot the file to store snapshot into
     */
    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
        if (!close) {
            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
            }
        } else {
            throw new IOException("FileSnap has already been closed");
        }
    }

JavaIO的基础知识在这不再介绍,有兴趣的人可以自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口

本质就是创建文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。

反序列化
代码语言:javascript
复制
    /**
     * deserialize a data tree from the most recent snapshot
     * @return the zxid of the snapshot
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

简单来说,先读取Snapshot文件们。并反序列化它们,组成DataTree。

小结

在本文中,笔者和大家一起学习了Zk的底层存储技术。在此处,我们做个简单的回顾:

  • zk的数据主要维护在内存中。在写入内存前,会做WAL,同时也会定期的做快照持久化到磁盘
  • WAL的常见优化手段有三种:Group Commit、File Padding、Snapshot

另外,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Zookeper本地存储模型
    • WAL的优化
      • WAL优化方案1:Group Commit
      • WAL优化方案2:File Padding
      • WAL优化方案3:Snapshot
  • 源码解析
    • 核心接口和类
      • TxnLog
      • DataTree和DataNode
      • Snapshot
  • 小结
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档