首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis实现消息队列及延迟队列

Redis实现消息队列及延迟队列

作者头像
半月无霜
发布2023-03-03 14:52:29
1.5K0
发布2023-03-03 14:52:29
举报
文章被收录于专栏:半月无霜半月无霜

Redis实现消息队列及延迟队列

一、介绍

在选择消息中间件的问题上,我们有很多解决方案,具体选择哪一种还是要根据实际的情况来进行确认。

如果直接有成熟的第三方消息中间件,能用就直接用,如rabbitMqkafka等。

再如果,推送的消息比较简单,又恰好有个redis,那么就选择redis吧。

下面,将进行介绍,如果使用redis作为消息队列,我们该如何编写这段程序。

二、消息队列

前置工作,本次使用的工程框架直接是springBoot,其他maven依赖就不贴出来了,主要是要有这个redis的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

有了依赖,记得在application.yml配置文件中加入对应redis的配置信息

spring:
  redis:
    database: 0
    host: localhost
    port: 6379

还有一件事,redisTemplate的这个bean我们要进行润色一下,虽然用自带的也行,但作为一个强迫症,我还是希望我写入的keyredis中的key一致

package com.banmoon.test.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

好的准备工作完成,先来看生产者

1)生产者

package com.banmoon.test.queue.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisTestProducer {

    public static final String REDIS_TEST_KEY = "test:queue";

    @Autowired
    private RedisTemplate redisTemplate;

    public long push (String... params) {
        Long l = redisTemplate.opsForList().rightPushAll(REDIS_TEST_KEY, params);
        return l;
    }

}

生产者很简单,就是向redislist中推送数据

主要在于消费者,该如何获取到其中的消息

2)消费者

package com.banmoon.test.queue.consumer;

import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisTestConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    @PostConstruct
    public void pop() {
        new Thread(() -> {
            while (true) {
                try {
                    // 阻塞取出队首
					String params = (String) redisTemplate.opsForList().leftPop(RedisTestProducer.REDIS_TEST_KEY, 10, TimeUnit.SECONDS);
                    if (StrUtil.isNotBlank(params))
                        log.info("模拟消费消息:{}", params);
                    // 避免高频轮循,添加休眠
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    // 不做任何处理,切记不要因为异常导致了消费线程的退出
                }
            }
        }, RedisTestProducer.REDIS_TEST_KEY).start();
    }

}

上述就是消费者,其中注意几点

  • 这里服务启动时,用到了bean初始化的一个方法,大家也可以使用静态代码块,只要让这个消费线程启动就行
  • 线程启动,切记不要让异常导致了线程的退出。因为这样就没有消费者了,要时刻保证消费者的在线
  • 在取出队首的消息时,用到了阻塞机制。当没有获取到消息,该线程会进行阻塞,直到有消息入队或者阻塞超时,才会返回消息。避免死循环带来了cpu高载荷

3)测试

启动该springBoot项目,同时执行下面这段测试代码,调用三次生产者

package com.banmoon.test;

import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ServiceTest {

    @Autowired
    private RedisTestProducer redisTestProducer;

    @Test
    void insertTest() {
        redisTestProducer.push("a", "b", "c");
    }

}

查看springBoot项目的控制台,消费者有进行消费

image-20220517140449171
image-20220517140449171

三、延迟队列

延迟队列的应用场景还是比较多见的,比如

  • 用户下单后,此订单超30分钟后取消
  • 用户订阅,指定时间推送订阅消息事件

很多类似的业务场景,我们不再依赖定时,使用消息中间件就可以完成这类功能。

redis实现延迟队列之前,我有必要说一下setzset,主要是这个zset

set大家都很熟悉,与list不同,set是无序且内部元素不重复。

那么zset呢,它结合了setlist的特点

  • 集合内元素不会重复
  • 元素以有序的方式排列

zset中的元素都会关联一个分数score,内部将通过这个score对集合元素进行的排序。

虽然zset集合中元素不会重复,但score可以重复。如果有两个score相同的元素,将按照元素的字典序进行排序。

1)生产者

上面描述了这么多,我们该如何使用,先看生产者

package com.banmoon.test.queue.producer;

import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class RedisTestDelayProducer {

    public static final String REDIS_DELAY_TEST_KEY = "test:delay:queue";

    @Autowired
    private RedisTemplate redisTemplate;

    public Boolean push (String params, int offset, DateField dateField) {
        long score = DateUtil.offset(new Date(), dateField, offset).getTime();
        Boolean b = redisTemplate.opsForZSet().addIfAbsent(REDIS_DELAY_TEST_KEY, params, score);
        log.info("生产消息:{},推送是否成功:{}", params, b);
        return b;
    }

}

可以看到,这边使用将消费时间点的时间戳,作为了score,生产的消息

2)消费者

package com.banmoon.test.queue.consumer;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisTestDelayConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    @PostConstruct
    public void pop() {
        new Thread(() -> {
            while (true) {
                try {
                    // 查看范围中的消息
                    Set<Object> set = redisTemplate.opsForZSet().rangeByScore(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, 0, new Date().getTime(), 0, 1);
                    // 判断是否为空
                    if (CollUtil.isNotEmpty(set)) {
                        String params = (String) set.iterator().next();
                        // 删除范围中的消息
                        Long success = redisTemplate.opsForZSet().remove(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, params);
                        if (success > 0) {
                            log.info("模拟消费消息:{}", params);
                        }
                    } else {
                        // 避免高频轮循,添加休眠
                        TimeUnit.MILLISECONDS.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    // 不做任何处理,切记不要因为异常导致了消费线程的退出
                }
            }
        }, RedisTestDelayProducer.REDIS_DELAY_TEST_KEY).start();
    }

}

消费的逻辑,基本就是,取出当前时间点,要执行的消息。

score保证了队列中的消息有序性,且作为时间戳,所以可以完成延迟队列的对应功能。

注意事项和上面的普通队列差不多,简单注意一下就好。

3)测试

启动该springBoot项目,同时执行下面这段测试代码,调用三次生产者,分别在10秒后,30秒后,1分钟后进行消费

package com.banmoon.test;

import cn.hutool.core.date.DateField;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ServiceTest {

    @Autowired
    private RedisTestDelayProducer redisTestDelayProducer;

    @Test
    void insertTest() {
        redisTestDelayProducer.push("a", 10, DateField.SECOND);
        redisTestDelayProducer.push("b", 30, DateField.SECOND);
        redisTestDelayProducer.push("c", 1, DateField.MINUTE);
    }

}

查看springBoot项目的控制台,注意查看消费者打印的日志,主要看看三条日志的时间间隔

image-20220517171656067
image-20220517171656067

四、最后

我还要讲一下,redis作为消息队列的优缺点

  • 优点
    • 使用相对简单
    • 不用专门维护专业的消息中间件,降低服务和运维成本
  • 缺点
    • 没有ack,消息确认机制,存在消息丢失的可能
    • 死循环进行监听队列,消息队列一多,所需要的线程资源也会增多,服务器的负担会增大

所以,如果是简单的日志推送,消息推送等,可以使用redis队列。相反,如果对消息的可靠性有很大的要求,建议还是不要使用redis作为消息中间件了。

我是半月,祝你幸福!!!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Redis实现消息队列及延迟队列
    • 一、介绍
      • 二、消息队列
        • 1)生产者
        • 2)消费者
        • 3)测试
      • 三、延迟队列
        • 1)生产者
        • 2)消费者
        • 3)测试
      • 四、最后
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档