前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis入坟(二)高级特性,发布订阅、事务、Lua脚本

Redis入坟(二)高级特性,发布订阅、事务、Lua脚本

作者头像
源码之路
发布2020-09-04 10:31:03
8330
发布2020-09-04 10:31:03
举报
文章被收录于专栏:源码之路源码之路

目标 1、学习 Redis 的一些高级特性,包括发布订阅、事务、Lua 脚本

1、发布订阅模式

1.1列表的局限

前面我们说通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。

为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:

  1. 如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。
  2. 消息的实时性降低 list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。blpop queue 5,阻塞5秒。 基于 list 实现的消息队列,不支持一对多的消息分发。

1.2发布订阅模式

除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。

这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。

1.2.1 订阅频道

可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。

需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。

下面我们来看一下发布订阅命令的使用方法。 订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。

subscribe channel-1 channel-2 channel-3

发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):

publish channel-1 2673

取消订阅(不能在订阅状态下使用):

unsubscribe channel-1

1.2.2 按规则(Pattern) 订阅频道

支持?和占位符。?代表一个字符,代表 0 个或者多个字符。 消费端 1,关注运动信息:

psubscribe *sport

消费端 2,关注所有新闻:

psubscribe news*

消费端 3,关注天气新闻:

psubscribe news-weather

生产者,发布 3 条信息

publish news-sport yaoming
publish news-music jaychou
publish news-weather rain

java 代码

package pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PublishSubscribe {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.0.224", 6379);
        jedis.subscribe(new Subscriber(),"channel");

    }
}
class Subscriber extends JedisPubSub{

    @Override
    public void onMessage(String channel, String message) {
        System.out.println("接收到的消息:"+message);
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
    }

    @Override
    public void unsubscribe(String... channels) {
        super.unsubscribe(channels);
    }
}


package pubsub;

import redis.clients.jedis.Jedis;

public class PublishTest {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.0.224", 6379);
        jedis.publish("channel","你好呀");
        jedis.close();

    }
}

2 、Redis 事务

2.1 为什么要用事务

我们知道 Redis 的单个命令是原子性的(比如 get set mget mset),如果涉及到多个命令的时候,需要把多个命令作为一个不可分割的处理序列,就需要用到事务。

例如我们之前说的用 setnx 实现分布式锁,我们先 set,然后设置对 key 设置 expire,防止 del 发生异常的时候锁不会被释放,业务处理完了以后再 del,这三个动作我们就希望它们作为一组命令执行。

Redis 的事务有两个特点:

  1. 按进入队列的顺序执行。
  2. 不会受到其他客户端的请求的影响。 Redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)

2.2 事务的用法

案例场景:tom 和 mic 各有 1000 元,tom 需要向 mic 转账 100 元。 tom 的账户余额减少 100 元,mic 的账户余额增加 100 元。

127.0.0.1:6379> set tom 1000
OK
127.0.0.1:6379> set mic 1000
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby tom 100
QUEUED
127.0.0.1:6379> incrby mic 100
QUEUED
127.0.0.1:6379> exec
1) (integer) 900
2) (integer) 1100
127.0.0.1:6379> get tom
"900"
127.0.0.1:6379> get mic
"1100"

通过 multi 的命令开启事务。事务不能嵌套,多个 multi 命令效果一样。 multi 执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。

通过 exec 的命令执行事务。如果没有执行 exec,所有的命令都不会被执行。如果中途不想执行事务了,怎么办?可以调用 discard 可以清空事务队列,放弃执行。

multi
set k1 1
set k2 2
set k3 3
discard

2.3 watch 命令

在 Redis 中还提供了一个 watch 命令。 它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare and Swap),也就是多个线程更新变量的时候,会跟原值做比较,只有它没有被其他线程修改的情况下,才更新成新的值。

我们可以用 watch 监视一个或者多个 key,如果开启事务之后,至少有一个被监视key 键在 exec 执行之前被修改了, 那么整个事务都会被取消(key 提前过期除外)。可以用 unwatch 取消。

2.4 事务可能遇到的问题

我们把事务执行遇到的问题分成两种,一种是在执行 exec 之前发生错误,一种是在执行 exec 之后发生错误。

2.4.1 在执行 exec 之前发生错误

比如:入队的命令存在语法错误,包括参数数量,参数名等等(编译器错误)。

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set gupao 666
QUEUED
127.0.0.1:6379> hset qingshan 2673
(error) ERR wrong number of arguments for 'hset' command
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.

在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。

2.4.2 在执行 exec 之后发生错误

比如,类型错误,比如对 String 使用了 Hash 的命令,这是一种运行时错误。

127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 1
QUEUED
127.0.0.1:6379> hset k1 a b
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> get k1
"1"

最后我们发现 set k1 1 的命令是成功的,也就是在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。

这个显然不符合我们对原子性的定义,也就是我们没办法用 Redis 的这种事务机制来实现原子性,保证数据的一致。

思考(作业): 为什么在一个事务中存在错误,Redis 不回滚?

3 Lua 脚本

Lua是一种轻量级脚本语言,它是用 C 语言编写的,跟数据的存储过程有点类似。 使用 Lua 脚本来执行 Redis 命令的好处:

  1. 一次发送多个命令,减少网络开销。
  2. Redis 会将整个脚本作为一个整体执行,不会被其他请求打断,保持原子性。
  3. 对于复杂的组合命令,我们可以放在文件中,可以实现程序之间的命令集复用。

3.1 在 Redis 中调用 Lua 脚本

使用 eval 方法,语法格式:

redis> eval lua-script key-num [key1 key2 key3 ....] [value1 value2 value3 ....]

 eval 代表执行 Lua 语言的命令。  lua-script 代表 Lua 语言脚本内容。  key-num 表示参数中有多少个 key, 需要注意的是 Redis 中 key 是从 1 开始的, 如果没有 key 的参数, 那么写 0。  [key1 key2 key3…]是 key 作为参数传递给 Lua 语言, 也可以不填, 但是需要和 key-num 的个数对应起来。  [value1 value2 value3 ….]这些参数传递给 Lua 语言, 它们是可填可不填的。

示例,返回一个字符串,0 个参数:

redis> eval "return 'Hello World'" 0

3.2 在 Lua 脚本中调用 Redis 命令

使用 redis.call(command, key [param1, param2…])进行操作。语法格式:

redis> eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value

 command 是命令, 包括 set、 get、 del 等。  key 是被操作的键。  param1,param2…代表给 key 的参数。

注意跟 Java 不一样,定义只有形参,调用只有实参。 Lua 是在调用时用 key 表示形参,argv 表示参数值(实参)。

3.2.1 设置键值对

在 Redis 中调用 Lua 脚本执行 Redis 命令 以上命令等价于 set gupao 2673 在 redis-cli 中直接写 Lua 脚本不够方便,也不能实现编辑和复用,通常我们会把脚本放在文件里面,然后执行这个文件。

3.2.2 在 Redis 中调用 Lua 脚本文件中的命令, 操作 Redis

创建 Lua 脚本文件:

cd /usr/local/soft/redis5.0.5/src
vim gupao.lua

Lua 脚本内容,先设置,再取值:

redis.call('set','gupao','lua666')
return redis.call('get','gupao')

在 Redis 客户端中调用 Lua 脚本

cd /usr/local/soft/redis5.0.5/src
redis-cli --eval gupao.lua 0

得到返回值:

[root@localhost src]# redis-cli --eval gupao.lua 0
"lua666"

3.2.3 案例: 对 IP 进行限流

需求:某个IP,在 X 秒内只能访问 Y 次。 设计思路:用 key 记录 IP,用 value 记录访问次数。拿到 IP 以后,对 IP+1。如果是第一次访问,对 key 设置过期时间(参数 1)。否则判断次数,超过限定的次数(参数 2),返回 0。如果没有超过次数则返回 1。超过时间,key 过期之后,可以再次访问。KEY[1]是 IP, ARGV[1]是过期时间 X,ARGV[2]是限制访问的次数 Y。

-- ip_limit.lua
-- IP 限流, 对某个 IP 频率进行限制 , 6 秒钟访问 10 次
local num=redis.call('incr',KEYS[1])
if tonumber(num)==1 then
  redis.call('expire',KEYS[1],ARGV[1])
  return 1
elseif tonumber(num)>tonumber(ARGV[2]) then
  return 0
else
  return 1
end

6 秒钟内限制访问 10 次,调用测试(连续调用 10 次):

./redis-cli --eval "ip_limit.lua" app:ip:limit:192.168.8.111 , 6 10

 app:ip:limit:192.168.8.111 是 key 值 ,后面是参数值,中间要加上一个空格 和 一个逗号,再加上一个 空格 。 即:./redis-cli –eval [lua 脚本] [key…]空格,空格[args…]  多个参数之间用一个 空格 分割

3.2.4 缓存 Lua 脚本

为什么要缓存 在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis 服务端,会产生比较大的网络开销。为了解决这个问题,Redis 提供了 EVALSHA 命令,允许开发者通过脚本内容的 SHA1 摘要来执行脚本。

如何缓存 Redis 在执行 script load 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:"NOSCRIPT No matching script. Please use EVAL."

127.0.0.1:6379> script load "return 'Hello World'"
"470877a599ac74fbfda41caa908de682c5fc7d4b"
127.0.0.1:6379> evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0
"Hello World"

自乘案例 Redis 有 incrby 这样的自增命令,但是没有自乘,比如乘以 3,乘以 5。 我们可以写一个自乘的运算,让它乘以后面的参数:

local curVal = redis.call("get", KEYS[1])
if curVal == false then
  curVal = 0
else
  curVal = tonumber(curVal)
end
curVal = curVal * tonumber(ARGV[1])
redis.call("set", KEYS[1], curVal)
return curVal

把这个脚本变成单行,语句之间使用分号隔开

local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal) end; curVal
= curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal

script load '命令'

127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal =
tonumber(curVal) end; curVal = curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal'

"be4f93d8a5379e5e5b768a74e77c8a4eb0434441"

调用:

127.0.0.1:6379> set num 2
OK
127.0.0.1:6379> evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
(integer) 12

3.2.5 脚本超时

Redis 的指令执行本身是单线程的,这个线程还要执行客户端的 Lua 脚本,如果 Lua脚本执行超时或者陷入了死循环,是不是没有办法为客户端提供服务了呢?

eval 'while(true) do end' 0

为 了防 止 某个 脚本 执 行时 间 过长 导 致 Redis 无 法提 供 服务 , Redis 提 供 了lua-time-limit 参数限制脚本的最长运行时间,默认为 5 秒钟。 lua-time-limit 5000(redis.conf 配置文件中)

当脚本运行时间超过这一限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。

Redis 提供了一个 script kill 的命令来中止脚本的执行。新开一个客户端:

script kill

如果当前执行的 Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么通过script kill 命令是不能终止脚本运行的。

127.0.0.1:6379> eval "redis.call('set','gupao','666') while true do end" 0

因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行。

127.0.0.1:6379> script kill
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.

遇到这种情况,只能通过 shutdown nosave 命令来强行终止 redis。 shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。

总结:如果我们有一些特殊的需求,可以用 Lua 来实现,但是要注意那些耗时的操作。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、发布订阅模式
    • 1.1列表的局限
      • 1.2发布订阅模式
        • 1.2.1 订阅频道
        • 1.2.2 按规则(Pattern) 订阅频道
        • java 代码
    • 2 、Redis 事务
      • 2.1 为什么要用事务
        • 2.2 事务的用法
          • 2.3 watch 命令
            • 2.4 事务可能遇到的问题
              • 2.4.1 在执行 exec 之前发生错误
              • 2.4.2 在执行 exec 之后发生错误
          • 3 Lua 脚本
            • 3.1 在 Redis 中调用 Lua 脚本
              • 3.2 在 Lua 脚本中调用 Redis 命令
                • 3.2.1 设置键值对
                • 3.2.2 在 Redis 中调用 Lua 脚本文件中的命令, 操作 Redis
                • 3.2.3 案例: 对 IP 进行限流
                • 3.2.4 缓存 Lua 脚本
                • 3.2.5 脚本超时
            相关产品与服务
            云数据库 Redis
            腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档