前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Redis实现分布式消息队列(二)

基于Redis实现分布式消息队列(二)

作者头像
后端技术探索
发布2018-08-09 16:44:06
6190
发布2018-08-09 16:44:06
举报
文章被收录于专栏:后端技术探索后端技术探索

1、访问Redis的工具类 public class RedisManager {

private static Pool<Jedis> pool;

protected final static Logger logger = Logger.getLogger(RedisManager.class);

static{ try { init(); } catch (Exception e) { e.printStackTrace(); } }

public static void init() throws Exception {

Properties props = ConfigManager.getProperties("redis"); logger.debug("初始化Redis连接池。"); if(props==null){ throw new RuntimeException("没有找到redis配置文件"); } // 创建jedis池配置实例 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // 设置池配置项值 int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim()); jedisPoolConfig.setMaxTotal(poolMaxTotal);

int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim()); jedisPoolConfig.setMaxIdle(poolMaxIdle);

long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim()); jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);

logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ", poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));

// 根据配置实例化jedis池 String connectMode = props.getProperty("redis.connectMode"); String hostPortStr = props.getProperty("redis.hostPort");

logger.debug(String.format("host : %s ",hostPortStr)); logger.debug(String.format("mode : %s ",connectMode));

if(StringUtils.isEmpty(hostPortStr)){ throw new OptimusException("redis配置文件未配置主机-端口集"); } String[] hostPortSet = hostPortStr.split(","); if("single".equals(connectMode)){ String[] hostPort = hostPortSet[0].split(":"); pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim())); }else if("sentinel".equals(connectMode)){ Set<String> sentinels = new HashSet<String>(); for(String hostPort : hostPortSet){ sentinels.add(hostPort); } pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig); } }

/** * 使用完成后,必须调用 returnResource 还回。 * @return 获取Jedis对象 */ public static Jedis getResource(){ Jedis jedis = pool.getResource(); if(logger.isDebugEnabled()){ logger.debug("获得链接:" + jedis); } return jedis; }

/** * 获取Jedis对象。 * * 用完后,需要调用returnResource放回连接池。 * * @param db 数据库序号 * @return */ public static Jedis getResource(int db){ Jedis jedis = pool.getResource(); jedis.select(db); if(logger.isDebugEnabled()){ logger.debug("获得链接:" + jedis); } return jedis; }

/** * @param jedis */ public static void returnResource(Jedis jedis){ if(jedis!=null){ pool.returnResource(jedis); if(logger.isDebugEnabled()){ logger.debug("放回链接:" + jedis); } } }

/** * 需要通过Spring确认这个方法被调用。 * @throws Exception */ public static void destroy() throws Exception { pool.destroy(); } }

这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。

2、队列接口 public interface TaskQueue {

/** * 获取队列名 * @return */ String getName();

/** * 往队列中添加任务 * @param task */ void pushTask(String task);

/** * 从队列中取出一个任务 * @return */ String popTask();

}

用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。

3、队列的Redis实现类 /** * 任务队列Redis实现。 * * 采用每次获取Jedis并放回pool的方式。 * 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。 * 暂时先忍受这种低性能,不明确Jedis是否线程安全。 * */ public class TaskQueueRedisImpl implements TaskQueue {

private final static int REDIS_DB_IDX = 9;

private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

private final String name;

/** * 构造函数。 * * @param name */ public TaskQueueRedisImpl(String name) { this.name = name; }

/* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#getName() */ public String getName() { return this.name; } /* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#pushTask(String) */ public void pushTask(String task) { Jedis jedis = null; try{ jedis = RedisManager.getResource(REDIS_DB_IDX); jedis.lpush(this.name, task); }catch(Throwable e){ logger.error(e.getMessage(),e); }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); } } }

/* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#popTask() */ public String popTask() { Jedis jedis = null; String task = null; try{ jedis = RedisManager.getResource(REDIS_DB_IDX); task = jedis.rpop(this.name); }catch(Throwable e){ logger.error(e.getMessage(),e); }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); } } return task; }

}

4、获取队列实例的工具类 /** * <pre> * // 获得队列 * TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE); * * // 添加任务到队列 * String task = "task id"; * tq.pushTask(task); * * // 从队列中取出任务执行 * String taskToDo = tq.popTask(); * </pre> * @author liuhailong */ public class TaskQueueManager {

protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

/** * 短信队列名。 */ public static final String SMS_QUEUE = "SMS_QUEUE";

/** * 规则队列名。 */ public static final String RULE_QUEUE = "RULE_QUEUE";

private static void initQueneMap() { logger.debug("初始化任务队列..."); queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE)); logger.debug("建立队列:"+RULE_QUEUE); queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE)); logger.debug("建立队列:"+SMS_QUEUE); }

static { initQueneMap(); }

public static TaskQueue get(String name){ return getRedisTaskQueue(name); }

public static TaskQueue getRedisTaskQueue(String name){ return queneMap.get(name); }

}

和具体的队列过于紧耦合,但简单好用。 先跑起来再说。

5、向队列中添加任务的代码 TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE); tq.pushTask(smsMessageId);1 6、从队列中取出任务执行的代码 public class SmsSendTask{

protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl(); /** * 入口方法。 */ public void execute() { TaskQueue taskQueue = null; String task = null; try { taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

// 非线程安全 Set<Serializable> executedTaskSet = new HashSet<Serializable>();

task = taskQueue.popTask(); while(task!=null){ // 判断是否把所有任务都执行一遍了,避免死循环 if(executedTaskSet.contains(task)){ taskQueue.pushTask(task); break; }

executeSingleTask(taskQueue,task);

task = taskQueue.popTask(); } }catch(Throwable e){ logger.error(e.getMessage(),e); e.printStackTrace(); } }

/** * 发送单条短信。 * * 取出任务并执行,如果失败,放回任务列表。 * * @param taskQueue * @param task */ @SuppressWarnings({ "rawtypes", "unchecked" }) private void executeSingleTask(TaskQueue taskQueue, String task) { try { // do the job String smsId = task; Map<String,String> sms = smsSendService.getSmsList(smsId);

smsSendService.send(sms);

smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

String opType = "2"; TaskQueueUtil.taskLog(taskQueue.getName(), opType, task); } catch (Throwable e) { if(task!=null){ taskQueue.pushTask(task); smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT); if(logger.isDebugEnabled()){ logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage())); } }else { e.printStackTrace(); } } }

}

这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-04-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 nginx 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档