前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ZooKeeper构建分布式锁(选译)

ZooKeeper构建分布式锁(选译)

作者头像
java达人
发布2018-01-31 12:48:45
7350
发布2018-01-31 12:48:45
举报
文章被收录于专栏:java达人java达人

作者:Scott Leberknight

译者: java达人

来源:http://www.sleberknight.com/blog/sleberkn/entry/building_a_distributed_lock_revisited(点击阅读原文前往)

ZooKeeper分布式协调5: 构建分布式锁

这是介绍Apache ZooKeeper系列博客的第五篇。在第四篇博客中,您看到了 ZooKeeper体系结构和数据一致性的高层次视图。在这个博客中,我们将使用到目前为止所获得的所有知识来实现一个分布式锁。

您已经了解了如何与Apache ZooKeeper交互,并了解了它的体系结构和一致性模型。现在让我们使用这些知识来构建一个分布式锁。它的目标是在不同的机器上,甚至在不同的网络或不同的数据中心之间建立一个互斥锁。这也带来了一个好处,就是客户端对彼此一无所知;他们只知道他们需要使用这个锁来访问一些共享资源,并且他们拥有这个锁才能访问它。

为了构建这个锁,我们将创建一个持久的znode,它将作为父节点。希望获得锁的客户端将在父节点下面创建顺序的、临时的子节点。锁是由客户端进程拥有的,该进程的子节点具有最低的序列号。在图2中,锁节点有三个子节点,而节点1在这个时间点拥有锁,因为它的序列号是最低的。在删除节点1之后,锁被释放,然后拥有节点2的客户端拥有这个锁,以此类推。

图2-父锁节点和子节点

分布式锁节点

客户端确定自己是否拥有锁的算法在表面上是很简单的。客户端在父锁znode下创建一个新的序列的临时znode。然后,客户端获取锁节点的子节点,并在锁节点上设置观察。如果客户端创建的子znode具有最低的序列号,那么就获得了锁,并且它可以对被锁保护的资源执行所需的任何操作。如果它所创建的子znode没有最低的序列号,那么等待观察触发一个watch事件,然后执行相同的逻辑,即获取子节点,设置观察,并通过最低的序列号检查是否获取锁。客户端不断继续这个过程,直到获得锁。

虽然这听起来不太糟糕,但也有一些潜在的陷阱。首先,如果在znode创建过程中出现局部故障(例如由于连接断了),客户端如何知道它成功地创建了子节点?解决方案是将客户端 ZooKeeper session id嵌入到子节点名中,例如child-<sessionId>-;保留相同session(以及session ID)的被故障转移的客户端可以通过在子节点中查找它的session ID很容易地确定子znode是否创建。其次,在我们之前的算法中,每个客户端都在父锁节点上设置观察。但是,这有可能产生“羊群效应”——如果每个客户端都在监视父节点,那么在对子节点进行任何更改时,每个客户端都会收到通知,而不管客户端是否能够拥有该锁。如果有一小部分客户端,这可能无关紧要,但如果有很多个,它就有可能导致网络流量激增。例如,拥有child-9的客户端只需要观察它前面的子节点,它很可能是child-8,但如果第八个子节点莫名其妙地死了,则可能是更早的子节点。然后,通知只发送给实际上可以获取锁的客户端。

对我们来说,幸运的是, ZooKeeper在contrib 模块中附带了一个名为WriteLock的锁“秘方”。WriteLock使用上述算法实现了分布式锁,并考虑了局部故障和羊群效应。它通过LockListener实例使用了异步回调模型,当锁被获取时,它的lockAcquired 方法被调用,当锁释放时调用 lockReleased 方法。我们可以通过阻塞来构建一个同步锁,直到获得锁。清单6显示了如何在调用锁获取的方法之前使用 CountDownLatch来阻塞。(博客样例代码在GitHub上https://github.com/sleberknight/zookeeper-samples)

清单6-用WriteLock创建BlockingWriteLock

public class BlockingWriteLock {
  private String path;
  private WriteLock writeLock;
  private CountDownLatch signal = new CountDownLatch(1);
  public BlockingWriteLock(ZooKeeper zookeeper,
          String path, List<ACL> acls) {
    this.path = path;
    this.writeLock =
        new WriteLock(zookeeper, path, acls, new SyncLockListener());
  }
  public void lock() throws InterruptedException, KeeperException {
    writeLock.lock();
    signal.await();
  }
  public void unlock() {
    writeLock.unlock();
  }
  class SyncLockListener implements LockListener {
    @Override public void lockAcquired() {
      signal.countDown();
    }
    @Override public void lockReleased() { /* ignored */ }
  }
}

你可以使用清单7中的BlockingWriteLock

清单7-使用 BlockingWriteLock

BlockingWriteLock lock =
  new BlockingWriteLock(zooKeeper, path, ZooDefs.Ids.OPEN_ACL_UNSAFE);
try {
  lock.lock();
  // do something while we own the lock
} catch (Exception ex) {
  // handle appropriately
} finally {
  lock.unlock();
}

您可以更进一步,包装try/catch/finally,并创建一个实现接口的命令类。例如,您可以创建一个DistributedLockOperationExecutor类实现一个withLock方法,以DistributedLockOperation实例作为参数,如清单8所示。

清单8 - 包装 BlockingWriteLock try/catch/finally logic

DistributedLockOperationExecutor executor =
  new DistributedLockOperationExecutor(zooKeeper);
executor.withLock(lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  new DistributedLockOperation() {
    @Override public Object execute() {
      // do something while we have the lock
    }
  });

在DistributedLockOperationExecutor 包装try / catch / finally的优点是,当你调用withLock,就可以消除样板代码,而且你不可能忘记释放锁。

第5部分结论

在ZooKeeper第5篇博客中,您实现了分布式锁,并看到了一些潜在的问题,比如连接丢失的局部故障,以及“羊群效应”。我们把我们最初的分布式锁理了一下,使用DistributedLockOperationExecutor和DistributedLockOperation实现同步,从而确保合适的连接处理和锁释放。

在下一个(也是最后一个)博客中,我们将简要介绍管理和优化 ZooKeeper,并介绍Apache Curator框架,最后总结我们学过的内容。

参考资料

博客源代码,https://github.com/sleberknight/zookeeper-samples

ZooKeeper演示稿,http://www.slideshare.net/scottleber/apache-zookeeper

ZooKeeper官网,http://zookeeper.apache.org/

再看分布式锁构建:使用Curator InterProcessMutex

去年夏天,我写了一系列的博客,介绍了Apache ZooKeeper,这是一种分布式的协调服务,应用于Hadoop、HBase和Storm等多个开源项目,用来管理机器集群。第五篇博客描述了如何使用 ZooKeeper来实现一个分布式锁。在这个博客中,我解释了分布式锁的目标是“在不同的机器上,甚至在不同的网络或者不同的数据中心之间,建立一个互斥锁。”我还提到一个重要的好处是“客户端对彼此一无所知;他们只知道他们需要使用这个锁来访问一些共享资源,除非他们拥有这个锁,否则他们不能够访问它。这个博客描述了如何使用ZooKeeper contrib模块自带的WriteLock“秘方”来创建同步的BlockingWriteLock,它使用更简单的语义,你只需调用lock()方法来获取锁,并调用unlock()释放锁。在前面的系列文章里,我们在Group Membership 示例学习了如何连接ZooKeeper,它使用Watcher和CountDownLatch 来阻塞,直到SyncConnected 事件被接收。所有这些代码都不是非常复杂,但也相当底层,特别是它要保持阻塞直到连接事件被接受,还有WriteLock秘方的实现。

在总结博客时,我提到了Curator项目,它最初是由Netflix开源的,后来由他们捐赠给Apache。 Curator wiki将Curator描述为“一组Java库,它使Apache ZooKeeper更易于使用”。在这个博客中,我们将看到如何使用 Curator来实现一个分布式锁,而不需要编写任何自己的包装器代码来获得连接或实现锁本身。在分布式锁博客中,我们看到了如何在一个持久的父锁节点中创建连续的临时子节点(例如,child-lock-node-0000000000, child-lock-node-0000000001, child-lock-node-0000000002等)。创建最小序列子节点的客户端持有该锁。我们看到了几个潜在的陷阱:首先,在局部故障的情况下,即(暂时的)连接丢失,客户端如何知道它是否成功地创建了一个子节点,以及它如何知道它创建了哪个子节点,即那个子节点的序列?我提到一个解决方案是在子节点中嵌入 ZooKeeper sessionID,这样客户端就可以很容易地识别它所创建的子节点。Jordan Zimmerman(Curator创造者)很友善地向该博客发表了评论,指出使用sessionID“不理想”,因为它“阻止了相同的ZK连接针对同一个锁在多线程下被使用”。他说,“使用GUID更好。”这就是Curator 所使用的。”

其次,我们注意到分布式锁客户端应该只看最靠前的子节点,而不是父节点,这是为了防止“羊群效应”,不需要让每一个客户端为每个子节点事件接受通知,实际上每个客户端只需要关注最前面的子节点。 Curator处理这两种情况,并拥有其他优点,比如连接到ZooKeeper的重试策略。闲话少叙,让我们看看如何在ZooKeeper中使用分布式锁。

首先,我们需要获得一个CuratorFramework的实例—这是一个接口,它代表高层次抽象API,用于与ZooKeeper一起工作。它提供了自动连接管理,包括重试操作、流式API,以及一些您可以在诸如锁、队列、leader选举等分布式数据结构中使用的开箱即用的‘菜谱’,我们可以使用CuratorFrameworkFactory和我们选择的RetryPolicy。

String hosts = "host-1:2181,host-2:2181,host-3:2181";
int baseSleepTimeMills = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();

在上面的代码中,我们首先创建一个重试策略—在本例中,使用一个以1000毫秒为基准的睡眠时间,然后再进行3次重试。然后我们可以使用CuratorFrameworkFactory.newClient()来获得CuratorFramework的实例。最后,我们需要调用start()(注意,我们在处理客户端时需要调用close()。现在我们有了一个客户端实例,我们可以使用InterProcessLock的实现来创建我们的分布式锁。最简单的一种是进程间互斥锁,它是一个可重入的互斥锁,可以跨jvm运行,通过使用 ZooKeeper来控制锁。

InterProcessLock lock = new InterProcessMutex(client, lockPath);
lock.acquire();
try {
  // do work while we hold the lock
} catch (Exception ex) {
  // handle exceptions as appropriate
} finally {
  lock.release();
}

上面的代码简单地为特定的锁路径(lockPath)创建一个进程互斥锁,获取锁,执行一些操作,然后释放锁。在这种情况下,acquire()将阻塞,直到锁可用为止。在许多情况下,无限期地阻塞是不好的,Curator提供了一个重载版本的acquire(),它需要等待锁的最大时间,如果锁在时间限制内获得就返回true,否则返回false。

InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
  try {
    // do work while we hold the lock
  } catch (Exception ex) {
    // handle exceptions as appropriate
  } finally {
    lock.release();
  }
} else {
  // we timed out waiting for lock, handle appropriately
}

上面的代码演示了如何使用acquire的超时版本。代码稍微复杂一些,因为您需要检查锁是否被获取,或者我们是否在超时等待。不管您使用的是哪个版本的acquire(),您都要在finally块中使用release() 的锁。最后一件事是记住当你完成时,需要关闭客户端:

client.close();

这基本上就是使用Curator的InterProcessMutex实现一个分布式锁的内容。连接处理管理、局部故障、“羊群效应”、自动重试等所有复杂特性,都由高层次的Curator API处理。套用Stu Halloway的话说,你至少应该理解你所从事工作层次的往下一层—在这种情况下,你应该对ZooKeeper的工作原理和分布式计算的潜在问题有一个很好的理解。但话说回来,请继续使用 Curator,在更高的抽象层次上工作,并享受Netflix和雅虎(它创造了ZooKeeper)的所有分布式计算体验的好处。最后,2014年新年快乐!(java达人:这篇文章是2014年写的,好的技术文章就是这样,具有长久的价值)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-10-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档