前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ主题的自动创建机制

RocketMQ主题的自动创建机制

作者头像
CBeann
发布2023-12-25 19:16:36
2350
发布2023-12-25 19:16:36
举报
文章被收录于专栏:CBeann的博客

问题

在学习RocketMQ的时候,有几个疑问。 如果主题不存在,client把消息发给谁呢? 当发送消息给不存在的主题时,主题是什么时候创建的呢?

猜测

当我执行下面代码时,主题不存在,那么什么时候创建的主题"TopicTest202112151152"呢?

代码语言:javascript
复制
  Message msg = new Message("TopicTest202112151152" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg,1000000000);

其实我当时猜测的是可能发现主题不存在时先给服务器发个消息,让其创建主题,然后再发送消息。 结果是:发送消息的时候创建主题

问题1:client发送消息,主题不存在给谁发?

源码跟踪

以下面一段代码为例,要给“TopicTest202112151154”主题发送消息,发送的内容是时间字符串,跟producer.send方法

代码语言:javascript
复制
// Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    // Launch the instance.
    producer.start();
    // Create a message instance, specifying topic, tag and message body.
    Message msg =
        new Message(
            "TopicTest202112151154",
            "TagA",
            (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Call send message to deliver message to one of brokers.
    SendResult sendResult = producer.send(msg, 1000000000);
    System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();

跟到DefaultMQProducerImpl###sendDefaultImpl方法

代码语言:javascript
复制
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    	//...
    	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    	//....
    	//...发送消息
    }

跟到DefaultMQProducerImpl###tryToFindTopicPublishInfo方法

代码语言:javascript
复制
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //首先从本地缓存中获取,因为主题不存在,所以返回null
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //然后从NameServer获取,因为主题不存在,所以返回一个不Ok的TopicPublishInfo 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //因为TopicPublishInfo不Ok
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //重新获取主题,该方法是重点,跟进去
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

跟到MQClientInstance###updateTopicRouteInfoFromNameServer方法 在该方法中获取默认的主题“TBW102”主题在NameServer的路由信息,把新主题的路由信息参考“TBW102”复制一份,此时在客户端上已经认为新主题已经创建好,不过在服务器端是没有创建好改主题的。

代码语言:javascript
复制
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
   
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        //获取默认主题defaultMQProducer.getCreateTopicKey(),即TBW102的路由信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                            //省略。。。
                    }
                    //然后按照TBW102的topicRouteData把新主题的topicRouteData创建出来,此时客户端就有了新主题的路由信息(实际是TBW102的路由信息)   
        return false;
    }

此时客户端就有新主题的路由信息了,但是路由信息对应的broker上是没有该主题的信息的,不过客户端此时已经知道把消息发给哪个IP了。

问题回答

客户端如果获取的主题信息不存在,会根据“TBW102”主题的信息创建新主题,然后把该新主题的信息存储到客户端本地,此时客户端知道给哪个IP发数据了,然后客户端就会和那个IP的Netty建立连接,然后发数据,Ok了。

问题2:broker收到消息后发现主题不存在,什么时候创建?

从哪开始打断点

首先你要会Netty,这样按照常理你就能知道逻辑在SimpleChannelInboundHandler里。 那么去哪找SimpleChannelInboundHandler呢,应该先找到NettyServer。NettyServer应该在Broker的启动源码里去找。 BrokerController###start方法里有下面的代码

代码语言:javascript
复制
if (this.remotingServer != null) {
            this.remotingServer.start();
        }

remotingServer的实现类选择NettyRemotingServer,里面的start方法里有如下代码

代码语言:javascript
复制
 ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

其中serverHandler就是MQ自定义的方法,顺藤摸瓜,就找到了NettyServerHandler的channelRead0方法 NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

代码语言:javascript
复制
   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

开始跟源码

当客户端发送消息时,broker的断点会停在下面的processRequestCommand这一行

NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

代码语言:javascript
复制
   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

NettyRemotingAbstract###processRequestCommand方法 RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd)把任务提交,会到下面代码里的run匿名类里

代码语言:javascript
复制
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
      Runnable run =
          new Runnable() {
            @Override
            public void run() {
              try {
                doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                final RemotingResponseCallback callback =
                    new RemotingResponseCallback() {
                      @Override
                      public void callback(RemotingCommand response) {
                        doAfterRpcHooks(
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        if (!cmd.isOnewayRPC()) {
                          if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                              System.out.println(response);
                              ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                              log.error("process request over, but response failed", e);
                              log.error(cmd.toString());
                              log.error(response.toString());
                            }
                          } else {
                          }
                        }
                      }
                    };
                if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                  AsyncNettyRequestProcessor processor =
                      (AsyncNettyRequestProcessor) pair.getObject1();
                  processor.asyncProcessRequest(ctx, cmd, callback);
                } else {
                  NettyRequestProcessor processor = pair.getObject1();
                  RemotingCommand response = processor.processRequest(ctx, cmd);
                  callback.callback(response);
                }
              } catch (Throwable e) {
                log.error("process request exception", e);
                log.error(cmd.toString());

                if (!cmd.isOnewayRPC()) {
                  final RemotingCommand response =
                      RemotingCommand.createResponseCommand(
                          RemotingSysResponseCode.SYSTEM_ERROR,
                          RemotingHelper.exceptionSimpleDesc(e));
                  response.setOpaque(opaque);
                  ctx.writeAndFlush(response);
                }
              }
            }
          };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                 //使用线程池把任务提交
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
               
    }

然后跟SendMessageProcessor###asyncProcessRequest方法

代码语言:javascript
复制
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
    }

然后跟SendMessageProcessor###asyncProcessRequest方法

代码语言:javascript
复制
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    //走这个分支
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

然后跟SendMessageProcessor###asyncSendMessage方法 方法里有一个preSend方法

代码语言:javascript
复制
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
                                                                
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        //省略
    }

然后跟SendMessageProcessor###asyncSendMessage方法

代码语言:javascript
复制
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                    SendMessageRequestHeader requestHeader) {
       
       //省略

        //检查主题的问题
        super.msgCheck(ctx, requestHeader, response);
        
        //省略
    }

跟进AbstractSendMessageProcessor###msgCheck方法

代码语言:javascript
复制
 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
            
            //省略
            
            //broker上创建主题,跟进去
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

           //省略

    }

TopicConfigManager###createTopicInSendMessageMethod 该方法会创建主题并且持久化,此时主题在broker中存在但是NameServer不存在

代码语言:javascript
复制
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
      
                    
                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                            //创建新主题的topic信息
                            topicConfig = new TopicConfig(topic);

                            int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                            if (queueNums < 0) {
                                queueNums = 0;
                            }

                            topicConfig.setReadQueueNums(queueNums);
                            topicConfig.setWriteQueueNums(queueNums);
                            int perm = defaultTopicConfig.getPerm();
                            perm &= ~PermName.PERM_INHERIT;
                            topicConfig.setPerm(perm);
                            topicConfig.setTopicSysFlag(topicSysFlag);
                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                        } 
                    } 
                    if (topicConfig != null) {
                       
                        //持久化
                        this.persist();
                    }
                } 

        return topicConfig;
    }

###ConfigManager###persist方法

代码语言:javascript
复制
public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            //我的值是C:\Users\25682\store\config\topics.json
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

MixAll###string2File

代码语言:javascript
复制
//str为最新的全部topic信息
public static void string2File(final String str, final String fileName) throws IOException {
        //先把str存到topics.json.tmp里
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
        //把topics.json里的数据存储到topics.json.bk里
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }
        //删除topics.json
        File file = new File(fileName);
        file.delete();
        //把topics.json.tmp重命名为topics.json
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }

TBW102主题的作用

Producer 在发送消息时,默认情况下,不需要提前创建好 Topic,如果 Topic 不存在,Broker 会自动创建 Topic。但是新创建的 Topic 它的权限是什么?读写队列数是多少呢?这个时候就需要用到TBW102 了,RocketMQ 会基于该 Topic 的配置创建新的 Topic。

参考

深度解析RocketMQ 主题的创建机制,为何生产建议关掉自动创建Topic

https://blog.csdn.net/a1036645146/article/details/109581499

TBW102主题的作用

https://www.modb.pro/db/130866

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
  • 猜测
  • 问题1:client发送消息,主题不存在给谁发?
    • 源码跟踪
      • 问题回答
      • 问题2:broker收到消息后发现主题不存在,什么时候创建?
        • 从哪开始打断点
          • 开始跟源码
          • TBW102主题的作用
          • 参考
          相关产品与服务
          数据保险箱
          数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档