前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rocketmq中可视化和池化

rocketmq中可视化和池化

作者头像
路行的亚洲
发布2023-08-31 13:44:53
2860
发布2023-08-31 13:44:53
举报
文章被收录于专栏:后端技术学习

在rocketmq中rocketmqTool作为可视化和二次开发使用比较多的类org.apache.rocketmq.tools.admin.MQAdminExt,这个类在admin里面:

一、池化

在rocketmq中:

而我们知道创建rocketmq创建的过程中,会启动很多东西,这个连接的过程涉及到的内容比较多,可以看到rocketmq的rocketmq-dashboard和mqcloud里面都使用了池化技术-对象池。rocketmq-dashboard池化org.apache.rocketmq.dashboard.admin.MqAdminExtObjectPool:

代码语言:javascript
复制
    @Bean
    public GenericObjectPool<MQAdminExt> mqAdminExtPool() {
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setTestWhileIdle(true);
        genericObjectPoolConfig.setMaxWaitMillis(10000);
        genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(20000);
        MQAdminPooledObjectFactory mqAdminPooledObjectFactory = new MQAdminPooledObjectFactory();
        MQAdminFactory mqAdminFactory = new MQAdminFactory(rmqConfigure);
        mqAdminPooledObjectFactory.setMqAdminFactory(mqAdminFactory);
        GenericObjectPool<MQAdminExt> genericObjectPool = new GenericObjectPool<MQAdminExt>(
            mqAdminPooledObjectFactory,
            genericObjectPoolConfig);
        return genericObjectPool;
    }

同时进行借和还:

代码语言:javascript
复制
 public static void createMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
        try {
            // Get the mqAdmin instance from the object pool
            MQAdminExt mqAdminExt = mqAdminExtPool.borrowObject();
            MQ_ADMIN_EXT_THREAD_LOCAL.set(mqAdminExt);
        } catch (Exception e) {
            LOGGER.error("get mqAdmin from pool error", e);
        }
    }

    public static void returnMQAdmin(GenericObjectPool<MQAdminExt> mqAdminExtPool) {
        MQAdminExt mqAdminExt = MQ_ADMIN_EXT_THREAD_LOCAL.get();
        if (mqAdminExt != null) {
            try {
                // After execution, return the mqAdmin instance to the object pool
                mqAdminExtPool.returnObject(mqAdminExt);
            } catch (Exception e) {
                LOGGER.error("return mqAdmin to pool error", e);
            }
        }
        MQ_ADMIN_EXT_THREAD_LOCAL.remove();
    }

同时mqcloud再次基础上做了自己的封装模板,方便每次调用的时候,进行回调:

代码语言:javascript
复制
 @Autowired
    private GenericKeyedObjectPool<Cluster, MQAdminExt> mqPool;

    /**
     * 执行操作
     * @param callback
     * @return
     * @throws Exception
     */
    public <T> T execute(MQAdminCallback<T> callback) {
        MQAdminExt mqAdmin = null;
        try {
            // 获取mqAdmin实例
            mqAdmin = mqPool.borrowObject(callback.mqCluster());
            if(mqAdmin == null) {
                logger.warn("cluster:{} cannot get mqadmin!", callback.mqCluster());
                return null;
            }
            // 触发回调
            T t = callback.callback(mqAdmin);
            return t;
        } catch (Exception e) {
            try {
                // 触发异常情况回调
                return callback.exception(e);
            } catch (Exception ex) {
                logger.warn("cluster:{} exception err:{}", callback.mqCluster(), ex.getMessage());
                return null;
            }
        } finally {
            if(mqAdmin != null) {
                try {
                    mqPool.returnObject(callback.mqCluster(), mqAdmin);
                } catch (Exception e) {
                    logger.warn("cluster:{} shutdown err:{}", callback.mqCluster(), e.getMessage());
                }
            }
        }
    }

解决了rocketmq实列对象的复用外,还需要知道rocketmq提供了哪些api可以进行扩展。

二、主要使用的api分类

代码语言:javascript
复制
mqClientInstance 启动和关闭
broker操作
BrokerCluster操作
PlainAccess操作
GlobalWhiteAddr 白名单
topic 主题操作
SubscriptionGroup 订阅组操作
consume 消费操作
producer生产者操作
Offset 偏移量
NameServer操作
message操作
subscription 订阅
task 清理任务
trace 链路操作
stat 统计
DefaultMQAdminExtImpl 默认mqAdmin扩展实现

通过rocketmq-tool提供的api相关功能,我们可以进行rocketmq的mqAdmin扩展的启动、关闭,同时针对broker、producer、consume、offset、nameServer、message、trace相关操作和统计,同时触发task进行相关清理工作。

三、rocketmq-dashboard

在rocketmq-dashboard中,可以看到使用了它们的使用:

代码语言:javascript
复制
start 启动
getBrokerConfig 获取broker配置
examineBrokerClusterInfo brokerClusterInfo信息
fetchBrokerRuntimeStats 获取broker运行时统计信息
viewMessage 查看消息
queryMessage 查询消息
messageTrackDetail 链路详情
consumeMessageDirectly 直接消费消息
examineConsumerConnectionInfo 消费者连接信息
examineProducerConnectionInfo 生产者连接信息
examineTopicStats
fetchAllTopicList
fetchBrokerRuntimeStats
examineConsumeStats
examineTopicRouteInfo
getNameServerAddressList
wipeWritePermOfBroker
.....

可以看到dashBoard基于rocketmq-tool模块,做了很多功能,而且这些功能都是可视化的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-05-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、池化
  • 二、主要使用的api分类
  • 三、rocketmq-dashboard
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档