前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >队列机组件实现

队列机组件实现

作者头像
LiosWong
发布2021-12-01 16:55:55
2310
发布2021-12-01 16:55:55
举报
文章被收录于专栏:后端沉思录后端沉思录

实际的分布式系统中,如何实现生产-消费模型呢?由于该模型通过队列实现,该组件况且称作为队列机。

模型

•生产-消费模型

•线程池模型

对比单机环境下的模型来看,分布式环境的队列机更为复杂,阻塞队列当然不能再使用java提供的工具类,本文使用redis作为队列,使用redis哪种数据类型作为存储方式呢,这里选择zset,不仅仅是为了实现生产消费,结合zset数据结构,而且还可以实现延迟队列。下面介绍延迟队列机的组件实现方式。

代码实现

队列接口
代码语言:javascript
复制
public interface BatchDelayQueue<V> {     /**     * 初始化延迟队列     *     * @param topic 主题     */    void init(String topic);     /**     * 初始化延迟队列     *     * @param topic 主题     */    void init(String topic, Integer batchSize);     /**     * 初始化延迟队列     *     * @param topic         主题     * @param redisTemplate     */    void init(String topic, Integer batchSize, RedisTemplate redisTemplate);     /**     * 新增元素到队列中     *     * @param e   加入的元素     * @param delay 延时时间,单位为毫秒     * @return 添加结果     */    boolean add(V e, long delay);     /**     * 批量获取符合元素     *     * @param time 超时时间,单位为毫秒     * @return 返回并删除符合时间要求的元素     * @throws Exception 异常情况     */    List<QueueTask<V>> poll(long time) throws Exception;     /**     * 删除元素     */    Long remove(V e);}
队列实现类
代码语言:javascript
复制
@Slf4jpublic class ExtBatchDelayQueue implements BatchDelayQueue<String> {     private String topic;     private Lock redisLock;     private RedisTemplate<String, String> redisTemplate;     private Integer batchSize;     public static final Integer DEFAULT_BATCH_SIZE = 50;     public static final Integer DEFAULT_MAX_BATCH_SIZE = 100;     @Override    public void init(String topic) {         this.init(topic, DEFAULT_BATCH_SIZE);    }     // 可以指定批量获取数目    @Override    public void init(String topic, Integer batchSize, RedisTemplate redisTemplate) {        this.topic = topic;        this.redisTemplate = redisTemplate;        this.batchSize = batchSize == null ? DEFAULT_BATCH_SIZE            : (batchSize > DEFAULT_MAX_BATCH_SIZE ? DEFAULT_MAX_BATCH_SIZE : batchSize);        RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisTemplate.getConnectionFactory(), "lock");        this.redisLock = redisLockRegistry.obtain(topic);    }     @Override    public List<QueueTask<String>> poll(long timeout) throws Exception {         boolean lock = false;        try {            // 加分布式锁,保证同一时间只有一台机器poll任务            lock = redisLock.tryLock(timeout, TimeUnit.MILLISECONDS);            if (lock) {                // 获取任务                Set<TypedTuple<String>> sets = redisTemplate.opsForZSet()                    .rangeByScoreWithScores(topic, 0, System.currentTimeMillis(), 0, batchSize);                if (CollectionUtils.isEmpty(sets)) {                    return null;                }                List<QueueTask<String>> tasks = sets.stream()                    .map(p -> QueueTask.<String>builder().score(p.getScore()).value(p.getValue()).build())                    .collect(Collectors.toList());                try {                    // 删除                    redisTemplate.opsForZSet()                        .remove(topic, sets.stream().map(TypedTuple::getValue).toArray());                } catch (Exception e) {                    log.error("ExtRedisBatchDelayQueue poll remove error.", e);                }                return tasks;            }        } catch (InterruptedException e) {            log.error("ExtRedisBatchDelayQueue poll error.", e);            throw e;        } finally {            if (lock) {                redisLock.unlock();            }        }        return null;    }     @Override    public boolean add(String e, long delay) {         return redisTemplate.opsForZSet().add(topic, e, System.currentTimeMillis() + delay);    }}
Consumer
代码语言:javascript
复制
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* public class DelayQueueBatchBootstrap {     /**     * 队列     */    private BatchDelayQueue delayQueue;     /**     * 批量拉取任务数,不大于100     */    private int batchSize;     /**     * 队列唯一名称     */    private String delayQueueKey;     /**     * redis     */    private RedisTemplate<String, String> redisTemplate;     /**     * 队列监控(消费速度、队列大小、延迟时间)     */    private DelayQueueMonitor delayQueueMonitor;     /**     * volatile关键字防止构造函数溢出     */    private volatile ExecutorService HANDLE_DELAY_TASK_EXECUTOR;     private final AtomicBoolean RUNNING = new AtomicBoolean(true);     private final ExecutorService DELAY_QUEUE_POLL_EXECUTOR = new ThreadPoolExecutor(1, 1,        60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),        new ThreadFactoryBuilder().setNameFormat("delayQueuePollThread-%d").build());     private DelayQueueBatchBootstrap() {     }     public DelayQueueBatchBootstrap(String delayQueueKey, BatchDelayQueue delayQueue,        RedisTemplate<String, String> redisTemplate, boolean queueMonitor) {         this(delayQueueKey, delayQueue, redisTemplate, new DefaultDelayQueueMonitor(queueMonitor, queueMonitor),            DEFAULT_BATCH_SIZE);    }     public DelayQueueBatchBootstrap(String delayQueueKey, BatchDelayQueue delayQueue,        RedisTemplate<String, String> redisTemplate, boolean queueMonitor, long delayThreshold) {         this(delayQueueKey, delayQueue, redisTemplate,            new DefaultDelayQueueMonitor(queueMonitor, queueMonitor, delayThreshold),            DEFAULT_BATCH_SIZE);    }     public DelayQueueBatchBootstrap(String delayQueueKey, BatchDelayQueue delayQueue,        RedisTemplate<String, String> redisTemplate, DelayQueueMonitor delayQueueMonitor) {         this(delayQueueKey, delayQueue, redisTemplate, delayQueueMonitor, DEFAULT_BATCH_SIZE);    }     public DelayQueueBatchBootstrap(String delayQueueKey, BatchDelayQueue delayQueue,        RedisTemplate<String, String> redisTemplate, DelayQueueMonitor delayQueueMonitor, Integer batchSize) {         this(delayQueueKey, delayQueue, redisTemplate, delayQueueMonitor, batchSize, null);    }     public DelayQueueBatchBootstrap(String delayQueueKey,        BatchDelayQueue delayQueue, RedisTemplate<String, String> redisTemplate,        DelayQueueMonitor delayQueueMonitor, Integer batchSize,        ExecutorService handleDelayTaskExecutorDefault) {         this.delayQueueKey = delayQueueKey;        this.redisTemplate = redisTemplate;        this.batchSize = batchSize;        this.delayQueue = delayQueue;        this.HANDLE_DELAY_TASK_EXECUTOR = handleDelayTaskExecutorDefault;        this.delayQueueMonitor =            delayQueueMonitor == null ? new DefaultDelayQueueMonitor(false, false) : delayQueueMonitor;    }     /**     * spring容器加载所有的bean之后触发事件时开始poll     *     * @see DelayQueueHandlerLoader 保证所有的Handler加载完成,避免任务丢失     */    @EventListener(value = ContextRefreshedEvent.class)    public void init() {         delayQueue.init(delayQueueKey, batchSize, redisTemplate);        delayQueueMonitor.schedule(delayQueueKey, redisTemplate);        addShutDownHook();        DELAY_QUEUE_POLL_EXECUTOR.execute(() -> {            while (RUNNING.get()) {                List<QueueTask<String>> elementFromDelayQueue = null;                try {                    elementFromDelayQueue = delayQueue.poll(1000);                    if (CollectionUtils.isEmpty(elementFromDelayQueue)) {                        Thread.sleep(100);                        continue;                    }                    elementFromDelayQueue.iterator().forEachRemaining(p -> {                        try {                            // 线程池执行                            executorService                                .submit(() -> {                                ...                                处理具体业务逻辑                                ...                                });                                // 监控                            // delayQueueMonitor                        } catch (Exception e) {                            log.error("delayQueueHandle exec task fail.[{}]", p, e);                        }                    });                } catch (RejectedExecutionException rejectedExecutionException) {                    log.error("delayQueueHandle failed, reject execution! element.[{}]",                        elementFromDelayQueue, rejectedExecutionException);                } catch (Exception e) {                    if (RUNNING.get()) {                        log.error("delayQueueHandle failed, element.[{}]", elementFromDelayQueue, e);                    } else {                        log.warn("delayQueueQueue not running.");                    }                }            }        });    }     private void addShutDownHook() {         Runtime.getRuntime().addShutdownHook(            new Thread(() -> {              // 优雅关停               ...               ...            }));    }     public boolean putElement(Object bizElement, long delay) {         try {            delayQueue.add(bizElement, delay > 0 ? delay : 8000);            log.info("put element into delay queue,element.[{}], delayMillis.[{}]",                elementJson, delay);            return true;        } catch (Exception e) {            log.error("put element into delay queue failed, element.[{}], delayMillis.[{}]",                elementJson, delay, e);            return false;        }    }}
*/
Producer
代码语言:javascript
复制
@Configurationpublic class DelayQueueConfig {      @Bean(name = "delayQueue")    public DelayQueueBatchBootstrap delayQueue() {         return new DelayQueueBatchBootstrap("xxxxx", new ExtBatchDelayQueue(),            redisTemplate, delayQueueMonitor(), "批量拉取大小", "指定线程池)";    }      private DelayQueueMonitor delayQueueMonitor() {         return new DelayQueueMonitor() {             @Override            public boolean logSwitch() {                 return true;            }             @Override            public boolean taskCostSwitch() {               return true;            }        };    } } 
说明

1.由于篇幅的原因,很多代码没有贴出来,有很多其他功能,上面有添加、拉取任务的方法,还有比如批量添加、批量删除、阻塞拉取任务、判断延迟任务队列大小、判断任务是否存在、以及获取任务延迟时间等等。2.支持批量拉取,以及延迟队列监控,线程池消费,优雅关停等等。

总结

上面只是通过延迟队列去实现队列机,其实可以根据该思路,实现分布式环境下的生产-消费模型组件。

1.使用时,注意任务较多时,应该队列隔离开来,不同业务使用单独的队列,否则会导致任务积压,任务延迟处理,影响实际业务。2.消费时,使用线程池消费,可以提高消费速率,不过也要通过具体的业务量,计算线程池大小,否则会造成线程池资源耗尽。

实际业务场景中,大量使用该模型解决业务需求。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-11-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 模型
  • 代码实现
    • 队列接口
      • 队列实现类
        • Consumer
          • Producer
            • 说明
            • 总结
            相关产品与服务
            云数据库 Redis
            腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档