前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码)

我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码)

原创
作者头像
编程大道
修改2019-07-16 17:56:37
7880
修改2019-07-16 17:56:37
举报
文章被收录于专栏:编程大道编程大道

上篇文章讲了在实际项目里的哪些业务场景用到Redis发布订阅,这篇文章就讲一下,在Java中如何实现的。

图解代码结构

发布订阅的理论以及使用场景大家都已经有了大致了解了,但是怎么用代码实现发布订阅呢?在这里给大家分享一下实现方式。

我们以上篇文章讲述的第三种使用场景为例,先来看一下整体实现类图吧。

解释一下,这里我们首先定义一个统一接口 ICacheUpdate ,只有一个 update 方法,我们令 Service 层实现这个方法,执行具体的更新操作。

我们再来看 RedisMsgPubSub,它继承 redis.clients.jedis.JedisPubSub,主要重写其 onMessage() 方法(订阅的频道有消息到来时会触发这个方法),我们在这个方法里调用 RedisMsgPubSub 的 update 方法执行更新操作。

当我们有多个 Service 实现 ICacheUpdate 时,我们就非常迫切地需要一个管理器来集中管理这些 Service,并且当触发 onMessage 方法时要告诉onMessage方法具体调用哪个 ICacheUpdate 的实现类,所以我们有了 PubSubManager 。并且我们单独开启一个线程来维护发布订阅,所以管理器继承了 Thread 类。

代码实现

具体代码:

统一接口

ICacheUpdate.java

public interface ICacheUpdate {
    public void update();
}

Service层

实现ICacheUpdate的update方法,执行具体的更新操作

InfoService.java

public class InfoService implements ICacheUpdate {
	private static Logger logger = LoggerFactory.getLogger(InfoService.class);
	@Autowired
	private RedisCache redisCache;
	@Autowired
	private InfoMapper infoMapper;
	/**
	 * 按信息类型分类查询信息
	 * @return
	 */
	public Map<String, List<Map<String, Object>>> selectAllInfo(){
		Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>();
		List<String> infoTypeList = infoMapper.selectInfoType();//信息表中所有涉及的信息类型
		logger.info("-------按信息类型查找公共信息开始----"+infoTypeList);
		if(infoTypeList!=null && infoTypeList.size()>0) {
			for (String infoType : infoTypeList) {
				List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType);
				resultMap.put(infoType, result);
			}
		}
		return resultMap;
	}
	@Override
	public void update() {
		//缓存首页信息
		logger.info("InfoService selectAllInfo 刷新缓存");
		Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo();
		Set<String> keySet = resultMap.keySet();
		for(String key:keySet){
			List<Map<String, Object>> value = resultMap.get(key);
			redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value);
		}
	}
}

Redis发布订阅的扩展类

作用:

1、统一管理ICacheUpdate,把所有实现ICacheUpdate接口的类添加到updates容器

2、重写onMessage方法,订阅到消息后进行刷新缓存的操作

RedisMsgPubSub.java

/**
 * Redis发布订阅的扩展类
 * 作用:1、统一管理ICacheUpdate,把所有实现ICacheUpdate接口的类添加到updates容器
 * 2、重写onMessage方法,订阅到消息后进行刷新缓存的操作
 */
public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
    private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>();
    //1、由updates统一管理ICacheUpdate
    public boolean addListener(String key , ICacheUpdate update) {
        if(update == null) 
            return false;
	updates.put(key, update);
	return true;
    }
    /**
     * 2、重写onMessage方法,订阅到消息后进行刷新缓存的操作
     * 订阅频道收到的消息
     */
    @Override  
    public void onMessage(String channel, String message) {
        logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message);
        ICacheUpdate updater = null;
        if(StringUtil.isNotEmpty(message)) 
            updater = updates.get(message);
        if(updater!=null)
            updater.update();
    }
    //other code...
}

发布订阅的管理器 执行的操作: 1、将所有需要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中 2、启动线程订阅pubsub_config频道,收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅) PubSubManager.java

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    public static Jedis jedis;
    RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
    //频道
    public static final String PUNSUB_CONFIG = "pubsub_config";
    //1.将所有需要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中
    public boolean addListener(String key, ICacheUpdate listener){
        return msgPubSub.addListener(key,listener);
    }
    @Override
    public void run(){
        while (true){
            try {
                JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
                if(jedisPool!=null){
                    jedis = jedisPool.getResource();
                    if(jedis!=null){
                        //2.启动线程订阅pubsub_config频道 阻塞
                        jedis.subscribe(msgPubSub,PUNSUB_CONFIG);
                    }
                }
            } catch (Exception e) {
                logger.error("redis connect error!");
            } finally {
                if(jedis!=null)
                    jedis.close();
            }
            try {
                //3.收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅)
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("InterruptedException in redis sleep!");
            }
        }
    }
}

到此,Redis的发布订阅大致已经实现。我们什么时候启用呢?我们可以选择在启动项目时完成订阅和基础数据的加载,所以我们通过实现javax.servlet.SevletContextListener来完成这一操作。然后将监听器添加到web.xml。 CacheInitListener.java

/**
 * 加载系统参数
 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        logger.info("---CacheListener初始化开始---");
        init();
        logger.info("---CacheListener初始化结束---");
    }
    public void init() {
        try {
            //获得管理器
            PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);

            InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
            //添加到管理器
            pubSubManager.addListener("infoService", infoService);
            //other service...

            //启动线程执行订阅操作
            pubSubManager.start();
            //初始化加载
            loadParamToRedis();
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        }
    }
    private void loadParamToRedis() {
        InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
        infoService.update();
        //other service...
    }
}

web.xml

<listener>
	<listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>

【end】

文章首发于公众号@编程大道,欢迎关注。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 图解代码结构
  • 代码实现
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档