前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Redis] 分布式缓存中间件 Redis 之 分布式锁实战

[Redis] 分布式缓存中间件 Redis 之 分布式锁实战

作者头像
架构探险之道
发布2020-03-19 17:12:59
7860
发布2020-03-19 17:12:59
举报
文章被收录于专栏:架构探险之道架构探险之道

[Redis] 分布式缓存中间件 Redis 之 分布式锁实战

概要

  • Redis 环境准备
  • Redis Setnx 实现分布式锁
  • Redission 源码分析

环境准备Redis 如何实现分布式锁线程不安全单机锁分布式锁代码实现Redisson 集成和源码分析Redisson 集成源码分析 `RedissonLock`加锁解锁集群分布式锁失效判断机制总结REFERENCES更多

手机用户请横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可以关注其他博客发布地址。

平台

地址

CSDN

https://blog.csdn.net/sinat_28690417

简书

https://www.jianshu.com/u/3032cc862300

个人博客

https://yiyuer.github.io/NoteBooks/


正文

环境准备

Docker Redis

  • Docker 拉取 Redis 镜像并安装。

docker pull redis

  • 启动镜像

docker run -itd --name redis-test -p 6379:6379 redis-6379

  • 进入容器

docker exec -it redis-test /bin/bash

  • 使用 redis-cli

.

Jmeter 用来模拟并发

官网

  • Linux下载后解压 sh /bin/jemeter
  • Windows 运行 /bin/jmeter.bat

Redis 如何实现分布式锁

并发场景模拟: 商店库存 stock ,每次 http 接口调用,表示下单,并进行库存的 -1 操作

项目启动默认设置库存1000

代码语言:javascript
复制
/**
 * <p>
 *
 * </p>
 *
 * @author Helios
 * @date 2020/3/14 6:41 下午
 */
@Component
@Slf4j
public class AppStartListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (Objects.isNull(event.getApplicationContext().getParent())) {
            initStock();
        }
    }

    private void initStock() {
        log.info("init goods stock started");
        stringRedisTemplate.opsForValue().set("stock", "1000");
    }
}

初始状态查询:库存位1000

.

线程不安全

代码语言:javascript
复制
/**
 * 下单:线程不安全
 * @return
 */
@RequestMapping("goods/v1/stock")
public String order() {
    //查询库存
    Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        stock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", stock + "");
        log.info("order success and stock -1");
    } else {
        log.error("order failed!");
    }
    return "request success" + " stock : " + stock;
}

通过 Jmeter 模拟 1s 内发起 200 次请求

.

再次查询库存

.

单机锁

代码语言:javascript
复制
//...
private Object lock = new Object();
//...

/**
 * 下单:synchronized 加锁,单机锁
 *
 * @return
 */
@RequestMapping("goods/v2/stock")
public String localOrderLock() {
    //查询库存
    Integer stock = 0;
    synchronized (lock) {
        stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            stock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", stock + "");
            log.info("order success and stock -1");
        } else {
            log.error("order failed!");
        }
    }
    return "request success" + " stock : " + stock;
}

查询库存

.

分析

上面的结果,可以看出,库存扣减正常,但是这仅仅针对的是单个 JVM 中运行的实例,无法在分布式场景中实现共享资源库存 stock的访问控制。

分布式锁

Redis Setnx 命令

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

.

.

代码实现

简单实现

代码语言:javascript
复制
/**
 * 下单:setnx 加锁,分布式锁
 *
 * @return
 */
@RequestMapping("goods/v3/stock")
public String setnx() {
    //redis setnx
    //Boolean stockLock = stringRedisTemplate.opsForValue().setIfAbsent(productKey, "xxx");
    //避免服务器异常,导致锁未被删除,设置超时时间
    //stringRedisTemplate.expire(productKey, 30, TimeUnit.SECONDS);
    //上面的非原子性,无法确保锁的超时时间一定会被设置
    Boolean stockLock = stringRedisTemplate.opsForValue().setIfAbsent(productKey, "xxx",30, TimeUnit.SECONDS);
    if (!stockLock) {
        return "error";
    }
    Integer stock = 0;
    //处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
    try {
        //查询库存 redis.get
        stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            // 更新库存
            stock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", stock + "");
            log.info("order success and stock -1");
        } else {
            log.error("order failed!");
        }
    } finally {
        //库存减完后删除锁,确保其他线程不被一致阻塞
        stringRedisTemplate.delete(productKey);
    }
    return "request success" + " stock : " + stock;
}

加锁逻辑和业务解耦

代码语言:javascript
复制
public interface IRedisLock {

    boolean tryLock(String key, long timeout, TimeUnit timeUnit);

    void releaseLock(String key);
}


@Component
public class RedisLockImpl implements IRedisLock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> lockLocal = new ThreadLocal<>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit timeUnit) {
        String uuid = UUID.randomUUID().toString();
        lockLocal.set(uuid);
        boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid,timeout, timeUnit);
        return lock;
    }

    @Override
    public void releaseLock(String key) {
        if (lockLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            lockLocal.remove();
            stringRedisTemplate.delete(key);
        }
    }
}

/**
 * 下单:setnx 加锁,锁控制和业务逻辑解耦
 *
 * @return
 */
@RequestMapping("goods/v4/stock")
public String setnx2() {
    //业务解耦
    if (!redisLock.tryLock(productKey,30, TimeUnit.SECONDS)) {
        //此处返回 error,实际业务中一般不希望直接返回错误给用户,而是让用户等待一段时间,因此需要实现阻塞锁,见 RedisLockImpl
        return "error";
    }
    Integer stock = 0;
    //处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
    try {
        //TODO STH
    } finally {
        //库存减完后删除锁,确保其他线程不被一致阻塞
        redisLock. releaseLock(productKey);
    }
    return "request success" + " stock : " + stock;
}

锁特性

  • 互斥性
  • 锁超时
  • 支持阻塞和非阻塞
  • 可重入性
  • 高可用
代码语言:javascript
复制
/*
 * @ProjectName: 编程学习
 * @Copyright:   2019 HangZhou Ashe Dev, Ltd. All Right Reserved.
 * @address:     https://yiyuery.github.io/NoteBooks/
 * @date:        2020/3/14 8:48 下午
 * @description: 本内容仅限于编程技术学习使用,转发请注明出处.
 */
package com.example.redis.lock.impl;

import com.example.redis.lock.IRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *
 * </p>
 *
 * @author Helios
 * @date 2020/3/14 8:48 下午
 */
@Component
public class RedisLockImpl implements IRedisLock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> lockValLocal = new ThreadLocal<>();

    private ThreadLocal<Long> asyncThreadIDLocal = new ThreadLocal<>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit timeUnit) {
//        String uuid = UUID.randomUUID().toString();
//        lockLocal.set(uuid);
//        boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid,timeout, timeUnit);
        //可重入实现
        boolean lock = false;
        if (Objects.isNull(lockValLocal.get())) {
            String uuid = UUID.randomUUID().toString();
            lockValLocal.set(uuid);
            lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, timeUnit);
            //阻塞锁
            if (!lock) {
                while (true) {
                    lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, timeUnit);
                    if (lock) {
                        break;
                    }
                }
                //避免锁超时时间提前结束,确保业务代码运行期间,锁不会因为超时而失效,导致线程不安全 => 业务代码执行过程中,锁异步续期(使用异步是因为要避免阻塞当前线程)
                Thread thread = new Thread(() -> {
                    while (true) {
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        stringRedisTemplate.expire(key, timeout, timeUnit);
                    }
                });
                thread.start();
                //为删除锁时需要结束续命线程, 记录线程 ID
                asyncThreadIDLocal.set(thread.getId());
            }
        } else {
            lock = true;
        }
        return lock;
    }

    @Override
    public void releaseLock(String key) {
        if (lockValLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
            lockValLocal.remove();
            stringRedisTemplate.delete(key);
            deleteAsyncExpireThread();
        }
    }

    /**
     * <p>
     * 结束续命线程
     * </p>
     *
     * @return
     * @author Helios
     * @date
     * @params []
     */
    private void deleteAsyncExpireThread() {
        Long threadId = asyncThreadIDLocal.get();
        Thread th = null;
        if (Objects.nonNull(threadId)) {
            asyncThreadIDLocal.remove();
            ThreadGroup group = Thread.currentThread().getThreadGroup();
            while (group != null) {
                boolean stop = false;
                Thread[] threads = new Thread[(int) (group.activeCount() * 1.2)];
                int count = group.enumerate(threads, true);
                for (int i = 0; i < count; i++) {
                    if (threadId == threads[i].getId()) {
                        th = threads[i];
                        stop = true;
                        break;
                    }
                }
                if (stop) {
                    break;
                }
                group = group.getParent();
            }
            if (Objects.nonNull(th)) {
                th.interrupt();
            }
        }
    }
}

Redisson 集成和源码分析

最新版的Redisson要求redis能够支持eval的命令,否则无法实现,即Redis要求2.6版本以上。在lua脚本中可以调用大部分的Redis命令,使用脚本的好处如下:

  • 减少网络开销:在Redis操作需求需要向Redis发送5次请求,而使用脚本功能完成同样的操作只需要发送一个请求即可,减少了网络往返时延。
  • 原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说在编写脚本的过程中无需担心会出现竞态条件,也就无需使用事务。事务可以完成的所有功能都可以用脚本来实现。
  • 复用:客户端发送的脚本会永久存储在Redis中,这就意味着其他客户端(可以是其他语言开发的项目)可以复用这一脚本而不需要使用代码完成同样的逻辑。

Redisson 集成

增加依赖

代码语言:javascript
复制
compile 'org.redisson:redisson-spring-boot-starter:3.11.5

Redisson GitHub

简单使用

代码语言:javascript
复制
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    //@Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient() {

        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        //.setPassword(password);
        //添加主从配置
        //config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
        return Redisson.create(config);
    }
}


@Autowired
private RedissonClient redissonClient;
//...

/**
 * 下单:setnx 加锁,锁控制和业务逻辑解耦
 *
 * @return
 */
@RequestMapping("goods/v5/stock")
public String redissonLock() {
    //Redisson Lock
    RLock lock = redisson.getLock(productKey);
    if (!lock.tryLock()) {
        return "error";
    }
    Integer stock = 0;
    //处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
    try {
       //TODO STH
    } finally {
        //库存减完后删除锁,确保其他线程不被一致阻塞
        lock.unlock();
    }
    return "request success" + " stock : " + stock;
}

.

.

源码分析 `RedissonLock`

使用到的全局变量

  • EXPIRATION_RENEWAL_MAP:存储entryName和其过期时间,底层用的netty的PlatformDependent.newConcurrentHashMap()
  • internalLockLeaseTime:锁默认释放的时间:30 * 1000,即30秒
  • id:UUID,用作客户端的唯一标识
  • pubSub:订阅者模式,当释放锁的时候,其他客户端能够知道锁已经被释放的消息,并让队列中的第一个消费者获取锁。使用PUB/SUB消息机制的优点:减少申请锁时的等待时间、安全、 锁带有超时时间、锁的标识唯一,防止死锁 锁设计为可重入,避免死锁。
  • commandExecutor:命令执行器,异步执行器
加锁
代码语言:javascript
复制
@Override
public void lock() {
    try {
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}
//-- 阻塞锁
 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //记录当前线程 ID
        long threadId = Thread.currentThread().getId();
        //如果为空,当前线程获取锁成功,否则已经被其他客户端加锁,TTL 表示剩余存活时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        //等待释放,并订阅锁
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            //如果没有拿到锁,循环获取:阻塞锁逻辑
            while (true) {
                 // 重新尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 成功获取锁
                if (ttl == null) {
                    break;
                }

                //如果没拿到锁, 等待锁释放
                // waiting for message
                if (ttl >= 0) {
                    try {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        getEntry(threadId).getLatch().acquire();
                    } else {
                        getEntry(threadId).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            //解除当前线程和锁事件的订阅
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }


//-- 异步续期
private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }

                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }

                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
            // 内部异步框架,默认时30s 失效,此处 30/3 = 10 s 进行一次续期
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        ee.setTimeout(task);
    }

//----

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {

    private int counter;

    private final Semaphore latch;
    private final RPromise<RedissonLockEntry> promise;
    private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();

    public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
        super();
        this.latch = new Semaphore(0);
        this.promise = promise;
    }

    public void aquire() {
        counter++;
    }

    public int release() {
        return --counter;
    }

    public RPromise<RedissonLockEntry> getPromise() {
        return promise;
    }

    public void addListener(Runnable listener) {
        listeners.add(listener);
    }

    public boolean removeListener(Runnable listener) {
        return listeners.remove(listener);
    }

    public ConcurrentLinkedQueue<Runnable> getListeners() {
        return listeners;
    }

    public Semaphore getLatch() {
        return latch;
    }

}

//--- PublishSubscribe

public void unsubscribe(E entry, String entryName, String channelName) {
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    semaphore.acquire(new Runnable() {
        @Override
        public void run() {
            if (entry.release() == 0) {
                // just an assertion
                boolean removed = entries.remove(entryName) == entry;
                if (!removed) {
                    throw new IllegalStateException();
                }
                service.unsubscribe(new ChannelName(channelName), semaphore);
            } else {
                semaphore.release();
            }
        }
    });

}
解锁
代码语言:javascript
复制
 @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });

    return result;
}
//-- 异步解锁 Lua 脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //如果keys[1]不存在,则返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //为哈希表 KEYS[1] 中的域  ARGV[3] 的值加上增量 -1 : 此处相当于可重入锁的计数状态 -1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                //如果还是重入状态,即锁仍然被持有,则进行续命,增加延时
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                //反之,说明锁已全部释放,则删除当前 KEYS[1] 对应锁,并发布删除锁的消息通知
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

HINCRBY key field increment: 为哈希表 key 中的域 field 的值加上增量 increment 。增量也可以为负数,相当于对给定域进行减法操作。 如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。对一个储存字符 串值的域 field 执行 HINCRBY 命令将造成一个错误。本操作的值被限制在 64 位(bit)有符号数字表示之内。

集群分布式锁失效判断机制

我们都知道 Redis 本身是支持集群模式的,那么如果其中一个或多个节点挂掉,对应的分布式锁如何在集群场景下保证功能的高可用呢?

  • 单点 Redis 锁的缺陷

这个缺陷其实很明显,如果只有一个 Redis 实例,一旦这个挂了,所有依赖他的服务都挂了。显然不太适合大型的应用。

  • 简单的Redis主从架构碰到的问题

为了避免单点故障,我们给Redis做一个Master/Slave的主从架构,一个Master,一台Slave。下面就会碰到这么一个问题。下面是使用场景。

  1. 客户端A在Master上获取到一个锁。
  2. Master把这个数据同步到Slave的时候挂了(因为Master和Slave之间同步是异步的)。
  3. Slave变成了Master。
  4. 客户端B通过相同的key,和value获取到锁。即分布式锁失效

Redlock算法

假设我们有N(假设5)个Redis master实例,所有节点相互独立,并且业务系统也是单纯的调用,并没有什么其他的类似消息重发之类的辅助系统。下面来模拟一下算法:

  • 客户端获取服务器当前的的时间t0,毫秒数。
  • 使用相同的key和value依次向5个实例获取锁。客户端在获取锁的时候自身设置一个远小于业务锁需要的持续时间的超时时间。举个例子,假设锁需要10秒,超时时间可以设置成比如5-50毫秒。这个避免某个Redis本身已经挂了,但是客户端一直在尝试获取锁的情况。超时了之后就直接跳到下一个节点。
  • 客户端通过当前时间t1减去t0,计算获取锁所消耗的时间t2=t1-t0。只有t2小于锁的业务有效时间(也就是第二步的10秒),并且,客户端在至少3=(5/2+1)台上获取到锁我们才认为锁获取成功。
  • 如果锁已经获取,那么锁的业务有效时间为10s-t2。
  • 如果客户端没有获取到锁,可能是没有在大于等于N/2+1个实例上获取锁,也可能是有效时间(10s-t2)为负数,我们就尝试去释放锁,即使是并没有在那个节点上获取到。

锁的释放

释放比较简单,直接删除所有实例上对应的key就好。

总结

本文通过介绍 Redis 环境搭建和分布式锁的实战,介绍锁需要具备的一些特性,并分析了 Redisson 源码,以及集群部署 Redis 场景下,锁失效的判断机制介绍。

REFERENCES

  • Redis 命令参考
  • Redisson是如何实现分布式锁的?
  • Redisson实现分布式锁(2)—RedissonLock
  • RedissonLock分布式锁源码分析
  • 分布式Redis的分布式锁 Redlock
  • How to do distributed locking
  • Is Redlock safe?
  • Redisson实现分布式锁(1)---原理

.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • [Redis] 分布式缓存中间件 Redis 之 分布式锁实战
    • 环境准备
      • Redis 如何实现分布式锁
        • 线程不安全
        • 单机锁
        • 分布式锁
      • Redisson 集成和源码分析
        • Redisson 集成
        • 源码分析 `RedissonLock`
        • 集群分布式锁失效判断机制
      • 总结
        • REFERENCES
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档