前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redisson分布式锁实现原理

redisson分布式锁实现原理

作者头像
叔牙
发布2023-09-07 09:38:28
5200
发布2023-09-07 09:38:28
举报

一、基于redisson实现分布式锁使用

Redisson是一个使用Java编写的开源库,它提供了对Redis数据库的访问和操作的封装,并在此基础上提供了各种分布式功能,包括分布式锁。

Redisson的分布式锁是基于Redis的原子性操作来实现的,它提供了简单且易于使用的API,可以在分布式环境中实现高效的分布式锁管理。

1.引入依赖

引入redis和redisson相关依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
2.编写配置

编写声明RedissonClient配置,server类型可以选ClusterServers,MasterSlaveServers,ReplicatedServers,SentinelServers和SingleServer,此处使用的server类型选单体redis:

代码语言:javascript
复制
@Configuration
public class RedissonConfiguration {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password:}")
    private String password;
    private int database = 0;
    @Bean
    public RedissonClient redisson() {
        String address = "redis://" + host + ":" + port;
        Config config = new Config();
        config.useSingleServer()
                .setAddress(address)
                .setPassword(password)
                .setDatabase(database);
        return Redisson.create(config);
    }
}
3.使用分布式锁

注入RedissonClient然后获取锁,加锁后进行独占业务操作,最后释放锁。

代码语言:javascript
复制
@Service
@Slf4j
public class TestRLock {
    @Resource
    private RedissonClient redissonClient;


    public void doSomething(String orderId) {
        RLock lock = redissonClient.getLock("place_order:" + orderId);
        try {
            if(lock.tryLock(5,10, TimeUnit.SECONDS)) {
                this.doBuzzExclusive(orderId);
            }
        } catch (Exception e) {
            log.error("occur error;orderId={}",orderId,e);
        } finally {
            //锁被持有,并且被当前线程持有
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            } 
        }
    }
    /**
     * 模拟独占处理
     * @param orderId
     */
    private void doBuzzExclusive(String orderId) {
        // TODO: do business exclusive
    }
}

这样就实现分布式锁对竞态资源的操作控制。

二、redisson分布式锁原理

1.建立连接

在Redisson中,Netty被用作底层的网络通信框架。它提供了高性能、异步非阻塞的网络通信能力,使得Redisson可以与Redis服务器进行快速、可靠的通信。

在使用Redisson创建RedissonClient实例时,它会自动初始化并启动Netty客户端,用于与Redis服务器建立连接。

从前边的分布式锁使用过程可以看出,RLock是由RedissonClient创建,那么与redis的连接交互也是由RedissonClient来实现,我们从创建RedissonClient过程看一下redisson如何与redis建立连接的。

代码语言:javascript
复制
public static RedissonClient create(Config config) {
    return new Redisson(config);
}

然后调用Redisson构造函数创建:

代码语言:javascript
复制
protected Redisson(Config config) {
    this.config = config;
    Config configCopy = new Config(config);


    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    RedissonObjectBuilder objectBuilder = null;
    if (config.isReferenceEnabled()) {
        objectBuilder = new RedissonObjectBuilder(this);
    }
    commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
    evictionScheduler = new EvictionScheduler(commandExecutor);
    writeBehindService = new WriteBehindService(commandExecutor);
}

这里会复制一份配置出来,然后创建连接管理器、命令执行器、定期定出调度、以及异步写服务。

此处主要关注命令执行器和连接管理器,此处用的是同步命令执行器,当然也有其他实现比如CommandBatchService批量执行器。

然后再看下创建连接管理器:

代码语言:javascript
复制
public static ConnectionManager createConnectionManager(Config configCopy) {
    UUID id = UUID.randomUUID();
    if (configCopy.getMasterSlaveServersConfig() != null) {
        validate(configCopy.getMasterSlaveServersConfig());
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
    } else if (configCopy.getSingleServerConfig() != null) {
        validate(configCopy.getSingleServerConfig());
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
    } else if (configCopy.getSentinelServersConfig() != null) {
        validate(configCopy.getSentinelServersConfig());
        return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
    } else if (configCopy.getClusterServersConfig() != null) {
        validate(configCopy.getClusterServersConfig());
        return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
    } else if (configCopy.getReplicatedServersConfig() != null) {
        validate(configCopy.getReplicatedServersConfig());
        return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
    } else if (configCopy.getConnectionManager() != null) {
        return configCopy.getConnectionManager();
    }else {
        throw new IllegalArgumentException("server(s) address(es) not defined!");
    }
}

前边使用的是SingleServer,看一下SingleConnectionManager创建流程:

代码语言:javascript
复制
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
    super(create(cfg), config, id);
}

SingleConnectionManager继承了MasterSlaveConnectionManager,会调用父类构造器:

代码语言:javascript
复制
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
    this(config, id);
    this.config = cfg;


    if (cfg.getSlaveAddresses().isEmpty()
            && (cfg.getReadMode() == ReadMode.SLAVE || cfg.getReadMode() == ReadMode.MASTER_SLAVE)) {
        throw new IllegalArgumentException("Slaves aren't defined. readMode can't be SLAVE or MASTER_SLAVE");
    }


    initTimer(cfg);
    initSingleEntry();
}

initTimer会创建空闲连接监听管理以及发布订阅管理器,然后调用initSingleEntry初始化单机客户端。

代码语言:javascript
复制
protected void initSingleEntry() {
    try {
        if (config.checkSkipSlavesInit()) {
            masterSlaveEntry = new SingleEntry(this, config);
        } else {
            masterSlaveEntry = new MasterSlaveEntry(this, config);
        }
        CompletableFuture<RedisClient> masterFuture = masterSlaveEntry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
        masterFuture.join();


        //省略...
        startDNSMonitoring(masterFuture.getNow(null));
    } catch (Exception e) {
        //省略...
    }
}

创建SingleEntry,然后调用setupMasterEntry方法设置主节点连接,并且会调用startDNSMonitoring方法开启线程监听ip是否发生变成,如果变成会重新连接。继续看setupMasterEntry方法:

代码语言:javascript
复制
public CompletableFuture<RedisClient> setupMasterEntry(RedisURI address, String sslHostname) {
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
    return setupMasterEntry(client);
}

此处会先创建redis客户端,然后调用setupMasterEntry方法设置主节点连接;先看一下创建RedisClient:

代码语言:javascript
复制
private RedisClient(RedisClientConfig config) {
    RedisClientConfig copy = new RedisClientConfig(config);
    //省略...
    channels = new DefaultChannelGroup(copy.getGroup().next());
    bootstrap = createBootstrap(copy, Type.PLAIN);
    pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
    //省略...
}

这里基本上都是netty启动器的相关设置和前置准备,可以看一下创建netty客户端启动器的操作:

代码语言:javascript
复制
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
    Bootstrap bootstrap = new Bootstrap()
                    .resolver(config.getResolverGroup())
                    .channel(config.getSocketChannelClass())
                    .group(config.getGroup());


    bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
    bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
    bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
    config.getNettyHook().afterBoostrapInitialization(bootstrap);
    return bootstrap;
}

然后继续看setupMasterEntry做了什么事情.

代码语言:javascript
复制
private CompletableFuture<RedisClient> setupMasterEntry(RedisClient client) {
    CompletableFuture<InetSocketAddress> addrFuture = client.resolveAddr();
    return addrFuture.thenCompose(res -> {
        masterEntry = new ClientConnectionsEntry(
                client,
                config.getMasterConnectionMinimumIdleSize(),
                config.getMasterConnectionPoolSize(),
                config.getSubscriptionConnectionMinimumIdleSize(),
                config.getSubscriptionConnectionPoolSize(),
                connectionManager,
                NodeType.MASTER);
        //省略...
        CompletableFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
        //省略...
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }).whenComplete((r, e) -> {
        if (e != null) {
            client.shutdownAsync();
        }
    }).thenApply(r -> client);
}

创建连接条目,并添加到连接池中。继续看添加到连接池操作:

代码语言:javascript
复制
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
    CompletableFuture<Void> promise = initConnections(entry, true);
    return promise.thenAccept(r -> {
        entries.add(entry);
    });
}

这里做了初始化连接操作,然后添加到连接池的队列中,接着看一下initConnections初始化连接:

代码语言:javascript
复制
private CompletableFuture<Void> initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
    //省略...
    int startAmount = Math.min(5, minimumIdleSize);
    AtomicInteger requests = new AtomicInteger(startAmount);
    for (int i = 0; i < startAmount; i++) {
        createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
    }
    return initPromise;
}

省略掉中间一些调用链,最终会调用到RedisClient的connectAsync方法:

代码语言:javascript
复制
public RFuture<RedisConnection> connectAsync() {
    CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
    CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
        CompletableFuture<RedisConnection> r = new CompletableFuture<>();
        ChannelFuture channelFuture = bootstrap.connect(res);
        channelFuture.addListener(new ChannelFutureListener() {
          //省略...
        });
        return r;
    });
    return new CompletableFutureWrapper<>(f);
}

此处就用使用前面创建的Bootstrap进行连接操作,当然这里是初始化连接到连接池,如果并发比较大,连接池中初始连接数不够用,会在发起请求的时候创建新的连接。

2.加锁

加锁会先调用RedissonClient创建锁对象。

代码语言:javascript
复制
public RLock getLock(String name) {
    return new RedissonLock(commandExecutor, name);
}

然后创建RedissonLock:

代码语言:javascript
复制
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

调用父类构造函数,指定执行器、锁释放时间以及发布订阅组件。

然后继续看加锁逻辑,这里加锁我们使用tryLock并指定了等待时间、释放时间:

代码语言:javascript
复制
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1.尝试获取锁,如果ttl返回null,代表加锁成功
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }
    time -= System.currentTimeMillis() - current;
    // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    current = System.currentTimeMillis();
    /**
   * 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
   * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
   *
   * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
   * 当 this.await 返回 true,进入循环尝试获取锁.
   */
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (ExecutionException | TimeoutException e) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    try {
        time -= System.currentTimeMillis() - current;
        // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        /**
       * 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
      * 获取锁成功,则立马返回 true,
       * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
       */
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }


            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }


            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }


            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        //无论是否获得锁,都要取消订阅解锁消息
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

上述代码的核心逻辑是:

  • 尝试获取锁,如果获取成功则返回调用
  • 如果超过了等待时间,则返回获取失败
  • 订阅锁释放事件,并通过await方法阻塞等待锁释放,基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争获取锁
  • 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁,获取锁成功,则返回true,若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环
  • 最后无论是否获得锁,都要取消订阅解锁消息,不再参与锁获取和竞争

尝试获取锁会调用tryAcquire方法,看一下实现:

代码语言:javascript
复制
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

继续调用tryAcquireAsync方法,并同步获取其返回:

代码语言:javascript
复制
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

如果传入锁释放时间且大于零,使用用户传入的释放时间,否则使用默认的释放时间30秒,然后调用tryLockInnerAsync获取锁并返回中心化节点数据的ttl时间。

如果用户传入了leaseTime就不会开启看门狗机制实现自动续期,如果没有传入则开启看门口续期机制。

继续看tryLockInnerAsync方法实现:

代码语言:javascript
复制
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

为了保证操作的原子性,这里使用了lua脚本来操作redis,执行脚本时key是加锁的名称,ARGV分别是释放时间和线程信息。从脚本内容可以看出,锁在redis中的数据结构是hash,外层key存储的是锁的名称,内部field和value存储的是加锁客户端线程信息。脚本含义是:

  • 如果hash不存在,则直接放入加锁客户端信息并设置失效时间返回
  • 如果hash中存在加锁客户端的信息,则value加1实现重入逻辑,并设置过期时间返回
  • 否则竞争加锁失败,返回锁对应hash的过期时间

然后调用evalWriteAsync执行lua脚本:

代码语言:javascript
复制
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
    MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
    int availableSlaves = entry.getAvailableSlaves();
    CommandBatchService executorService = createCommandBatchService(availableSlaves);
    RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
    if (commandExecutor instanceof CommandBatchService) {
        return result;
    }
    RFuture<BatchResult<?>> future = executorService.executeAsync();
    //省略...
    return new CompletableFutureWrapper<>(f);
}

调用evalWriteAsync执行命令,调用executeAsync等待批处理任务完成,并获取任务的结果。继续看evalWriteAsync实现:

代码语言:javascript
复制
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
                                    String script, List<Object> keys, boolean noRetry, Object... params) {
    if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
      //省略...
        RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
                                                    args.toArray(), promise, false,
                                                    connectionManager, objectBuilder, referenceType, noRetry);
        executor.execute();


        //省略...
        return new CompletableFutureWrapper<>(mainPromise);
    }
    
    List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
    args.add(script);
    args.add(keys.size());
    args.addAll(keys);
    args.addAll(Arrays.asList(params));
    return async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
}

是否开启了脚本缓存走不同的逻辑,但是都是构造RedisExecutor并执行execute:

代码语言:javascript
复制
public void execute() {
  //省略...
    CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();


    //省略...


    connectionFuture.whenComplete((connection, e) -> {
      //省略...
        sendCommand(attemptPromise, connection);
        //省略...
    });


    attemptPromise.whenComplete((r, e) -> {
        releaseConnection(attemptPromise, connectionFuture);


        checkAttemptPromise(attemptPromise, connectionFuture);
    });
}

最终会走到RedisConnection的send方法:

代码语言:javascript
复制
public <T, R> ChannelFuture send(CommandData<T, R> data) {
    return channel.writeAndFlush(data);
}

这里就是调用netty客户端使用channel发送请求到redis服务端了。执行完成,调用方就根据tryLock返回结果感知到是否加锁成功,然后执行下一步动作。

整个加锁执行链路时序如下:

当然redisson分布式锁还有其他实现,RedissonMultiLock和RedissonSpinLock:

  • RedissonMultiLock:是一种同时获取多个锁的分布式锁。可以将多个RedissonLock对象传递给RedissonMultiLock构造函数,然后通过调用lock()方法一次性获取所有的锁释放锁时,需要调用unlock()方法对每个锁进行解锁
  • RedissonSpinLock:是一种自旋锁,采用的是非阻塞式锁的方式。当一个线程获取到该锁后,如果其他线程尝试获取同一个锁,它们将反复自旋(忙等待)直到持有该锁的线程释放锁。自旋锁适用于锁竞争的情况下,短暂的锁竞争下,自旋锁的性能优于阻塞式锁

对于这两种锁实现不再展开分析,逻辑和原理主链路基本相似。

3.看门狗续期

获取锁的流程中,tryAcquireAsync中有一段代码:

代码语言:javascript
复制
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
    // lock acquired
    if (acquired) {
        if (leaseTime > 0) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
        } else {
            scheduleExpirationRenewal(threadId);
        }
    }
    return acquired;
});

ttlRemaining不为空代表加锁成功,如果用户指定了锁释放时间就返回调用,否则就开启续期能力,也就是看门狗机制。

题外话,在面试中我经常问别人,redisson分布式锁如何解决续期问题的,有相当一部分人上来就说通过看门狗机制,我反问所有场景和用法都会开启看门狗机制吗?有人很武断的说会,也有人蚌埠住了。

言归正传,通过scheduleExpirationRenewal来具体看一下看门狗机制是什么样子的。

代码语言:javascript
复制
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

创建ExpirationEntry对象,存放线程续期信息,如果已经存在则已经存在与当前对象相同名称的续约信息,将当前线程ID加入到oldEntry中,表示需要更新该续约信息;否则调用renewExpiration方法操作续期,如何线程被中断则取消续期。主要看一下renewExpiration实现:

代码语言:javascript
复制
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            //省略...
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

该方法做的事情就是每internalLockLeaseTime的1/3时间执行续期动作,internalLockLeaseTime默认是30秒,可以修改,并且延迟操作是通过netty的时间轮实现,每一次续期操作都会触发下一次延迟。

接着看一下renewExpirationAsync的实现:

代码语言:javascript
复制
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

同样是通过lua脚本操作redis,检查加锁的客户端线程是否存在,如果存在则通过pexpire命令重新设置过期时间,从而达到续期作用,并返回1(代表续期成功),否则返回0(续期失败)。

看门狗续期操作及流程大致如下:

4.释放锁

释放锁会调用RedissonLock的unlock方法操作,看一下unlock:

代码语言:javascript
复制
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

然后调用unlockAsync方法:

代码语言:javascript
复制
public RFuture<Void> unlockAsync(long threadId) {
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        //省略...
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}

根据当前线程id释放锁,并且取消看门狗续期能力,主要看unlockInnerAsync方法释放锁。

代码语言:javascript
复制
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

也是通过lua脚本来操作redis实现释放锁,上述脚本主要做了以下操作:

  • 如果当前线程持有锁资源,那么减少hash中field的value值
  • 如果当前线程持有的hash中field的value值大于0,那么重新设置过期时间,从而支持重入能力
  • 如果当前线程持有的hash中field的value值大于0,那么需要释放锁,通过publish命令发布释放事件通知,告诉其他竞争者去抢占锁资源

这样就释放了锁资源,并且会通知其他订阅了事件的加锁参与者去尝试加锁。

三、分布式锁考虑的问题

1.续期问题

锁续期是分布式锁一定要考虑的问题,锁时间过短会导致锁释放了业务还在执行,但是锁又被其他客户端获取,从而导致数据不一致问题;锁时间过长又会导致其他客户端长时间等待,造成性能和体验问题。续期主要考虑以下两点:

  • 自动续期:所持有过程中,会处理比较复杂的业务,需要一种机制在业务可能在释放之前处理不完的情况下,让业务无感知实现自动续期,而不影响业务的执行。
  • 最大续期次数:互联网业务相对比较复杂多变,在服务依赖的资源或者服务出现短暂抖动或者不可用的情况下,可能短时间的续期解决不了问题,而无限制的续期又会影响的整个服务的性能或者拖垮服务,需要设置相对合理的策略,来限制最大续期次数和时间,从而来保证服务更高性能的表现。
2.可用性

可用性更多的依赖中心化资源的稳定性,redisson分布式锁是基于redis实现的,那么如果redis是单机模式,redisson做再大的努力也是徒劳。对于主从模式,redisson加锁肯定是操作的主节点,主从同步默认是异步的,在主节点加锁成功后,突然宕机,加锁数据尚未同步到从节点,此时从节点晋升为主节点,那么新的主节点不具有redisson加锁数据,新的请求来了之后会重新加锁,从而会出现问题。

对于集群模式下使用Redisson进行分布式锁时,至少要有半数以上的Redis节点在获取锁时才会视为成功,这个机制可以保证在网络分区或部分Redis节点故障的情况下,分布式锁仍然能够正常工作,避免因为单点故障导致整个系统的不可用性。

3.可重入性

Redisson分布式锁是支持可重入的,也就是说同一个线程可以多次获取同一个锁而不会造成死锁。当一个线程已经获取了一个分布式锁,并且没有释放锁之前,它可以再次请求获取相同名称的锁。在这种情况下,Redisson会维护一个计数器来记录锁的重入次数。每次成功获取锁时,计数器会加一;在释放锁时,计数器会相应地减一。只有当线程释放锁的次数与获取锁的次数相匹配(计数器为0),锁才会完全释放,其他线程才能获得该锁。这样可以保证同一个线程在持有锁的情况下,可以多次获取锁而不会被阻塞或产生死锁。

可重入性是Redisson分布式锁的一个重要特性,它使得在复杂的业务逻辑中能够灵活地使用锁,避免了线程自身因为重入而产生的问题。需要注意的是,重入次数计数器是基于线程级别的,不同线程之间的计数器是独立的,因此不能用于跨线程的重入。

4.死锁检测与恢复

Redisson分布式锁提供了死锁检测与恢复的机制,以帮助应对潜在的死锁情况。

首先,Redisson会为每个获取到的分布式锁设置一个过期时间(expire)。这个过期时间是最大持有锁的时间,确保即使持有锁的线程发生异常或没有正确释放锁,锁也能在一段时间后自动释放,避免长时间的死锁。

其次,Redisson引入异步续期(async renewal)机制。在获取锁成功后,Redisson会使用后台线程定期自动续期(renewal)锁的过期时间,以防止持有锁的线程因为某些原因没有及时续期导致锁的过期。这样可以减少因为网络延迟、GC暂停等问题而造成的误解锁。

此外,Redisson还提供了针对死锁的自动解锁(auto-unlock)功能。当一个线程持有锁的时间超过指定的阈值后,Redisson会自动解锁该锁,并触发一个解锁事件。通过监听解锁事件,可以实现对死锁的检测和恢复操作,例如记录日志、重试获取锁等。

需要注意的是,无法完全消除死锁的发生,因为死锁是由于复杂的并发环境和业务逻辑导致的。但是通过上述的机制,Redisson能够在大部分情况下检测到死锁,并提供自动解锁的功能,以减少死锁对系统的影响。要充分利用Redisson的死锁检测与恢复机制,建议合理设置过期时间、异步续期和自动解锁的阈值,并结合监控和日志来及时发现和解决潜在的死锁问题。

四、参考

https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

https://zhuanlan.zhihu.com/p/135864820

https://zhuanlan.zhihu.com/p/230433777

https://www.cnblogs.com/Leo_wl/p/16600565.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-07-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 PersistentCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.引入依赖
  • 2.编写配置
  • 3.使用分布式锁
  • 二、redisson分布式锁原理
    • 1.建立连接
      • 2.加锁
        • 3.看门狗续期
          • 4.释放锁
          • 三、分布式锁考虑的问题
            • 1.续期问题
              • 2.可用性
                • 3.可重入性
                  • 4.死锁检测与恢复
                  • 四、参考
                  相关产品与服务
                  云数据库 Redis
                  腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档