基于Redis实现分布式消息队列(二)

1、访问Redis的工具类 public class RedisManager {

private static Pool<Jedis> pool;

protected final static Logger logger = Logger.getLogger(RedisManager.class);

static{ try { init(); } catch (Exception e) { e.printStackTrace(); } }

public static void init() throws Exception {

Properties props = ConfigManager.getProperties("redis"); logger.debug("初始化Redis连接池。"); if(props==null){ throw new RuntimeException("没有找到redis配置文件"); } // 创建jedis池配置实例 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // 设置池配置项值 int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim()); jedisPoolConfig.setMaxTotal(poolMaxTotal);

int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim()); jedisPoolConfig.setMaxIdle(poolMaxIdle);

long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim()); jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);

logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ", poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));

// 根据配置实例化jedis池 String connectMode = props.getProperty("redis.connectMode"); String hostPortStr = props.getProperty("redis.hostPort");

logger.debug(String.format("host : %s ",hostPortStr)); logger.debug(String.format("mode : %s ",connectMode));

if(StringUtils.isEmpty(hostPortStr)){ throw new OptimusException("redis配置文件未配置主机-端口集"); } String[] hostPortSet = hostPortStr.split(","); if("single".equals(connectMode)){ String[] hostPort = hostPortSet[0].split(":"); pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim())); }else if("sentinel".equals(connectMode)){ Set<String> sentinels = new HashSet<String>(); for(String hostPort : hostPortSet){ sentinels.add(hostPort); } pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig); } }

/** * 使用完成后,必须调用 returnResource 还回。 * @return 获取Jedis对象 */ public static Jedis getResource(){ Jedis jedis = pool.getResource(); if(logger.isDebugEnabled()){ logger.debug("获得链接:" + jedis); } return jedis; }

/** * 获取Jedis对象。 * * 用完后,需要调用returnResource放回连接池。 * * @param db 数据库序号 * @return */ public static Jedis getResource(int db){ Jedis jedis = pool.getResource(); jedis.select(db); if(logger.isDebugEnabled()){ logger.debug("获得链接:" + jedis); } return jedis; }

/** * @param jedis */ public static void returnResource(Jedis jedis){ if(jedis!=null){ pool.returnResource(jedis); if(logger.isDebugEnabled()){ logger.debug("放回链接:" + jedis); } } }

/** * 需要通过Spring确认这个方法被调用。 * @throws Exception */ public static void destroy() throws Exception { pool.destroy(); } }

这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。

2、队列接口 public interface TaskQueue {

/** * 获取队列名 * @return */ String getName();

/** * 往队列中添加任务 * @param task */ void pushTask(String task);

/** * 从队列中取出一个任务 * @return */ String popTask();

}

用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。

3、队列的Redis实现类 /** * 任务队列Redis实现。 * * 采用每次获取Jedis并放回pool的方式。 * 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。 * 暂时先忍受这种低性能,不明确Jedis是否线程安全。 * */ public class TaskQueueRedisImpl implements TaskQueue {

private final static int REDIS_DB_IDX = 9;

private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

private final String name;

/** * 构造函数。 * * @param name */ public TaskQueueRedisImpl(String name) { this.name = name; }

/* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#getName() */ public String getName() { return this.name; } /* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#pushTask(String) */ public void pushTask(String task) { Jedis jedis = null; try{ jedis = RedisManager.getResource(REDIS_DB_IDX); jedis.lpush(this.name, task); }catch(Throwable e){ logger.error(e.getMessage(),e); }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); } } }

/* (non-Javadoc) * @see com.gwssi.common.mq.TaskQueue#popTask() */ public String popTask() { Jedis jedis = null; String task = null; try{ jedis = RedisManager.getResource(REDIS_DB_IDX); task = jedis.rpop(this.name); }catch(Throwable e){ logger.error(e.getMessage(),e); }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); } } return task; }

}

4、获取队列实例的工具类 /** * <pre> * // 获得队列 * TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE); * * // 添加任务到队列 * String task = "task id"; * tq.pushTask(task); * * // 从队列中取出任务执行 * String taskToDo = tq.popTask(); * </pre> * @author liuhailong */ public class TaskQueueManager {

protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

/** * 短信队列名。 */ public static final String SMS_QUEUE = "SMS_QUEUE";

/** * 规则队列名。 */ public static final String RULE_QUEUE = "RULE_QUEUE";

private static void initQueneMap() { logger.debug("初始化任务队列..."); queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE)); logger.debug("建立队列:"+RULE_QUEUE); queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE)); logger.debug("建立队列:"+SMS_QUEUE); }

static { initQueneMap(); }

public static TaskQueue get(String name){ return getRedisTaskQueue(name); }

public static TaskQueue getRedisTaskQueue(String name){ return queneMap.get(name); }

}

和具体的队列过于紧耦合,但简单好用。 先跑起来再说。

5、向队列中添加任务的代码 TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE); tq.pushTask(smsMessageId);1 6、从队列中取出任务执行的代码 public class SmsSendTask{

protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl(); /** * 入口方法。 */ public void execute() { TaskQueue taskQueue = null; String task = null; try { taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

// 非线程安全 Set<Serializable> executedTaskSet = new HashSet<Serializable>();

task = taskQueue.popTask(); while(task!=null){ // 判断是否把所有任务都执行一遍了,避免死循环 if(executedTaskSet.contains(task)){ taskQueue.pushTask(task); break; }

executeSingleTask(taskQueue,task);

task = taskQueue.popTask(); } }catch(Throwable e){ logger.error(e.getMessage(),e); e.printStackTrace(); } }

/** * 发送单条短信。 * * 取出任务并执行,如果失败,放回任务列表。 * * @param taskQueue * @param task */ @SuppressWarnings({ "rawtypes", "unchecked" }) private void executeSingleTask(TaskQueue taskQueue, String task) { try { // do the job String smsId = task; Map<String,String> sms = smsSendService.getSmsList(smsId);

smsSendService.send(sms);

smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

String opType = "2"; TaskQueueUtil.taskLog(taskQueue.getName(), opType, task); } catch (Throwable e) { if(task!=null){ taskQueue.pushTask(task); smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT); if(logger.isDebugEnabled()){ logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage())); } }else { e.printStackTrace(); } } }

}

这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。

原文发布于微信公众号 - nginx(nginx-study)

原文发表时间:2016-04-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hongten

java的poi技术读取Excel[2003-2007,2010]

这篇blog主要是讲述java中poi读取excel,而excel的版本包括:2003-2007和2010两个版本, 即excel的后缀名为:xls和xlsx。

1.4K20
来自专栏程序员宝库

徒手撸框架---实现 Aop

原文:犀利豆的博客(https://www.xilidou.com/2018/01/13/spring-aop/) 上一讲我们讲解了 Spring 的 IoC ...

313120
来自专栏个人分享

MongoDB项目中常用方法

  前台通过DBCollection 根据名称获取相应的value 然后加入list中~

18330
来自专栏码匠的流水账

聊聊eureka server的RemoteRegionRegistry

本文主要研究下eureka server的RemoteRegionRegistry

11610
来自专栏菩提树下的杨过

struts2: 玩转 rest-plugin

近期使用struts2的rest-plugin,参考官方示例struts2-rest-showcase,做了一个restful service小项目,但官网提供...

33250
来自专栏javathings

稀有名词解释——Java 堆污染(犄角旮旯问题)

所谓堆污染,简单的说就是当一个泛型类型变量赋值给不是泛型类型变量,这种错误在编译期间能被编译器警告,但是可以忽略,直到运行时报错。

39830
来自专栏行者常至

struts2中用jquery、ajax实现下拉框的级联

- 这个地方不添加任何东西,因为我是用的通配符,所以,我在PostAction.java 类中增加了一个postajax()方法(你的方法名叫什么都可以,...

22130
来自专栏架构之路

Spring 数据库连接(Connection)绑定线程(Thread)的实现

最近在看spring事务的时候在想一个问题:spring中的很多bean都是单例的,是非状态的,而数据库连接是一种有状态的对象,所以spring一定在创建出co...

39630
来自专栏Java后端技术

使用Java客户端对Redis进行操作

  上篇文章我们介绍了如何在centos7下面进行安装单机版redis以及redis集群。这篇文章,我们来聊一聊如何使用java客户端来进行操作redis。我们...

7620
来自专栏Spark学习技巧

textFile构建RDD的分区及compute计算策略

1,textFile A),第一点,就是输入格式,key,value类型及并行度的意义。 def textFile( path: String, mi...

26570

扫码关注云+社区

领取腾讯云代金券