聊聊rocketmq的AccessChannel

本文主要研究一下rocketmq的AccessChannel

AccessChannel

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java

public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,
​
    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
  • AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {
​
    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
​
    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);
​
    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;
​
    /**
     * Close the trace Hook
     */
    void shutdown();
}
  • TraceDispatcher的start方法会接收AccessChannel类型的参数

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {
​
    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;
​
    //......
​
    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }
​
    //......
​
    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;
​
        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }
​
        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);
​
            //......
        }
​
        //......
    }
​
    //......
}
  • AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

小结

AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD;TraceDispatcher的start方法会接收AccessChannel类型的参数;AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Jimoer

Java NIO之Java中的IO分类

前面两篇文章(Java NIO之理解I/O模型(一)、Java NIO之理解I/O模型(二))介绍了,IO的机制,以及几种IO模型的内容,还有涉及到的设计模式。...

6420
来自专栏曲水流觞TechRill

我为什么反对用异常做流程控制?

像SSH/M这种基础框架的出现,让不少程序员“瘫痪”成了流水线工人。以前小心翼翼方能写就的逻辑分支判断,演变成了直接丢个异常然后坐等AOP拦截处理,此时的拦截器...

18120
来自专栏cwl_Java

2016年系统架构师软考案例分析考点

10210
来自专栏cwl_Java

系统架构师论文-论企业应用集成(-集成ERP/PDM/E-mail)

本文讨论了某公司的应用系统集成项目。某公司为了应对市场变化的需要,决定把公司几个主要的应用系统ERP系统,PDM系统,E-mail系统集成在一起,系统集成完成后...

8310
来自专栏cwl_Java

软考分类精讲-软件架构设计(五)

9020
来自专栏曲水流觞TechRill

一次线上内存泄露历险

刚进公司那段时间,在敏捷项目制的执行下,需求有条不紊地进行着。某个周末,业务系统反馈群内,操作人员反馈系统不可用,我们急忙寻求运维的帮助,将系统重启并恢复使用。...

7730
来自专栏MasiMaro 的技术博文

Servlet 常用类

Servlet 是一套标准的接口规范,当用户通过web请求来访问服务器时,由web容器根据配置调用我们实现的对应的servlet对象来提供服务。同时为了方便开发...

6020
来自专栏cwl_Java

2015年系统架构师软考案例分析考点

两者最大的区别是:状态图侧重于描述行为的结果,而活动图侧重描述行为的动作。其 次活动图可描述并发行为,而状态图不能。

9610
来自专栏cwl_Java

系统架构师论文-财务数据仓库系统的设计与实现

近年来,数据仓库技术在信息系统的建设中得到了广泛应用,有效地为决策提供了支持。2004年6月,本人所在单位组织开发了财务管理决策系统,该系统主要是使高层领导掌握...

7110
来自专栏曲水流觞TechRill

IT范儿 | 你是个会取名儿的人么?

还有无数的取名场景,我可没打算也不可能列全,暂时想起上面这些比较有趣的供大家一乐。

7030

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励