前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis学习系列七分布式锁

Redis学习系列七分布式锁

作者头像
郑小超.
发布2019-01-03 16:35:37
9380
发布2019-01-03 16:35:37
举报
文章被收录于专栏:GreenLeavesGreenLeaves

一、简介

熟悉.Net多线程的都知道,当多个线程同时操作一个全局缓存对象(static对象实例、Dictionary、List等)时,会存在多线程争用问题,包括EF、Dapper等本身的缓存机制,都存在多线程争用问题,当我们在享受多线程带来的好处的同时,千万要注意这个问题.如果不了解多线程,请移步到我的C#多线程分类下.但是实际的业务场景中经常存在需要根据每个缓存对象的状态,进行一系列判断之后,在进行修改的操作,但是这个操作必须保证有序性,不能多个线程同时去读,否则就乱套了.比如你要进行一个数据库表字段的递增操作,首先可能时先去把最后一条记录读出来,然后拿到对应的字段,然后更新回数据库,但是这个时候如果在多线程环境下,多个线程可能同时去读,如果用了EF、Dapeer等ORM,它们会把数据读到缓存中,这个时候多个线程拿到了相同的数据,然后同步+1操作,那么这个时候如果有三个线程,那么只会进行一次+1操作,而不是三次.

Redis也是如此,所以这个时候就需要使用Redis的分布式锁,来限制这个行为,如果你用了他的异步Api,我前面的随笔用的都是异步的.

二、分布式锁实战

哇,踩坑踩了还久,终于明白了StackExchange.Redis怎么处理分布式锁,我刚开始使用Api时,是这么认为的.当一个线程使用Redis的数据时(该数据已加上了分布式锁),另外一个线程则不能操作这个数据,会等待前面的锁释放(这个过程Redis帮助我们完成),但是事实证明我太年轻了.Redis还是Redis,即使时分布式锁,也是一种缓存数据(这一点和C#完全不一样,所以需要注意).为什么会这样呢?看下面的代码:

代码语言:javascript
复制
    class Program
    {
        static Program()
        {
            //链式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        static async void StringSetGetAsync()
        {
            //需要锁的Redis数据的键
            var key = "用户信息Id";

            //先异步写入一个
            var kv=new KeyValuePair<RedisKey, RedisValue>(key, 1);
            await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv });
             
            //对用户的信息进行加锁操作
            var time = 100000;
            //请求Id,这里最好能区分客户端的调用标识,方便异常时定位错误
            var requestId = "锁的键";
            var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time));
            if (lockResult)
            {
                Console.WriteLine("对用户信息加锁成功,请求的锁Id为:{0},锁的时间周期为:{1}毫秒", key, TimeSpan.FromMilliseconds(time));
            }
            else
            {
                Console.WriteLine("加锁失败,key为{0}的数据结构已被其它请求占用!", key);
            }

            //这里开启一个新的线程去访问Redis,先查,在修改上面的用户数据,这里如果Redis帮我们判断的话,那我们是读不出数据的,而且不能修改该数据的
            //因为上面的线程已经对当前用户数据加锁了
            var result=await RedisClient.StringGetAsync(key);
            Console.WriteLine("成功查到了数据,值为:{0}", result);
            var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2);
            if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv }))
            {
                var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key });
                Console.WriteLine("数据修改成功,修改后的数据为:{0}",newResult[0]);
            }
        }
    }

look,Redis并没有帮助我们做了这个事情,他还是让第二个线程去修改了用户的数据.但是,打开Redis桌面管理工具,如下信息:

多了一条数据,里面记录了锁的相关信息,现在我终于明白了,这个事情还是得自己来,他不会帮你做,他只是个缓存,所以修改代码如下:

代码语言:javascript
复制
    class Program
    {
        static Program()
        {
            //链式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        static async void StringSetGetAsync()
        {
            //需要锁的Redis数据的键
            var key = "用户信息Id";

            //先异步写入一个
            var kv=new KeyValuePair<RedisKey, RedisValue>(key, 1);
            await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv });
             
            //对用户的信息进行加锁操作
            var time = 100000;
            //请求Id,这里最好能区分客户端的调用标识,方便异常时定位错误
            var requestId = "锁的键";
            var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time));
            if (lockResult)
            {
                Console.WriteLine("对用户信息加锁成功,请求的锁Id为:{0},锁的时间周期为:{1}毫秒", key, TimeSpan.FromMilliseconds(time));
            }
            else
            {
                Console.WriteLine("加锁失败,key为{0}的数据结构已被其它请求占用!", key);
            }

            //修改前判断当前用户数据有没有被加锁
            var userInfoLockObj = (int)(await RedisClient.LockQueryAsync(key));
            if (userInfoLockObj >= 1)
            {
                Console.WriteLine("当前用户键为:{0}的数据被别的线程(或者进程占用了),无法进行操作,请等待锁释放!");
            }
            else
            {
                var result = await RedisClient.StringGetAsync(key);
                Console.WriteLine("成功查到了数据,值为:{0}", result);
                var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2);
                if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv }))
                {
                    var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key });
                    Console.WriteLine("数据修改成功,修改后的数据为:{0}", newResult[0]);
                }
            }
            
        }
    }

操作前,自己去查当前用户信息有没有被加锁,Redis会把锁的数据写入缓存,并在过期时间到了时,自动回收对应锁的内存,这样当下个线程进来时,查不到锁,那么就可以操作这个数据了.这里我选择轮询判断,当然也可以使用队列,将请求插入到队列中,开启一个线程去消费这个队列,轮询代码如下:

代码语言:javascript
复制
    class Program
    {
        static Program()
        {
            //链式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        static async void StringSetGetAsync()
        {
            //需要锁的Redis数据的键
            var key = "用户信息Id";

            //先异步写入一个
            var kv = new KeyValuePair<RedisKey, RedisValue>(key, 1);
            await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv });

            //对用户的信息进行加锁操作
            var time = 10000;
            //请求Id,这里最好能区分客户端的调用标识,方便异常时定位错误
            var requestId = "锁的键";
            var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time));
            if (lockResult)
            {
                Console.WriteLine("对用户信息加锁成功,请求的锁Id为:{0},锁的时间周期为:{1}毫秒", key, TimeSpan.FromMilliseconds(time));
            }
            else
            {
                Console.WriteLine("加锁失败,key为{0}的数据结构已被其它请求占用!", key);
            }

            //轮询判断锁是否被释放,释放就修改数据
            while (true)
            {
                var userInfoLockObj =await RedisClient.LockQueryAsync(requestId);
                if (userInfoLockObj== key)
                {
                    //如果加锁,休息一秒后重试
                    Thread.Sleep(1000);
                    Console.WriteLine("当前用户被加锁了,不能修改数据!");
                }
                else
                {
                    //锁被释放了,这个时候可以操作数据了
                    var result = await RedisClient.StringGetAsync(key);
                    Console.WriteLine("成功查到了数据,值为:{0}", result);
                    var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2);
                    if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv }))
                    {
                        var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key });
                        Console.WriteLine("数据修改成功,修改后的数据为:{0}", newResult[0]);
                    }
                    break;
                }
              
            }
        }
    }

10秒后,操作数据成功!当然队列更加友好.不阻塞线程!这个例子举得也不是很好,大多数情况下,锁不会被一个线程占用这么久,一般用完就被释放,1秒已经很长了.

注:这个过程不会存在死锁的问题(除非Redis内部的设置过期的进程挂了),因为现在这个版本的Redis支持setnc和expire一起执行,属于原子指令,即在设置锁的同时设置过期时间.这个过程是同步的,据我所知老版本的Redis可能这两个指令时分开的,可能会存在"长生不老"的锁.

三、分布式锁超时问题

如果你理解上面的内容,就会发现分布式锁,并不能解决超时问题,感觉这一点和C#自带的Timer类的问题很像,线程不会等待你执行完毕,在开启第二轮的轮询任务,线程不会等你.Timer中我提供了解决方案,Redis也存在相同的问题,但是两者的解决方案不一样,Timer是通过回调的方式,当第一轮循环任务做完,在重启Timer,执行第二轮任务.

而Redis则需要通过守护线程的方式去做,代码如下:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-12-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档