前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何基于Spring容器封装更适用的消息组件?

如何基于Spring容器封装更适用的消息组件?

作者头像
码农架构
发布2022-04-13 16:53:33
3580
发布2022-04-13 16:53:33
举报
文章被收录于专栏:码农架构码农架构码农架构

导读:针对不同业务对MQ的技术选型问题,在实施过程中因为某些版本导致无法闭环,因此抽取公共组件有存在的必要。总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发 。

一、背景


对于Spring ApplicationEvent 事件处理作为Java开发来说已经是见多不怪了,都知道 ApplicationEvent 只能基于单体应用来处理事件。也就是说只能在同一个JVM中分发与监听.如下图

当多节点部署是ApplicationEvent无法进行跨服务分发与监听,如下图

那如何基于Spring 对于Spring ApplicationEvent 事件在同一注册中心中都可以随意分发与全局节点监听呢?总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发

二、知识点回顾


对于Spring容器的一些事件,可以监听并且触发相应的方法。通常的方法有 2 种,ApplicationListener 接口和@EventListener 注解

对Spring容器的一些事件拓展前面一篇文章也粗略介绍过,当时解决的业务场景主要是解决表单引擎层拓展数据源问题,但是没有做详细的介绍。

三、封装组件


▐ 定义抽象事件类

/**
 * 全局事件定义
 * <p>
 * 注意:发布全局事件,事件必须构造函数AbstractApplicationGlobalEvent(String)
 */
public abstract class AbstractApplicationGlobalEvent extends ApplicationEvent {

    @Getter
    List<MessageDTO> messageDTOS;

    public AbstractApplicationGlobalEvent(String source) {
        super(source);
        this.messageDTOS = (List) JSONArray.parseArray(source);
    }

    // 消息组件类型
    public enum plugType {
        Redis,
        ActiveMQ,
        RocketMQ,
        KafkaMQ
    }

    @Data
    public class MessageDTO{

        plugType plugType;

        //消息内容
    }
}

这里注意下在申明自定义的拓展事件时候需要注意构造函数必须构造函数AbstractApplicationGlobalEvent(String)方法,基于后面反射用到。

▐ 定义插件组件

@Slf4j
@Component
public class ApplicationsGlobalEventPlugin implements InitializingBean {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ContextRefresher contextRefresher;
    @Autowired
    private ApplicationContext applicationContext;

    private static final String KEY_REFRESH_ENVIRONMENT = "refreshEnvironment";

    private static final String TOPIC_GLOBAL_EVENT = "core:TOPIC_GLOBAL_EVENT";

    /**
     * 发布全网事件,事件必须构造函数ApplicationEvent(String)
     */
    public void publishGlobalEvent(AbstractApplicationGlobalEvent globalEvent) {
        String text = globalEvent.getClass().getName() + ":" + globalEvent.getSource();
        
        // 根据消息类型对接选择对应组件
        
        redissonClient.getTopic(TOPIC_GLOBAL_EVENT).publish(text);
        
        
        log.debug("send:{}", text);
    }
    ......
}

案例中可以通过事件中申明的组件类型选择实现对应的消息组件,本文以Redis为案例。

到这里目前已完成事件源的定义,消息的分发。消息监听如何解决呢?

要解决部署节点都能监听到,所以监听点必须存在于所有的应用服务中。因此在设计的组件的时候,ApplicationsGlobalEventPlugin 应该放在common包中统一集成打包部署。如下图所示

每个部署节点都你那个发布消息,同时也在实时监听消息。

@Slf4j
@Component
public class ApplicationsGlobalEventPlugin implements InitializingBean {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ContextRefresher contextRefresher;
    @Autowired
    private ApplicationContext applicationContext;

    private static final String KEY_REFRESH_ENVIRONMENT = "refreshEnvironment";

    private static final String TOPIC_GLOBAL_EVENT = "core:TOPIC_GLOBAL_EVENT";

    /**
     * 发布全网事件,事件必须构造函数ApplicationEvent(String)
     */
    public void publishGlobalEvent(AbstractApplicationGlobalEvent globalEvent) {
        String text = globalEvent.getClass().getName() + ":" + globalEvent.getSource();
        redissonClient.getTopic(TOPIC_GLOBAL_EVENT).publish(text);
        log.debug("send:{}", text);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        redissonClient.getTopic(TOPIC_GLOBAL_EVENT).addListener(String.class,
                this::onMessage);
    }

    private void onMessage(CharSequence channel, String message) {
        log.debug("receive:{}", message);
        int index = message.indexOf(':');
        String className = message.substring(0, index);
        String arg = message.substring(index + 1);

        Class<?> clazz = ReflectUtil.classForName(className);
        if (clazz == null) {
            return;
        }
        try {
            Constructor<?> constructor = clazz.getConstructor(String.class);
            ApplicationEvent event = (ApplicationEvent) constructor
                    .newInstance(arg);
            applicationContext.publishEvent(event);
        } catch (Exception e) {
            log.warn("事件反序列化失败", e);
        }
    }
}

最终落地到我们最开始能处理的场景在同一JVM中发布事件,与监听事件的逻辑。

这里值得留意的是监听消息后对于事件的处理。

  • 根据事件的Classs 反射AbstractApplicationGlobalEvent 并实例化,构建本地事件ApplicationEvent

最终形成一个完整闭环

四、总结


针对不同业务对MQ的技术选型问题,在实施过程中因为某些版本导致无法闭环,因此抽取公共组件有存在的必要。总结本篇文章希望对从事相关工作的同学能够有所帮助或者启发

- END -

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档