前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >延迟消息处理

延迟消息处理

作者头像
用户1215919
发布2021-12-28 12:37:48
7980
发布2021-12-28 12:37:48
举报
文章被收录于专栏:大大的微笑大大的微笑

之前有这样一个需求,运营在后端配置一条系统消息或者营销活动等类型的消息等到了需要推送的时间以后会自动的将消息推送给用户APP端显示,一开始是采用的任务调度的方式(定时器),通过轮询扫表去做,因为具体什么时候推送消息没有固定的频率,固定的时间,因此需要每分钟扫表以避免消息在指定时间内未及时推送给APP端内.所以每次都是1分钟扫描一次,太过于频繁。所以不太适合(定时器适合那种固定频率或时间段处理)。

因此这里选取了几种延迟发送的方式:

1.rabbitMQ

2.redis

3.DelayedQueue(慎用)

代码部分(发送端):

代码语言:javascript
复制
/**
 * 提供了一个公有的方法
 */
public interface ISysMessageDelayProcessor {
    long FIVE_MINUTES = 5 * 60 * 1000;
    /**
     * 发送消息的处理
     * @param msg<按需自行封装处理>
     * @param pushDate<推送时间>
     */
    void sendMessage(Object msg, LocalDateTime pushDate);

}
代码语言:javascript
复制
/**
 * 基于RabbitMQ的实现方式(需要下载rabbitMQ插件)
 * 
 * 
 */
@Slf4j
@EnableBinding(SysMessageSink.class)
public class SysMessageRabbitMQDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Override
    public void sendMessage(Object msg LocalDateTime pushDate) {
        resolver.resolveDestination(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_PRODUCER)
                .send(MessageBuilder.withPayload(msg)
                        .setHeader("x-delay",
                                Duration.between(LocalDateTime.now(), pushDate)
                                        .toMillis())
                        .build());
    }

}
代码语言:javascript
复制
#配置系统消息的延迟发送
spring.cloud.stream.bindings.your-topic-producer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-producer.producer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-consumer.consumer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.group=your-topic-group
代码语言:javascript
复制
/**
 * 
 * 基于redis的实现
 * 
 */
public class SysMessageRedisDelayProcessorImpl implements ISysMessageDelayProcessor {
    @Autowired
    private RedisTemplate redisTemplate;


    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        redisTemplate.opsForZSet().add(MQTopicConstant.SYS_MESSAGE_QUERY_DELAY_TOPIC,msg,
                  pushDate.toInstant(ZoneOffset.of("+8")).toEpochMilli());
    }


}
代码语言:javascript
复制
/**
 * 是一种补备用方案,当不满足redis,rabbitMQ的场景的时候使用
 * 是一种基于内存的方式,一旦宕机,或者重启那么内存中的数据就会丢失
 * 慎用!
 */
public class SysMessageDelayedQueueProcessorImpl implements ISysMessageDelayProcessor, Delayed {
    private LocalDateTime executeTime;
    private Object data;
    // send queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue =
            new DelayQueue<>();
    // query queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> queryDelayQueue =
            new DelayQueue<>();

    public SysMessageDelayedQueueProcessorImpl() {
        new Thread(new SysMessageDelayedQueueProcessorListener(sendDelayQueue, queryDelayQueue)).start();
    }

    public SysMessageDelayedQueueProcessorImpl(LocalDateTime executeTime, Object data) {
        this.executeTime = executeTime;
        this.data = data;
    }


    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        sendDelayQueue.offer(new SysMessageDelayedQueueProcessorImpl(pushDate, msg));
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Duration.between(LocalDateTime.now(), executeTime).toMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

接收端:

代码语言:javascript
复制
/**
 * 监听
 */
public abstract class ISysMessageDelayedListener implements Runnable {

    protected static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue(1000);

    @Override
    public void run() {
        sendProcessor();
    }

    /**
     * 监听发送方法
     */
    public abstract void sendProcessor();
}
代码语言:javascript
复制
/**
 * 只用来监听MQ延迟队列推送过来的数据包,及转发数据包
 * 不做其他业务处理
 *
 */
@Component
@EnableBinding(SysMessageSink.class)
@Slf4j
public class SysMessageRabbitMQDelayedProcessorListener extends ISysMessageDelayedListener {
    @Autowired
    private SysMessageQueryProcessor sysMessageQueryProcessor;

    private static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue(1000);


    /**
     * 接受发送的数据
     *
     * @param 
     */
    @StreamListener(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_CONSUMER)
    public void onSendHandle(Object msg) {
       
        try {
            // put
            SEND_QUEUE.put(sysMessage);
        } catch (InterruptedException e) {
            log.error("caught onSendHandle invoke fail,e:", e);
        }

    }

    @Override
    public void sendProcessor() {

    }
}
代码语言:javascript
复制
/**
 * redis监听处理
 */
@Slf4j
public class SysMessageRedisDelayedProcessorListener extends ISysMessageDelayedListener {
    private static final String setNX = "lock:sysmessage:delay";
    public static final int LOCK_EXPIRE = 300; // ms

    @Autowired
    private RedisTemplate redisTemplate;

    public SysMessageRedisDelayedProcessorListener() {
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

    /**
     * 监听是否有到期的数据
     */
    private void monitorSendQueue() {
        while (true) {
            if (lock()) {
                Set<ZSetOperations.TypedTuple<Object>> set =
                        redisTemplate.opsForZSet().rangeWithScores(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, 0, 0);
                Iterator<ZSetOperations.TypedTuple<Object>> iterator = set.iterator();
                while (iterator.hasNext()) {
                    ZSetOperations.TypedTuple<Object> next = iterator.next();
                    consumer(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, next);
                }
            }
        }
    }

    /**
     * 获取分布式琐
     *
     * @return
     */
    private boolean lock() {
        try {
            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            return (Boolean) this.redisTemplate.execute((RedisCallback) connection -> {
                Boolean acquire = connection.setNX(setNX.getBytes(), String.valueOf(expireAt).getBytes());
                if (acquire) {
                    return true;
                }
                byte[] value = connection.get(setNX.getBytes());
                if (Objects.isNull(value) || value.length <= 0) {
                    return false;
                }
                long expireTime = Long.parseLong(new String(value));
                if (expireTime < System.currentTimeMillis()) {
                    // 如果锁已经过期
                    byte[] oldValue = connection.getSet(setNX.getBytes(),
                            String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                    // 防止死锁
                    return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                }
                return true;
            });
        } catch (Exception e) {
            log.error("obtain lock option fail, caught exception:", e);
            return false;
        }
    }

    /**
     * 删除过期的数据
     *
     * @param value
     */
    private void removeDataByExpireTime(String key, Object value) {
        redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * 消费
     *
     * @param next
     */
    private void consumer(String key, ZSetOperations.TypedTuple<Object> next) {
        // processor and remove
        if (!ifExpire(next.getScore())) {
            return;
        }
       if (MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC.equals(key)) {
            // in queue
            SEND_QUEUE.offer(sysMessage);
        }
        // remove
        removeDataByExpireTime(key, next.getValue());


    }

    /**
     * 过期判断
     *
     * @param expireTime
     * @return
     */
    private boolean ifExpire(Double expireTime) {
        return (expireTime.longValue() + 1000) <= LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    @Override
    public void sendProcessor() {
        // 监听发送队列的变化
        monitorSendQueue();
    }
}
代码语言:javascript
复制
/**
 *
 */
@Slf4j
public class SysMessageDelayedQueueProcessorListener extends ISysMessageDelayedListener {
    private DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue;

    public SysMessageDelayedQueueProcessorListener(DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue) {
        this.sendDelayQueue = sendDelayQueue;
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

  

    @Override
    public void sendProcessor() {
        CompletableFuture.runAsync(() -> {
            while (true) {
                try {
                    // processor
                    SysMessageDelayedQueueProcessorImpl queue = sendDelayQueue.take();
                    if (Objects.isNull(queue)) {
                        continue;
                    }
                   
                    // execute
                    SEND_QUEUE.offer(queue.getData());
                } catch (InterruptedException e) {
                    // 
                }
            }
        });

    }
}
代码语言:javascript
复制
/**
 */
@Configuration
public class SysMessageConfiguration {

    /**
     * 基于rabbitMQ的延迟处理
     * @return
     */
    @Primary
    @ConditionalOnClass(name = "org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
    @Bean
    public SysMessageRabbitMQDelayProcessorImpl createSysMessageRabbitMQDelayProcessor() {

        return new SysMessageRabbitMQDelayProcessorImpl();
    }

//    /**
//     * 基于redis的延迟处理
//     * @return
//     */
//    @ConditionalOnClass(RedisTemplate.class)
//    @ConditionalOnMissingClass("org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
//    @Bean
//    public SysMessageRedisDelayProcessorImpl createSysMessageRedisDelayProcessor() {
//        return new SysMessageRedisDelayProcessorImpl();
//    }

    /**
     * 基于内存的延迟处理
     * @return
     */
    @ConditionalOnMissingClass({"org.springframework.cloud.stream.binding.BinderAwareChannelResolver",
            "org.springframework.data.redis.core.RedisTemplate"})
    @Bean
    public SysMessageDelayedQueueProcessorImpl createSysMessageDelayedQueueProcessor() {
        return new SysMessageDelayedQueueProcessorImpl();
    }
}
代码语言:javascript
复制
   private ISysMessageDelayProcessor sysMessageDelayProcessor;

    @Autowired
    public xxxx(ISysMessageDelayProcessor sysMessageDelayProcessor) {
        this.sysMessageDelayProcessor = sysMessageDelayProcessor;
    }

其他部分业务代码按需处理即可

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档