聊聊rocketmq的AccessChannel

本文主要研究一下rocketmq的AccessChannel

AccessChannel

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java

public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,

    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
  • AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}
  • TraceDispatcher的start方法会接收AccessChannel类型的参数

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {

    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    //......

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    //......

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);

            //......
        }

        //......
    }

    //......
}
  • AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

小结

AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD;TraceDispatcher的start方法会接收AccessChannel类型的参数;AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

doc

  • AccessChannel

本文分享自微信公众号 - 码匠的流水账(geek_luandun)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊nacos address的getCluster

nacos-1.1.3/address/src/main/java/com/alibaba/nacos/address/controller/ServerLis...

7400
来自专栏别先生

Eclipse导入SpringBoot项目pom.xml第一行报错Unknown error

1、网上搜的都说是将SpringBoot2.1.5版本降级到SpringBoot2.1.4版本,感觉这治标不治本啊,以后想升级不是玩完了。

51730
来自专栏Java研发军团

学习本身不难,难得是了解该学哪些!

先说明,本文说的是技术架构,而不是业务架构,另外,这个架构是指目前比较热门的高并发大数据的架构。论能力,我还达不到架构师的水平,所以我目前还在不断努力。

9750
来自专栏Java架构沉思录

如何选择消息队列?

在高并发业务场景下,消息队列在流量削峰、解耦上有不可替代的作用。当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、Ze...

9520
来自专栏大数据文摘

干了5年程序员,该如何转行?5个新工作方向了解一下

写了5年代码,年龄已近30,头发尚存几缕,除了写代码其他并无所长,职业未来在何方?

14130
来自专栏申城异乡人

Spring Boot入门(二):获取配置文件值

Spring Boot默认生成的配置文件为application.properties,不过它也支持yaml语言的配置文件,

55220
来自专栏Java架构沉思录

一种单机支持 JavaWeb 容器万级并发的设想

当前的大部分 Java web 容器基于 Bio 线程模型,例如常见的 Tomcat ,默认 200 线程,即 200 连接。由此带来的问题是,如果想提高并发,...

5120
来自专栏云服务器活动

centos7搭建javaweb服务器tomcat

首选未注册腾讯云账号要先注册,并完成实名认证,购买腾讯云服务器等云产品前,记得领取(代金券礼包)

15100
来自专栏Java架构沉思录

Spring 的 BeanUtils 填坑记

最近项目中在和第三方进行联调一个接口,我们这边发送http请求给对方,然后接收对方的回应,代码都是老代码。

9310
来自专栏云服务器活动

腾讯云Centos7安装java服务器

首选未注册腾讯云账号要先注册,并完成实名认证,购买腾讯云服务器等云产品前,记得领取(代金券礼包)

15200

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励