前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >松哥手把手教你用 Redis 做延迟消息队列

松哥手把手教你用 Redis 做延迟消息队列

作者头像
江南一点雨
发布2020-03-28 20:48:26
1.1K0
发布2020-03-28 20:48:26
举报
文章被收录于专栏:玩转JavaEE玩转JavaEE

我们平时说到消息队列,一般都是指 RabbitMQ、RocketMQ、ActiveMQ 以及大数据里边的 Kafka,这些是我们比较常见的消息中间件,也是非常专业的消息中间件,作为专业的中间件,它里边提供了许多功能。

松哥之前也有两篇介绍的文章:

  1. 我是如何在微人事项目中提高RabbitMQ消息可靠性的?
  2. Spring Boot 整合 RabbitMQ,消息重复消费怎么办?

但是,当我们需要使用消息中间件的时候,并非每次都需要非常专业的消息中间件,假如我们只有一个消息队列,只有一个消费者,那就没有必要去使用上面这些专业的消息中间件,这种情况我们可以直接使用 Redis 来做消息队列。

Redis 的消息队列不是特别专业,他没有很多高级特性,适用简单的场景,如果对于消息可靠性有着极高的追求,那么不适合使用 Redis 做消息队列。

好了,我们一起来撸代码(本视频节选自松哥自制的 Spring Boot + Vue 系列视频教程):

以下是视频笔记:

1.消息队列

Redis 做消息队列,使用它里边的 List 数据结构就可以实现,我们可以使用 lpush/rpush 操作来实现入队,然后使用 lpop/rpop 来实现出队。

回顾一下:

在客户端(例如 Java 端),我们会维护一个死循环来不停的从队列中读取消息,并处理,如果队列中有消息,则直接获取到,如果没有消息,就会陷入死循环,直到下一次有消息进入,这种死循环会造成大量的资源浪费,这个时候,我们可以使用之前讲的 blpop/brpop 。

2.延迟消息队列

延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来。

首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化,这里我们使用 JSON 来实现序列化和反序列化。

所以,首先在项目中,添加 JSON 依赖:

代码语言:javascript
复制
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.3</version>
</dependency>

接下来,构造一个消息对象:

代码语言:javascript
复制
public class JavaboyMessage {
    private String id;
    private Object data;

    @Override
    public String toString() {
        return "JavaboyMessage{" +
                "id='" + id + '\'' +
                ", data=" + data +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

接下来封装一个消息队列:

代码语言:javascript
复制
public class DelayMsgQueue {
    private Jedis jedis;
    private String queue;

    public DelayMsgQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * 消息入队
     *
     * @param data 要发送的消息
     */
    public void queue(Object data) {
        //构造一个 JavaboyMessage
        JavaboyMessage msg = new JavaboyMessage();
        msg.setId(UUID.randomUUID().toString());
        msg.setData(data);
        //序列化
        try {
            String s = new ObjectMapper().writeValueAsString(msg);
            System.out.println("msg publish:" + new Date());
            //消息发送,score 延迟 5 秒
            jedis.zadd(queue, System.currentTimeMillis() + 5000, s);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 消息消费
     */
    public void loop() {
        while (!Thread.interrupted()) {
            //读取 score 在 0 到当前时间戳之间的消息
            Set<String> zrange = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            if (zrange.isEmpty()) {
                //如果消息是空的,则休息 500 毫秒然后继续
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            //如果读取到了消息,则直接读取消息出来
            String next = zrange.iterator().next();
            if (jedis.zrem(queue, next) > 0) {
                //抢到了,接下来处理业务
                try {
                    JavaboyMessage msg = new ObjectMapper().readValue(next, JavaboyMessage.class);
                    System.out.println("receive msg:" + msg);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试:

代码语言:javascript
复制
public class DelayMsgTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            //构造一个消息队列
            DelayMsgQueue queue = new DelayMsgQueue(jedis, "javaboy-delay-queue");
            //构造消息生产者
            Thread producer = new Thread(){
                @Override
                public void run() {
                    for (int i = 0; i < 5; i++) {
                        queue.queue("www.javaboy.org>>>>" + i);
                    }
                }
            };
            //构造一个消息消费者
            Thread consumer = new Thread(){
                @Override
                public void run() {
                    queue.loop();
                }
            };
            //启动
            producer.start();
            consumer.start();
            //休息 7 秒后,停止程序
            try {
                Thread.sleep(7000);
                consumer.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

就这样,我们利用 Java 代码结合 Redis 中的 zset 就非常方便的实现了延迟消息队列。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 江南一点雨 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.消息队列
  • 2.延迟消息队列
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档