前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >我攻克的技术难题:自定义延时消息队列

我攻克的技术难题:自定义延时消息队列

原创
作者头像
GoBoy
发布2024-01-29 08:27:08
1800
发布2024-01-29 08:27:08
举报
文章被收录于专栏:GoboyGoboy

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批。

假设场景:在业务中,我们异步调用了其他服务A服务A处理成功完成后,回调到主业务流程正常处理完成;

服务A出现异常时;主业务通常是不知道,会一直等待服务A的回调处理,线程得不到释放,引发线上故障,这个时候,我们就需要在主业务中,增加超时机制,来保证主业务流程不受到其他业务的影响。

以下关于延迟消息的处理分别围绕:业务事件,超时时间,业务类型,业务阶段,回调处理 五个主体功能进行实践。

构建延时消息表

创建超时处理消息表,用户记录业务事件相关的信息。

代码语言:java
复制
/** 创建超时处理消息表 */
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@TableName(value = "app_delay_message")
public class AppDelayMessage {
    /** ID */
    private String id;
    /** 应用ID */
    private String appId;
    /** 超时时长(H) */
    private Integer ttl;
    /** type1, type2*/
    private Type type;
    /** 编译:COMPILE; 测试:TEST; */
    private Stage stage;
    /** 待处理:PENDING; 已处理:PROCESSED; 超时:TIMEOUT; 无效:INVALID; */
    private Status status;
    /** 备考 */
    private String remark;
    /** 回调函数 */
    private String callback;
    /** 创建时间 */
    private LocalDateTime createTime;
    /** 修改时间 */
    private LocalDateTime modifyTime;
    /** 删除标 */
    private String deleteFlg;

    /** 场景类型 */
    public enum Type {
        /** 场景1 */
        TYPE1,
        /** 场景2 */
        TYPE2
    }

    /** 阶段 */
    public enum Stage {
        /** 编译 */
        REAL_COMPILE
    }

    /** 状态 */
    public enum Status {
        /** 待处理 */
        PENDING,
        /** 已处理 */
        PROCESSED,
        /** 超时 */
        TIMEOUT,
        /** 无效 */
        INVALID
    }
}

延迟消息消费者

消息消费者由 DelayQueueConsumer 创建,启动异步线程用于消费以超时的消息,方法中设置延迟队列和超时错误处理。

用于记录消息和更新消息状态。

代码语言:javascript
复制
/**延迟消息消费者 */
@Slf4j
@Component
public class DelayQueueConsumer implements Runnable {

    /** 延迟队列 */
    private DelayQueue<DelayMessage> delayQueue;

    /**
     * 设置延迟队列
     * @param delayQueue 延迟队列
     */
    public void setDelayQueue(DelayQueue<DelayMessage> delayQueue) {
        this.delayQueue = delayQueue;
    }

    /** 超时消息处理服务 */
    @Autowired
    private AppDelayMessageService service;

    @Override
    public void run() {
        while (true) {
            try {
                log.info("@@ 启动异步线程 [{}] 消费以超时的消息", Thread.currentThread().getName());
                // 如果暂时没有过期消息或者队列为空, 则 take 方法会被阻塞, 直到有过期的消息为止
                DelayMessage delayMessage = delayQueue.take();
                AppDelayMessage message = JSON.parseObject(delayMessage.getMessage(), AppDelayMessage.class);
                // 处理 TIMEOUT 异常
                handleTimeoutError(message);
                log.info("@@ 以消费消息:{}", delayMessage.getMessage());
            } catch (InterruptedException e) {
                log.error("@@ 线程 [{}] 消费消息异常", Thread.currentThread().getName(), e);
            }
        }
    }

    /**
     * 超时错误处理
     * @param message 消息内容
     */
    @Transactional
    public void handleTimeoutError(AppDelayMessage message) {
        log.info("@@ 处理超时错误, AppDelayMessage:{}", message);

        // 更新消息状态 [PENDING -> TIMEOUT]
        boolean update = service.lambdaUpdate()
            .set(AppDelayMessage::getStatus, AppDelayMessage.Status.TIMEOUT)
            .set(AppDelayMessage::getModifyTime, LocalDateTime.now())
            .eq(AppDelayMessage::getId, message.getId())
            .eq(AppDelayMessage::getStatus, AppDelayMessage.Status.PENDING)
            .update();

        if (update) {
            log.info("@@ 处理超时调用回调函数, message:{}", JSON.toJSONString(message));
            service.callback(message);
        }
    }
}

延迟消息生产者

消息生产者由 DelayQueueProducer 创建,并用于将消息发送到 DelayQueue队列中。可以调用消息生产者的方法(offer

或 obtainQueue 方法)创建延迟消息队列入队列和获取延迟消息队列。

代码语言:javascript
复制
/** 延迟消息生产者 */
@Slf4j
@Component
public class DelayQueueProducer {

    /** 创建延迟消息队列 */
    private static final DelayQueue<DelayMessage> DELAY_QUEUE = new DelayQueue<>();

    /**
     * 消息入队列
     * @param delayMessage 消息内容
     * @return 成功:{@code true}, 失败:{@code false}
     */
    public boolean offer(DelayMessage delayMessage) {
        return DELAY_QUEUE.offer(delayMessage);
    }

    /**
     * 获取延迟消息队列
     * @return {@link DelayQueue<DelayMessage>}
     */
    public DelayQueue<DelayMessage> obtainQueue() {
        return DELAY_QUEUE;
    }
}

延迟消息体

构建消息的结构体内容,设置统一标准的消息格式和自定义超时时间的范围。

代码语言:javascript
复制
/** 延迟消息体 */
@Data
public class DelayMessage implements Delayed {

    /** 消息内容 */
    private String message;   // 延迟任务中的任务数据
    /** ttl */
    private long ttl;         // 延迟任务到期时间(过期时间)

    /**
     * 构造函数
     * @param message 消息实体
     * @param ttl     延迟时间,单位毫秒
     */
    public DelayMessage(String message, long ttl) {
        setMessage(message);
        this.ttl = System.currentTimeMillis() + ttl;
    }

    /**
     * 获取消息触发剩余时间
     * @param unit the time unit
     * @return {@link long}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // 计算该任务距离过期还剩多少时间
        long remaining = ttl - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    /**
     * 比较消息延时时长
     * @param o {@link Delayed}
     * @return 延时时长
     */
    @Override
    public int compareTo(Delayed o) {
        // 比较、排序: 对任务的延时大小进行排序,将延时时间最小的任务放到队列头部
        return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

定义回调函数

自定义Callback 注解,定义延时消息回调注解, 将回调类添加改注解 {@link Callback},注入到 Spring Ioc 容器。

代码语言:javascript
复制
/** 定义延时消息回调注解, 将回调类添加改注解 {@link Callback},注入到 Spring Ioc 容器。 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Callback {

    /**
     * The value may indicate a suggestion for a logical component name,
     * to be turned into a Spring bean in case of an autodetected component.
     * @return the suggested component name, if any (or empty String otherwise)
     */
    @AliasFor(annotation = Component.class)
    String value() default "";

}

定义超时接口

提供给实现类完成回调函数处理方法

代码语言:javascript
复制
/** 回调函数接口 */
public interface TimeoutCallback {

    /**
     * 提供给实现类完成回调函数处理方法
     * @param message 消息
     */
    void handle(AppDelayMessage message);

}

延迟消息超时事件

Spring的事件ApplicationEvent为bean和bean之间的消息通信提供了支持。当bean处理完一个事件之后,希望另一个bean能够知道并做相应的处理。这时其他bean监听当前bean所发送的事件。

事件流程如下:

  1. 自己的event需要继承 ApplicationEvent,并且写相应的构造函数
  2. 定义一个监听器listener,监听器(listener)具体根据事件发生的业务处理模块,可以接收处理事件中封装的对象。
  3. 使用ApplicationContext容器发布事件
代码语言:javascript
复制
/** 延迟消息超时事件 */
public class InvokeTimeoutEvent extends ApplicationEvent {

    /**
     * Create a new {@code ApplicationEvent}.
     * @param source the object on which the event initially occurred or with which the event is associated (never
     *               {@code null})
     */
    public InvokeTimeoutEvent(Object source) {
        super(source);
    }
}

延迟消息超时事件监听器

事件监听处理方法

@EventListener 注解,实现对任意的方法都能监听事件。

在任意方法上标注@EventListener 注解,指定 classes,即需要处理的事件类型,一般就是 ApplicationEven 及其子类,可以设置多项。

代码语言:javascript
复制
package com.example.demo.delay;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * 延迟消息超时事件监听器
 */
@Component
public class DelayMessageListener {

    /** 延迟消息生产者 */
    @Autowired
    private DelayQueueProducer producer;
    /** 创建超时消息处理服务接口 */
    @Autowired
    private AppDelayMessageService service;

    /**
     * 事件监听处理方法
     * @param event {@link InvokeTimeoutEvent}
     */
    @EventListener
    public void onApplicationEvent(InvokeTimeoutEvent event) {
        // 监听延迟消息触发事件
        AppDelayMessage source = (AppDelayMessage)event.getSource();
        if (service.save(source)) {
            // 转换为毫秒
            long ttl = 2 * 60 * 60 * 1000;
            if (Objects.nonNull(source.getTtl())) {
                ttl = source.getTtl() * 60 * 60 * 1000;
            }
            producer.offer(new DelayMessage(JSON.toJSONString(source), ttl));
        }
    }
}

以上构建完成了消息的生产,消费,与监听,下面我们进行功能测试。

线程池配置类

  • @EnableAsync 注解:启用了Spring的异步方法执行支持。确保你的应用程序中有 @EnableAsync 生效,否则异步方法可能不会被正确地处理。
  • ThreadPoolConfig 类中的成员变量:coremaxqueuekeepAlive 分别表示核心线程数量、最大线程数、排队线程数和线程回收时间。这些值似乎是通过 @Value 注解从配置文件中读取的。确保在你的配置文件中有这些属性的正确配置。
  • @Bean("toolThreadPool"):这个方法定义了一个名为 "toolThreadPool" 的 Bean,返回一个 ThreadPoolExecutor 对象。该方法使用了 Google Guava 库中的 ThreadFactoryBuilder 来创建一个带有自定义线程名称的线程工厂。
  • 线程池配置:确保你在配置文件中设置了适当的值,以满足你应用的需求。特别是要注意核心线程数量、最大线程数、排队线程数和线程回收时间的设置,这些值应该根据你的应用负载和性能需求来调整。
  • 异步方法的使用:确保你的应用中有异步方法的定义和调用,以便线程池得以发挥作用。
代码语言:javascript
复制
/** 线程池配置类 */
@EnableAsync
@Configuration
public class ThreadPoolConfig {
    /** 核心线程数量 */
    @Value("${import.thread.core}")
    private Integer core;
    /** 最大线程数 */
    @Value("${import.thread.max}")
    private Integer max;
    /** 排队线程数 */
    @Value("${import.thread.queue}")
    private Integer queue;
    /** 线程回收时间 */
    @Value("${import.thread.keepAlive}")
    private Integer keepAlive;

    /**
     * toolsThreadPool
     * @return {@link ThreadPoolExecutor} 线程池
     */
    @Bean("toolThreadPool")
    public ThreadPoolExecutor arxmlThreadPoolExecutor() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("build-tool-%d").build();
        return new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queue),
                namedThreadFactory);
    }
}

测试消息队列

DelayController

代码语言:javascript
复制
/** 测试类 */
@RestController
public class DelayController {

    @Autowired
    private AppDelayMessageService appDelayMessageService;

    /**发布一个超时的监听*/
    @GetMapping("/testPublishDelay")
    public void testPublishDelay (){
        appDelayMessageService.publish("00000000011111111", 10,
                AppDelayMessage.Type.TYPE1,
                AppDelayMessage.Stage.REAL_COMPILE,
                AppAuditCallback.class);
    }

    /**当业务未超时,修改消息状态*/
    @GetMapping("/testChangeDelay")
    public void testChangeDelay (){
        // 修改延迟消息状态
        appDelayMessageService.changeToProcessed("00000000011111111",
                AppDelayMessage.Stage.REAL_COMPILE);

    }
}

AppDelayMessageService

代码语言:javascript
复制
/** 应用创建超时消息处理服务接口 */
public interface AppDelayMessageService extends IService<AppDelayMessage> {

    /**
     * 发布延迟消息
     * @param appId   应用ID
     * @param timeout 超时时长(H)
     * @param type    TYPE_ONE; TYPE_TWO;
     * @param stage   编译:COMPILE; 测试:TEST;
     * @param callback 回调函数
     */
    void publish(String appId, Integer timeout, AppDelayMessage.Type type, AppDelayMessage.Stage stage, Class callback);

    /**
     * 超时回调
     * @param message 消息体
     */
    void callback(AppDelayMessage message);

    /**
     * 修改延迟消息状态[PROCESSED]
     * @param appId 应用ID
     * @param stage 编译:COMPILE; 测试:TEST;
     * @return 成功:true, 失败:false
     */
    boolean changeToProcessed(String appId, AppDelayMessage.Stage stage);
}

AppDelayMessageServiceImpl

代码语言:javascript
复制
/** 创建超时消息处理服务实现类 */
@Slf4j
@Service
public class AppDelayMessageServiceImpl extends ServiceImpl<AppDelayMessageMapper, AppDelayMessage>
    implements AppDelayMessageService, ApplicationEventPublisherAware {

    /** ApplicationEventPublisher */
    private ApplicationEventPublisher eventPublisher;

    /**
     * 注入事件发布器
     * @param eventPublisher event publisher to be used by this object
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    /**
     * 发布延迟消息
     * @param appId    应用ID
     * @param timeout  超时时长(H)
     * @param type     FOTA; SOTA;
     * @param stage    编译工程:COMPILE; 测试配置:TEST;
     * @param callback 回调函数
     */
    @Async("toolThreadPool")
    @Override
    public void publish(String appId, Integer timeout, AppDelayMessage.Type type, AppDelayMessage.Stage stage,
        Class callback) {
        log.info("@@ 发布延迟消息, appId:{}, type:{}, stage:{}", appId, type, stage);
        AppDelayMessage message = AppDelayMessage.builder()
            .appId(appId)
            .ttl(timeout)
            .type(type)
            .stage(stage)
            .callback(callback.getSimpleName())
            .build();
        // 发布延时消息时间事件
        InvokeTimeoutEvent event = new InvokeTimeoutEvent(message);
        eventPublisher.publishEvent(event);
    }

    /**
     * 超时回调
     * @param message 消息体
     */
    @Async("toolThreadPool")
    @Override
    public void callback(AppDelayMessage message) {
        log.info("@@ 超时回调函数处理, message:{}", JSON.toJSONString(message));
        // 发布延时消息时间事件
        Invoke111TimeoutCallbackEvent event = new Invoke111TimeoutCallbackEvent(message);
        eventPublisher.publishEvent(event);
    }

    /**
     * 修改延迟消息状态
     * @param appId 应用ID
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public boolean changeToProcessed(String appId, AppDelayMessage.Stage stage) {
        log.info("@@ 修改延迟消息状态, appId:{}, Stage:{}", appId, stage);

        // 获取消息
        AppDelayMessage one = lambdaQuery()
            .eq(AppDelayMessage::getAppId, appId)
            .eq(AppDelayMessage::getStage, stage)
            .orderByDesc(AppDelayMessage::getCreateTime)
            .last("limit 1")
            .one();

        // 无消息数据跳过处理
        if (Objects.isNull(one)) {
            return Boolean.TRUE;
        }

        // 消息已超时
        if (AppDelayMessage.Status.TIMEOUT.equals(one.getStatus())) {
            log.error("@@ 接口调用超时, 消息内容:{}", JSON.toJSONString(one));
            return Boolean.FALSE;
        }

        // 修改状态为[以处理]
        lambdaUpdate()
            .set(AppDelayMessage::getStatus, AppDelayMessage.Status.PROCESSED)
            .set(AppDelayMessage::getModifyTime, LocalDateTime.now())
            .eq(AppDelayMessage::getId, one.getId())
            .update();
        return Boolean.TRUE;
    }
}

AppDelayMessageMapper

代码语言:javascript
复制
/** 创建超时消息处理 Mapper */
public interface AppDelayMessageMapper extends BaseMapper<AppDelayMessage> {
}

超时处理

使用自定义的@Callback 注入,实现自定义TimeoutCallback接口。根据回调接口,对自定义的回调方法做超时处理。

当发生业务流程发生超时,使用自定义的回调函数,对超时的问题进行处理。

代码语言:javascript
复制
/** 超时处理 */
@Callback("AppAuditCallback")
@Slf4j
public class AppAuditCallback implements TimeoutCallback {

    /** 超时消息 */
    private static final String MESSAGE = "{}超时, 请重新审核应用";

    @Override
    public void handle(AppDelayMessage message) {
        log.info("@@ 应用超时异常处理, AppDelayMessage : {}", JSON.toJSONString(message));
        // 回调的业务处理
        doAnythings(formatMessage(message.getStage()));
    }

    /**
     * 格式化超时的消息
     * @param stage 超时阶段(COMPILE, TEST)
     * @return 消息内容
     */
    private String formatMessage(AppDelayMessage.Stage stage) {
        // 超时错误消息
        if (AppDelayMessage.Stage.REAL_COMPILE.equals(stage)) {
            return StrUtil.format(MESSAGE, "XXXX业务处理");
        }
        return MESSAGE;
    }

    public void doAnythings(String msg){
        log.info("@@ 超时异常业务处理 : {}", msg);
    }
}

以上是完整的功能实现,请结合自身业务进行实践。

我正在参与2024腾讯技术创作特训营第五期有奖征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 构建延时消息表
  • 延迟消息消费者
  • 延迟消息生产者
  • 延迟消息体
  • 定义回调函数
  • 定义超时接口
  • 延迟消息超时事件
  • 延迟消息超时事件监听器
  • 线程池配置类
  • 测试消息队列
    • DelayController
      • AppDelayMessageService
        • AppDelayMessageServiceImpl
          • AppDelayMessageMapper
            • 超时处理
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档