基于Consul的分布式信号量实现

在之前《基于Consul的分布式锁实现》一文中我们介绍如何基于Consul的KV存储来实现分布式互斥锁。本文将继续讨论基于Consul的分布式锁实现。信号量是我们在实现并发控制时会经常使用的手段,主要用来限制同时并发线程或进程的数量,比如:Zuul默认情况下就使用信号量来限制每个路由的并发数,以实现不同路由间的资源隔离。

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端,确认这些信号量VI引用的是初始创建的信号量。如在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。

实现思路

- 信号量存储:semaphore/key

- acquired操作:

- 创建session

- 锁定key竞争者:semaphore/key/session

- 查询信号量:semaphore/key/.lock,可以获得如下内容(如果是第一次创建信号量,将获取不到,这个时候就直接创建)

- 如果持有者已达上限,返回false,如果阻塞模式,就继续尝试acquired操作

- 如果持有者未达上限,更新semaphore/key/.lock的内容,将当前线程的sessionId加入到holders中。注意:更新的时候需要设置cas,它的值是“查询信号量”步骤获得的“ModifyIndex”值,该值用于保证更新操作的基础没有被其他竞争者更新。如果更新成功,就开始执行具体逻辑。如果没有更新成功,说明有其他竞争者抢占了资源,返回false,阻塞模式下继续尝试acquired操作

- release操作:

- 从semaphore/key/.lock的holders中移除当前sessionId

- 删除semaphore/key/session

- 删除当前的session

流程图

代码实现

public class Semaphore {
    private Logger logger = Logger.getLogger(getClass());
    private static final String prefix = "semaphore/";  // 信号量参数前缀
    private ConsulClient consulClient;
    private int limit;
    private String keyPath;
    private String sessionId = null;
    private boolean acquired = false;
    /**
     *
     * @param consulClient consul客户端实例
     * @param limit 信号量上限值
     * @param keyPath 信号量在consul中存储的参数路径
     */
    public Semaphore(ConsulClient consulClient, int limit, String keyPath) {
        this.consulClient = consulClient;
        this.limit = limit;
        this.keyPath = prefix + keyPath;
    }
    /**
     * acquired信号量
     *
     * @param block 是否阻塞。如果为true,那么一直尝试,直到获取到该资源为止。
     * @return
     * @throws IOException
     */
    public Boolean acquired(boolean block) throws IOException {
        if(acquired) {
            logger.error(sessionId + " - Already acquired");
            throw new RuntimeException(sessionId + " - Already acquired");
        }
        // create session
        clearSession();
        this.sessionId = createSessionId("semaphore");
        logger.debug("Create session : " + sessionId);
        // add contender entry
        String contenderKey = keyPath + "/" + sessionId;
        logger.debug("contenderKey : " + contenderKey);
        PutParams putParams = new PutParams();
        putParams.setAcquireSession(sessionId);
        Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue();
        if(!b) {
            logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId);
            throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId);
        }
        while(true) {
            // try to take the semaphore
            String lockKey = keyPath + "/.lock";
            String lockKeyValue;
            GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
            if (lockKeyContent != null) {
                // lock值转换
                lockKeyValue = lockKeyContent.getValue();
                BASE64Decoder decoder = new BASE64Decoder();
                byte[] v = decoder.decodeBuffer(lockKeyValue);
                String lockKeyValueDecode = new String(v);
                logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode);
                Gson gson = new Gson();
                ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
                // 当前信号量已满
                if(contenderValue.getLimit() == contenderValue.getHolders().size()) {
                    logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting...");
                    if(block) {
                        // 如果是阻塞模式,再尝试
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                        }
                        continue;
                    }
                    // 非阻塞模式,直接返回没有获取到信号量
                    return false;
                }
                // 信号量增加
                contenderValue.getHolders().add(sessionId);
                putParams = new PutParams();
                putParams.setCas(lockKeyContent.getModifyIndex());
                boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                if(c) {
                    acquired = true;
                    return true;
                }
                else
                    continue;
            } else {
                // 当前信号量还没有,所以创建一个,并马上抢占一个资源
                ContenderValue contenderValue = new ContenderValue();
                contenderValue.setLimit(limit);
                contenderValue.getHolders().add(sessionId);
                putParams = new PutParams();
                putParams.setCas(0L);
                boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                if (c) {
                    acquired = true;
                    return true;
                }
                continue;
            }
        }
    }
    /**
     * 创建sessionId
     * @param sessionName
     * @return
     */
    public String createSessionId(String sessionName) {
        NewSession newSession = new NewSession();
        newSession.setName(sessionName);
        return consulClient.sessionCreate(newSession, null).getValue();
    }
    /**
     * 释放session、并从lock中移除当前的sessionId
     * @throws IOException
     */
    public void release() throws IOException {
        if(this.acquired) {
            // remove session from lock
            while(true) {
                String contenderKey = keyPath + "/" + sessionId;
                String lockKey = keyPath + "/.lock";
                String lockKeyValue;
                GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue();
                if (lockKeyContent != null) {
                    // lock值转换
                    lockKeyValue = lockKeyContent.getValue();
                    BASE64Decoder decoder = new BASE64Decoder();
                    byte[] v = decoder.decodeBuffer(lockKeyValue);
                    String lockKeyValueDecode = new String(v);
                    Gson gson = new Gson();
                    ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class);
                    contenderValue.getHolders().remove(sessionId);
                    PutParams putParams = new PutParams();
                    putParams.setCas(lockKeyContent.getModifyIndex());
                    consulClient.deleteKVValue(contenderKey);
                    boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue();
                    if(c) {
                        break;
                    }
                }
            }
            // remove session key
        }
        this.acquired = false;
        clearSession();
    }
    public void clearSession() {
        if(sessionId != null) {
            consulClient.sessionDestroy(sessionId, null);
            sessionId = null;
        }
    }
    class ContenderValue implements Serializable {
        private Integer limit;
        private List<String> holders = new ArrayList<>();
        public Integer getLimit() {
            return limit;
        }
        public void setLimit(Integer limit) {
            this.limit = limit;
        }
        public List<String> getHolders() {
            return holders;
        }
        public void setHolders(List<String> holders) {
            this.holders = holders;
        }
        @Override
        public String toString() {
            return new Gson().toJson(this);
        }
    }
}

单元测试

下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来获取信号量执行业务逻辑。由于信号量与简单的分布式互斥锁有所不同,它不是只限定一个线程可以操作,而是可以控制多个线程的并发,所以通过下面的单元测试,我们设置信号量为3,然后同时启动15个线程来竞争的情况,来观察分布式信号量实现的结果如何。

public class TestLock {
    private Logger logger = Logger.getLogger(getClass());
    @Test
    public void testSemaphore() throws Exception {
        new Thread(new SemaphoreRunner(1)).start();
        new Thread(new SemaphoreRunner(2)).start();
        new Thread(new SemaphoreRunner(3)).start();
        new Thread(new SemaphoreRunner(4)).start();
        new Thread(new SemaphoreRunner(5)).start();
        new Thread(new SemaphoreRunner(6)).start();
        new Thread(new SemaphoreRunner(7)).start();
        new Thread(new SemaphoreRunner(8)).start();
        new Thread(new SemaphoreRunner(9)).start();
        new Thread(new SemaphoreRunner(10)).start();
        Thread.sleep(1000000L);
    } 
}
public class SemaphoreRunner implements Runnable {
    private Logger logger = Logger.getLogger(getClass()); 
    private int flag;
    public SemaphoreRunner(int flag) {
        this.flag = flag;
    }
    @Override
    public void run() {
        Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init");
        try {
            if (semaphore.acquired(true)) {
                // 获取到信号量,执行业务逻辑
                logger.info("Thread " + flag + " start!");
                Thread.sleep(new Random().nextInt(10000));
                logger.info("Thread " + flag + " end!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // 信号量释放、Session锁释放、Session删除
                semaphore.release();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

INFO  [Thread-6] SemaphoreRunner - Thread 7 start!
 INFO  [Thread-2] SemaphoreRunner - Thread 3 start!
 INFO  [Thread-7] SemaphoreRunner - Thread 8 start!
 INFO  [Thread-2] SemaphoreRunner - Thread 3 end!
 INFO  [Thread-5] SemaphoreRunner - Thread 6 start!
 INFO  [Thread-6] SemaphoreRunner - Thread 7 end!
 INFO  [Thread-9] SemaphoreRunner - Thread 10 start!
 INFO  [Thread-5] SemaphoreRunner - Thread 6 end!
 INFO  [Thread-1] SemaphoreRunner - Thread 2 start!
 INFO  [Thread-7] SemaphoreRunner - Thread 8 end!
 INFO  [Thread-10] SemaphoreRunner - Thread 11 start!
 INFO  [Thread-10] SemaphoreRunner - Thread 11 end!
 INFO  [Thread-12] SemaphoreRunner - Thread 13 start!
 INFO  [Thread-1] SemaphoreRunner - Thread 2 end!
 INFO  [Thread-3] SemaphoreRunner - Thread 4 start!
 INFO  [Thread-9] SemaphoreRunner - Thread 10 end!
 INFO  [Thread-0] SemaphoreRunner - Thread 1 start!
 INFO  [Thread-3] SemaphoreRunner - Thread 4 end!
 INFO  [Thread-14] SemaphoreRunner - Thread 15 start!
 INFO  [Thread-12] SemaphoreRunner - Thread 13 end!
 INFO  [Thread-0] SemaphoreRunner - Thread 1 end!
 INFO  [Thread-13] SemaphoreRunner - Thread 14 start!
 INFO  [Thread-11] SemaphoreRunner - Thread 12 start!
 INFO  [Thread-13] SemaphoreRunner - Thread 14 end!
 INFO  [Thread-4] SemaphoreRunner - Thread 5 start!
 INFO  [Thread-4] SemaphoreRunner - Thread 5 end!
 INFO  [Thread-8] SemaphoreRunner - Thread 9 start!
 INFO  [Thread-11] SemaphoreRunner - Thread 12 end!
 INFO  [Thread-14] SemaphoreRunner - Thread 15 end!
 INFO  [Thread-8] SemaphoreRunner - Thread 9 end!

从测试结果,我们可以发现当信号量持有者数量达到信号量上限3的时候,其他竞争者就开始进行等待了,只有当某个持有者释放信号量之后,才会有新的线程变成持有者,从而开始执行自己的业务逻辑。所以,分布式信号量可以帮助我们有效的控制同时操作某个共享资源的并发数。

优化建议与参考文档

同前文一样,这里只是做了简单的实现。线上应用还必须加入TTL的session清理以及对.lock资源中的无效holder进行清理的机制。

参考文档:

https://www.consul.io/docs/guides/semaphore.html

原文发布于微信公众号 - 程序猿DD(didispace)

原文发表时间:2017-04-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Spring-IOC(2)

scope <!--Spring使用scope标签来制定bean的作用域(默认为Singleton)--> <bean id="singletonBean" c...

34210
来自专栏菩提树下的杨过

java 利用JAX-RS快速开发RESTful 服务

JAX-RS(Java API for RESTful Web Services)同样也是JSR的一部分,详细规范定义见 https://jcp.org/en/...

2847
来自专栏闻道于事

深入分析Spring Boot2,解决 java.lang.ArrayStoreException异常

将某个项目从Spring Boot1升级Spring Boot2之后出现如下报错,查了很多不同的解决方法都没有解决:

8102
来自专栏java技术学习之道

事物在Controller层的探索

1123
来自专栏Zephery

Spring中Bean

一、什么是Bean 1、Java面向对象,对象有方法和属性,那么就需要对象实例来调用方法和属性(即实例化); 2、凡是有方法或属性的类都需要实例化,这样才能具象...

2976
来自专栏光变

Spring PlaceHolder使用注意事项

对Spring Property Placeholder如何使用,以及使用过程中遇到的问题做了简单的描述。

1001
来自专栏开发与安全

linux网络编程之POSIX 共享内存和 系列函数

在前面介绍了system v 共享内存的相关知识,现在来稍微看看posix 共享内存 和系列函数。 共享内存简单来说就是一块真正的物理内存区域,可以使用一些函数...

2190
来自专栏happyJared

Spring Boot中读取配置属性的几种方式

  本文介绍Spring Boot中读取配置属性的几种方式,项目示例中用到的application.yml和application.properties定义如下...

1.3K2
来自专栏猿天地

高性能NIO框架Netty-对象传输

上篇文章高性能NIO框架Netty入门篇我们对Netty做了一个简单的介绍,并且写了一个入门的Demo,客户端往服务端发送一个字符串的消息,服务端回复一个字符串...

3108
来自专栏斑斓

在Scala项目中使用Spring Cloud

由于Scala本身属于JVM下的语言,因此它能够较好地与Java项目融合在一起。在Scala中调用Java库,基本上与在Java中调用Java库的方式是相同的(...

4675

扫码关注云+社区

领取腾讯云代金券